Merge pull request #82 from binwiederhier/attachments

Attachments
This commit is contained in:
Philipp C. Heckel 2022-01-13 15:47:58 -05:00 committed by GitHub
commit bc16ef8480
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 2161 additions and 341 deletions

1
.gitignore vendored
View file

@ -3,4 +3,5 @@ build/
.idea/
server/docs/
tools/fbsend/fbsend
playground/
*.iml

View file

@ -67,6 +67,12 @@ func New(config *Config) *Client {
}
// Publish sends a message to a specific topic, optionally using options.
// See PublishReader for details.
func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
return c.PublishReader(topic, strings.NewReader(message), options...)
}
// PublishReader sends a message to a specific topic, optionally using options.
//
// A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
// (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
@ -74,9 +80,9 @@ func New(config *Config) *Client {
//
// To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
// WithNoFirebase, and the generic WithHeader.
func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishOption) (*Message, error) {
topicURL := c.expandTopicURL(topic)
req, _ := http.NewRequest("POST", topicURL, strings.NewReader(message))
req, _ := http.NewRequest("POST", topicURL, body)
for _, option := range options {
if err := option(req); err != nil {
return nil, err

View file

@ -16,6 +16,11 @@ type PublishOption = RequestOption
// SubscribeOption is an option that can be passed to a Client.Subscribe or Client.Poll call
type SubscribeOption = RequestOption
// WithMessage sets the notification message. This is an alternative way to passing the message body.
func WithMessage(message string) PublishOption {
return WithHeader("X-Message", message)
}
// WithTitle adds a title to a message
func WithTitle(title string) PublishOption {
return WithHeader("X-Title", title)
@ -50,6 +55,16 @@ func WithClick(url string) PublishOption {
return WithHeader("X-Click", url)
}
// WithAttach sets a URL that will be used by the client to download an attachment
func WithAttach(attach string) PublishOption {
return WithHeader("X-Attach", attach)
}
// WithFilename sets a filename for the attachment, and/or forces the HTTP body to interpreted as an attachment
func WithFilename(filename string) PublishOption {
return WithHeader("X-Filename", filename)
}
// WithEmail instructs the server to also send the message to the given e-mail address
func WithEmail(email string) PublishOption {
return WithHeader("X-Email", email)

View file

@ -5,6 +5,9 @@ import (
"fmt"
"github.com/urfave/cli/v2"
"heckel.io/ntfy/client"
"io"
"os"
"path/filepath"
"strings"
)
@ -21,6 +24,9 @@ var cmdPublish = &cli.Command{
&cli.StringFlag{Name: "tags", Aliases: []string{"tag", "T"}, Usage: "comma separated list of tags and emojis"},
&cli.StringFlag{Name: "delay", Aliases: []string{"at", "in", "D"}, Usage: "delay/schedule message"},
&cli.StringFlag{Name: "click", Aliases: []string{"U"}, Usage: "URL to open when notification is clicked"},
&cli.StringFlag{Name: "attach", Aliases: []string{"a"}, Usage: "URL to send as an external attachment"},
&cli.StringFlag{Name: "filename", Aliases: []string{"name", "n"}, Usage: "Filename for the attachment"},
&cli.StringFlag{Name: "file", Aliases: []string{"f"}, Usage: "File to upload as an attachment"},
&cli.StringFlag{Name: "email", Aliases: []string{"e-mail", "mail", "e"}, Usage: "also send to e-mail address"},
&cli.BoolFlag{Name: "no-cache", Aliases: []string{"C"}, Usage: "do not cache message server-side"},
&cli.BoolFlag{Name: "no-firebase", Aliases: []string{"F"}, Usage: "do not forward message to Firebase"},
@ -37,6 +43,9 @@ Examples:
ntfy pub --at=8:30am delayed_topic Laterzz # Send message at 8:30am
ntfy pub -e phil@example.com alerts 'App is down!' # Also send email to phil@example.com
ntfy pub --click="https://reddit.com" redd 'New msg' # Opens Reddit when notification is clicked
ntfy pub --attach="http://some.tld/file.zip" files # Send ZIP archive from URL as attachment
ntfy pub --file=flower.jpg flowers 'Nice!' # Send image.jpg as attachment
cat flower.jpg | ntfy pub --file=- flowers 'Nice!' # Same as above, send image.jpg as attachment
ntfy trigger mywebhook # Sending without message, useful for webhooks
Please also check out the docs on publishing messages. Especially for the --tags and --delay options,
@ -59,6 +68,9 @@ func execPublish(c *cli.Context) error {
tags := c.String("tags")
delay := c.String("delay")
click := c.String("click")
attach := c.String("attach")
filename := c.String("filename")
file := c.String("file")
email := c.String("email")
noCache := c.Bool("no-cache")
noFirebase := c.Bool("no-firebase")
@ -82,7 +94,13 @@ func execPublish(c *cli.Context) error {
options = append(options, client.WithDelay(delay))
}
if click != "" {
options = append(options, client.WithClick(email))
options = append(options, client.WithClick(click))
}
if attach != "" {
options = append(options, client.WithAttach(attach))
}
if filename != "" {
options = append(options, client.WithFilename(filename))
}
if email != "" {
options = append(options, client.WithEmail(email))
@ -93,8 +111,30 @@ func execPublish(c *cli.Context) error {
if noFirebase {
options = append(options, client.WithNoFirebase())
}
var body io.Reader
if file == "" {
body = strings.NewReader(message)
} else {
if message != "" {
options = append(options, client.WithMessage(message))
}
if file == "-" {
if filename == "" {
options = append(options, client.WithFilename("stdin"))
}
body = c.App.Reader
} else {
if filename == "" {
options = append(options, client.WithFilename(filepath.Base(file)))
}
body, err = os.Open(file)
if err != nil {
return err
}
}
}
cl := client.New(conf)
m, err := cl.Publish(topic, message, options...)
m, err := cl.PublishReader(topic, body, options...)
if err != nil {
return err
}

View file

@ -2,11 +2,13 @@ package cmd
import (
"errors"
"fmt"
"github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
"heckel.io/ntfy/server"
"heckel.io/ntfy/util"
"log"
"math"
"time"
)
@ -20,6 +22,10 @@ var flagsServe = []cli.Flag{
altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: server.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-cache-dir", EnvVars: []string{"NTFY_ATTACHMENT_CACHE_DIR"}, Usage: "cache directory for attached files"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-total-size-limit", Aliases: []string{"A"}, EnvVars: []string{"NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT"}, DefaultText: "5G", Usage: "limit of the on-disk attachment cache"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-file-size-limit", Aliases: []string{"Y"}, EnvVars: []string{"NTFY_ATTACHMENT_FILE_SIZE_LIMIT"}, DefaultText: "15M", Usage: "per-file attachment size limit (e.g. 300k, 2M, 100M)"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "attachment-expiry-duration", Aliases: []string{"X"}, EnvVars: []string{"NTFY_ATTACHMENT_EXPIRY_DURATION"}, Value: server.DefaultAttachmentExpiryDuration, DefaultText: "3h", Usage: "duration after which uploaded attachments will be deleted (e.g. 3h, 20h)"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "keepalive-interval", Aliases: []string{"k"}, EnvVars: []string{"NTFY_KEEPALIVE_INTERVAL"}, Value: server.DefaultKeepaliveInterval, Usage: "interval of keepalive messages"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "manager-interval", Aliases: []string{"m"}, EnvVars: []string{"NTFY_MANAGER_INTERVAL"}, Value: server.DefaultManagerInterval, Usage: "interval of for message pruning and stats printing"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "smtp-sender-addr", EnvVars: []string{"NTFY_SMTP_SENDER_ADDR"}, Usage: "SMTP server address (host:port) for outgoing emails"}),
@ -29,8 +35,10 @@ var flagsServe = []cli.Flag{
altsrc.NewStringFlag(&cli.StringFlag{Name: "smtp-server-listen", EnvVars: []string{"NTFY_SMTP_SERVER_LISTEN"}, Usage: "SMTP server address (ip:port) for incoming emails, e.g. :25"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "smtp-server-domain", EnvVars: []string{"NTFY_SMTP_SERVER_DOMAIN"}, Usage: "SMTP domain for incoming e-mail, e.g. ntfy.sh"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "smtp-server-addr-prefix", EnvVars: []string{"NTFY_SMTP_SERVER_ADDR_PREFIX"}, Usage: "SMTP email address prefix for topics to prevent spam (e.g. 'ntfy-')"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "global-topic-limit", Aliases: []string{"T"}, EnvVars: []string{"NTFY_GLOBAL_TOPIC_LIMIT"}, Value: server.DefaultGlobalTopicLimit, Usage: "total number of topics allowed"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "global-topic-limit", Aliases: []string{"T"}, EnvVars: []string{"NTFY_GLOBAL_TOPIC_LIMIT"}, Value: server.DefaultTotalTopicLimit, Usage: "total number of topics allowed"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-subscription-limit", EnvVars: []string{"NTFY_VISITOR_SUBSCRIPTION_LIMIT"}, Value: server.DefaultVisitorSubscriptionLimit, Usage: "number of subscriptions per visitor"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "visitor-attachment-total-size-limit", EnvVars: []string{"NTFY_VISITOR_ATTACHMENT_TOTAL_SIZE_LIMIT"}, Value: "100M", Usage: "total storage limit used for attachments per visitor"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "visitor-attachment-daily-bandwidth-limit", EnvVars: []string{"NTFY_VISITOR_ATTACHMENT_DAILY_BANDWIDTH_LIMIT"}, Value: "500M", Usage: "total daily attachment download/upload bandwidth limit per visitor"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-request-limit-burst", EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_BURST"}, Value: server.DefaultVisitorRequestLimitBurst, Usage: "initial limit of requests per visitor"}),
altsrc.NewDurationFlag(&cli.DurationFlag{Name: "visitor-request-limit-replenish", EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_REPLENISH"}, Value: server.DefaultVisitorRequestLimitReplenish, Usage: "interval at which burst limit is replenished (one per x)"}),
altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-email-limit-burst", EnvVars: []string{"NTFY_VISITOR_EMAIL_LIMIT_BURST"}, Value: server.DefaultVisitorEmailLimitBurst, Usage: "initial limit of e-mails per visitor"}),
@ -69,6 +77,10 @@ func execServe(c *cli.Context) error {
firebaseKeyFile := c.String("firebase-key-file")
cacheFile := c.String("cache-file")
cacheDuration := c.Duration("cache-duration")
attachmentCacheDir := c.String("attachment-cache-dir")
attachmentTotalSizeLimitStr := c.String("attachment-total-size-limit")
attachmentFileSizeLimitStr := c.String("attachment-file-size-limit")
attachmentExpiryDuration := c.Duration("attachment-expiry-duration")
keepaliveInterval := c.Duration("keepalive-interval")
managerInterval := c.Duration("manager-interval")
smtpSenderAddr := c.String("smtp-sender-addr")
@ -78,8 +90,10 @@ func execServe(c *cli.Context) error {
smtpServerListen := c.String("smtp-server-listen")
smtpServerDomain := c.String("smtp-server-domain")
smtpServerAddrPrefix := c.String("smtp-server-addr-prefix")
globalTopicLimit := c.Int("global-topic-limit")
totalTopicLimit := c.Int("global-topic-limit")
visitorSubscriptionLimit := c.Int("visitor-subscription-limit")
visitorAttachmentTotalSizeLimitStr := c.String("visitor-attachment-total-size-limit")
visitorAttachmentDailyBandwidthLimitStr := c.String("visitor-attachment-daily-bandwidth-limit")
visitorRequestLimitBurst := c.Int("visitor-request-limit-burst")
visitorRequestLimitReplenish := c.Duration("visitor-request-limit-replenish")
visitorEmailLimitBurst := c.Int("visitor-email-limit-burst")
@ -105,6 +119,28 @@ func execServe(c *cli.Context) error {
return errors.New("if smtp-sender-addr is set, base-url, smtp-sender-user, smtp-sender-pass and smtp-sender-from must also be set")
} else if smtpServerListen != "" && smtpServerDomain == "" {
return errors.New("if smtp-server-listen is set, smtp-server-domain must also be set")
} else if attachmentCacheDir != "" && baseURL == "" {
return errors.New("if attachment-cache-dir is set, base-url must also be set")
}
// Convert sizes to bytes
attachmentTotalSizeLimit, err := parseSize(attachmentTotalSizeLimitStr, server.DefaultAttachmentTotalSizeLimit)
if err != nil {
return err
}
attachmentFileSizeLimit, err := parseSize(attachmentFileSizeLimitStr, server.DefaultAttachmentFileSizeLimit)
if err != nil {
return err
}
visitorAttachmentTotalSizeLimit, err := parseSize(visitorAttachmentTotalSizeLimitStr, server.DefaultVisitorAttachmentTotalSizeLimit)
if err != nil {
return err
}
visitorAttachmentDailyBandwidthLimit, err := parseSize(visitorAttachmentDailyBandwidthLimitStr, server.DefaultVisitorAttachmentDailyBandwidthLimit)
if err != nil {
return err
} else if visitorAttachmentDailyBandwidthLimit > math.MaxInt {
return fmt.Errorf("config option visitor-attachment-daily-bandwidth-limit must be lower than %d", math.MaxInt)
}
// Run server
@ -117,6 +153,10 @@ func execServe(c *cli.Context) error {
conf.FirebaseKeyFile = firebaseKeyFile
conf.CacheFile = cacheFile
conf.CacheDuration = cacheDuration
conf.AttachmentCacheDir = attachmentCacheDir
conf.AttachmentTotalSizeLimit = attachmentTotalSizeLimit
conf.AttachmentFileSizeLimit = attachmentFileSizeLimit
conf.AttachmentExpiryDuration = attachmentExpiryDuration
conf.KeepaliveInterval = keepaliveInterval
conf.ManagerInterval = managerInterval
conf.SMTPSenderAddr = smtpSenderAddr
@ -126,8 +166,10 @@ func execServe(c *cli.Context) error {
conf.SMTPServerListen = smtpServerListen
conf.SMTPServerDomain = smtpServerDomain
conf.SMTPServerAddrPrefix = smtpServerAddrPrefix
conf.GlobalTopicLimit = globalTopicLimit
conf.TotalTopicLimit = totalTopicLimit
conf.VisitorSubscriptionLimit = visitorSubscriptionLimit
conf.VisitorAttachmentTotalSizeLimit = visitorAttachmentTotalSizeLimit
conf.VisitorAttachmentDailyBandwidthLimit = int(visitorAttachmentDailyBandwidthLimit)
conf.VisitorRequestLimitBurst = visitorRequestLimitBurst
conf.VisitorRequestLimitReplenish = visitorRequestLimitReplenish
conf.VisitorEmailLimitBurst = visitorEmailLimitBurst
@ -143,3 +185,14 @@ func execServe(c *cli.Context) error {
log.Printf("Exiting.")
return nil
}
func parseSize(s string, defaultValue int64) (v int64, err error) {
if s == "" {
return defaultValue, nil
}
v, err = util.ParseSize(s)
if err != nil {
return 0, err
}
return v, nil
}

View file

@ -35,6 +35,43 @@ the message to the subscribers.
Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#poll-for-messages), as well as the
[`since=` parameter](subscribe/api.md#fetch-cached-messages).
## Attachments
If desired, you may allow users to upload and [attach files to notifications](publish.md#attachments). To enable
this feature, you have to simply configure an attachment cache directory and a base URL (`attachment-cache-dir`, `base-url`).
Once these options are set and the directory is writable by the server user, you can upload attachments via PUT.
By default, attachments are stored in the disk-cache **for only 3 hours**. The main reason for this is to avoid legal issues
and such when hosting user controlled content. Typically, this is more than enough time for the user (or the auto download
feature) to download the file. The following config options are relevant to attachments:
* `base-url` is the root URL for the ntfy server; this is needed for the generated attachment URLs
* `attachment-cache-dir` is the cache directory for attached files
* `attachment-total-size-limit` is the size limit of the on-disk attachment cache (default: 5G)
* `attachment-file-size-limit` is the per-file attachment size limit (e.g. 300k, 2M, 100M, default: 15M)
* `attachment-expiry-duration` is the duration after which uploaded attachments will be deleted (e.g. 3h, 20h, default: 3h)
Here's an example config using mostly the defaults (except for the cache directory, which is empty by default):
=== "/etc/ntfy/server.yml (minimal)"
``` yaml
base-url: "https://ntfy.sh"
attachment-cache-dir: "/var/cache/ntfy/attachments"
```
=== "/etc/ntfy/server.yml (all options)"
``` yaml
base-url: "https://ntfy.sh"
attachment-cache-dir: "/var/cache/ntfy/attachments"
attachment-total-size-limit: "5G"
attachment-file-size-limit: "15M"
attachment-expiry-duration: "3h"
visitor-attachment-total-size-limit: "100M"
visitor-attachment-daily-bandwidth-limit: "500M"
```
Please also refer to the [rate limiting](#rate-limiting) settings below, specifically `visitor-attachment-total-size-limit`
and `visitor-attachment-daily-bandwidth-limit`. Setting these conservatively is necessary to avoid abuse.
## E-mail notifications
To allow forwarding messages via e-mail, you can configure an **SMTP server for outgoing messages**. Once configured,
you can set the `X-Email` header to [send messages via e-mail](publish.md#e-mail-notifications) (e.g.
@ -124,7 +161,7 @@ which lets you use [AWS Route 53](https://aws.amazon.com/route53/) as the challe
HTTP challenge. I've found [this guide](https://nandovieira.com/using-lets-encrypt-in-development-with-nginx-and-aws-route53) to
be incredibly helpful.
### nginx/Apache2
### nginx/Apache2/caddy
For your convenience, here's a working config that'll help configure things behind a proxy. In this
example, ntfy runs on `:2586` and we proxy traffic to it. We also redirect HTTP to HTTPS for GET requests against a topic
or the root domain:
@ -153,6 +190,7 @@ or the root domain:
proxy_http_version 1.1;
proxy_buffering off;
proxy_request_buffering off;
proxy_redirect off;
proxy_set_header Host $http_host;
@ -161,6 +199,8 @@ or the root domain:
proxy_connect_timeout 3m;
proxy_send_timeout 3m;
proxy_read_timeout 3m;
client_max_body_size 20m; # Must be >= attachment-file-size-limit in /etc/ntfy/server.yml
}
}
@ -181,6 +221,7 @@ or the root domain:
proxy_http_version 1.1;
proxy_buffering off;
proxy_request_buffering off;
proxy_redirect off;
proxy_set_header Host $http_host;
@ -189,6 +230,8 @@ or the root domain:
proxy_connect_timeout 3m;
proxy_send_timeout 3m;
proxy_read_timeout 3m;
client_max_body_size 20m; # Must be >= attachment-file-size-limit in /etc/ntfy/server.yml
}
}
```
@ -239,6 +282,19 @@ or the root domain:
</VirtualHost>
```
=== "caddy"
```
# Note that this config is most certainly incomplete. Please help out and let me know what's missing
# via Discord/Matrix or in a GitHub issue.
ntfy.sh {
reverse_proxy 127.0.0.1:2586
}
http://nfty.sh {
reverse_proxy 127.0.0.1:2586
}
```
## Firebase (FCM)
!!! info
Using Firebase is **optional** and only works if you modify and [build your own Android .apk](develop.md#android-app).
@ -272,14 +328,23 @@ firebase-key-file: "/etc/ntfy/ntfy-sh-firebase-adminsdk-ahnce-9f4d6f14b5.json"
Otherwise, all visitors are rate limited as if they are one.
By default, ntfy runs without authentication, so it is vitally important that we protect the server from abuse or overload.
There are various limits and rate limits in place that you can use to configure the server. Let's do the easy ones first:
There are various limits and rate limits in place that you can use to configure the server:
* `global-topic-limit` defines the total number of topics before the server rejects new topics. It defaults to 5000.
* **Global limit**: A global limit applies across all visitors (IPs, clients, users)
* **Visitor limit**: A visitor limit only applies to a certain visitor. A **visitor** is identified by its IP address
(or the `X-Forwarded-For` header if `behind-proxy` is set). All config options that start with the word `visitor` apply
only on a per-visitor basis.
During normal usage, you shouldn't encounter these limits at all, and even if you burst a few requests or emails
(e.g. when you reconnect after a connection drop), it shouldn't have any effect.
### General limits
Let's do the easy limits first:
* `global-topic-limit` defines the total number of topics before the server rejects new topics. It defaults to 15,000.
* `visitor-subscription-limit` is the number of subscriptions (open connections) per visitor. This value defaults to 30.
A **visitor** is identified by its IP address (or the `X-Forwarded-For` header if `behind-proxy` is set). All config
options that start with the word `visitor` apply only on a per-visitor basis.
### Request limits
In addition to the limits above, there is a requests/second limit per visitor for all sensitive GET/PUT/POST requests.
This limit uses a [token bucket](https://en.wikipedia.org/wiki/Token_bucket) (using Go's [rate package](https://pkg.go.dev/golang.org/x/time/rate)):
@ -290,15 +355,24 @@ request every 10s (defined by `visitor-request-limit-replenish`)
* `visitor-request-limit-burst` is the initial bucket of requests each visitor has. This defaults to 60.
* `visitor-request-limit-replenish` is the rate at which the bucket is refilled (one request per x). Defaults to 10s.
### Attachment limits
Aside from the global file size and total attachment cache limits (see [above](#attachments)), there are two relevant
per-visitor limits:
* `visitor-attachment-total-size-limit` is the total storage limit used for attachments per visitor. It defaults to 100M.
The per-visitor storage is automatically decreased as attachments expire. External attachments (attached via `X-Attach`,
see [publishing docs](publish.md#attachments)) do not count here.
* `visitor-attachment-daily-bandwidth-limit` is the total daily attachment download/upload bandwidth limit per visitor,
including PUT and GET requests. This is to protect your precious bandwidth from abuse, since egress costs money in
most cloud providers. This defaults to 500M.
### E-mail limits
Similarly to the request limit, there is also an e-mail limit (only relevant if [e-mail notifications](#e-mail-notifications)
are enabled):
* `visitor-email-limit-burst` is the initial bucket of emails each visitor has. This defaults to 16.
* `visitor-email-limit-replenish` is the rate at which the bucket is refilled (one email per x). Defaults to 1h.
During normal usage, you shouldn't encounter these limits at all, and even if you burst a few requests or emails
(e.g. when you reconnect after a connection drop), it shouldn't have any effect.
## Tuning for scale
If you're running ntfy for your home server, you probably don't need to worry about scale at all. In its default config,
if it's not behind a proxy, the ntfy server can keep about **as many connections as the open file limit allows**.
@ -309,7 +383,7 @@ Depending on *how you run it*, here are a few limits that are relevant:
### For systemd services
If you're running ntfy in a systemd service (e.g. for .deb/.rpm packages), the main limiting factor is the
`LimitNOFILE` setting in the systemd unit. The default open files limit for `ntfy.service` is 10000. You can override it
`LimitNOFILE` setting in the systemd unit. The default open files limit for `ntfy.service` is 10,000. You can override it
by creating a `/etc/systemd/system/ntfy.service.d/override.conf` file. As far as I can tell, `/etc/security/limits.conf`
is not relevant.
@ -322,7 +396,7 @@ is not relevant.
### Outside of systemd
If you're running outside systemd, you may want to adjust your `/etc/security/limits.conf` file to
increase the `nofile` setting. Here's an example that increases the limit to 5000. You can find out the current setting
increase the `nofile` setting. Here's an example that increases the limit to 5,000. You can find out the current setting
by running `ulimit -n`, or manually override it temporarily by running `ulimit -n 50000`.
=== "/etc/security/limits.conf"
@ -404,7 +478,7 @@ CLI option (e.g. `--listen-http :80`. Here's a list of all available options. Al
variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
| Config option | Env variable | Format | Default | Description |
|---|---|---|---|---|
|--------------------------------------------|-------------------------------------------------|------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `base-url` | `NTFY_BASE_URL` | *URL* | - | Public facing base URL of the service (e.g. `https://ntfy.sh`) |
| `listen-http` | `NTFY_LISTEN_HTTP` | `[host]:port` | `:80` | Listen address for the HTTP web server |
| `listen-https` | `NTFY_LISTEN_HTTPS` | `[host]:port` | - | Listen address for the HTTPS web server. If set, you also need to set `key-file` and `cert-file`. |
@ -414,6 +488,10 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
| `cache-file` | `NTFY_CACHE_FILE` | *filename* | - | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache). |
| `cache-duration` | `NTFY_CACHE_DURATION` | *duration* | 12h | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely. |
| `behind-proxy` | `NTFY_BEHIND_PROXY` | *bool* | false | If set, the X-Forwarded-For header is used to determine the visitor IP address instead of the remote address of the connection. |
| `attachment-cache-dir` | `NTFY_ATTACHMENT_CACHE_DIR` | *directory* | - | Cache directory for attached files. To enable attachments, this has to be set. |
| `attachment-total-size-limit` | `NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT` | *size* | 5G | Limit of the on-disk attachment cache directory. If the limits is exceeded, new attachments will be rejected. |
| `attachment-file-size-limit` | `NTFY_ATTACHMENT_FILE_SIZE_LIMIT` | *size* | 15M | Per-file attachment size limit (e.g. 300k, 2M, 100M). Larger attachment will be rejected. |
| `attachment-expiry-duration` | `NTFY_ATTACHMENT_EXPIRY_DURATION` | *duration* | 3h | Duration after which uploaded attachments will be deleted (e.g. 3h, 20h). Strongly affects `visitor-attachment-total-size-limit`. |
| `smtp-sender-addr` | `NTFY_SMTP_SENDER_ADDR` | `host:port` | - | SMTP server address to allow email sending |
| `smtp-sender-user` | `NTFY_SMTP_SENDER_USER` | *string* | - | SMTP user; only used if e-mail sending is enabled |
| `smtp-sender-pass` | `NTFY_SMTP_SENDER_PASS` | *string* | - | SMTP password; only used if e-mail sending is enabled |
@ -423,14 +501,17 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
| `smtp-server-addr-prefix` | `NTFY_SMTP_SERVER_ADDR_PREFIX` | `[ip]:port` | - | Optional prefix for the e-mail addresses to prevent spam, e.g. `ntfy-` |
| `keepalive-interval` | `NTFY_KEEPALIVE_INTERVAL` | *duration* | 55s | Interval in which keepalive messages are sent to the client. This is to prevent intermediaries closing the connection for inactivity. Note that the Android app has a hardcoded timeout at 77s, so it should be less than that. |
| `manager-interval` | `$NTFY_MANAGER_INTERVAL` | *duration* | 1m | Interval in which the manager prunes old messages, deletes topics and prints the stats. |
| `global-topic-limit` | `NTFY_GLOBAL_TOPIC_LIMIT` | *number* | 5000 | Rate limiting: Total number of topics before the server rejects new topics. |
| `global-topic-limit` | `NTFY_GLOBAL_TOPIC_LIMIT` | *number* | 15,000 | Rate limiting: Total number of topics before the server rejects new topics. |
| `visitor-subscription-limit` | `NTFY_VISITOR_SUBSCRIPTION_LIMIT` | *number* | 30 | Rate limiting: Number of subscriptions per visitor (IP address) |
| `visitor-request-limit-burst` | `NTFY_VISITOR_REQUEST_LIMIT_BURST` | *number* | 60 | Allowed GET/PUT/POST requests per second, per visitor. This setting is the initial bucket of requests each visitor has |
| `visitor-request-limit-replenish` | `NTFY_VISITOR_REQUEST_LIMIT_REPLENISH` | *duration* | 10s | Strongly related to `visitor-request-limit-burst`: The rate at which the bucket is refilled |
| `visitor-email-limit-burst` | `NTFY_VISITOR_EMAIL_LIMIT_BURST` | *number* | 16 |Initial limit of e-mails per visitor |
| `visitor-email-limit-replenish` | `NTFY_VISITOR_EMAIL_LIMIT_REPLENISH` | *duration* | 1h | Strongly related to `visitor-email-limit-burst`: The rate at which the bucket is refilled |
| `visitor-attachment-total-size-limit` | `NTFY_VISITOR_ATTACHMENT_TOTAL_SIZE_LIMIT` | *size* | 100M | Rate limiting: Total storage limit used for attachments per visitor, for all attachments combined. Storage is freed after attachments expire. See `attachment-expiry-duration`. |
| `visitor-attachment-daily-bandwidth-limit` | `NTFY_VISITOR_ATTACHMENT_DAILY_BANDWIDTH_LIMIT` | *size* | 500M | Rate limiting: Total daily attachment download/upload traffic limit per visitor. This is to protect your bandwidth costs from exploding. |
| `visitor-request-limit-burst` | `NTFY_VISITOR_REQUEST_LIMIT_BURST` | *number* | 60 | Rate limiting: Allowed GET/PUT/POST requests per second, per visitor. This setting is the initial bucket of requests each visitor has |
| `visitor-request-limit-replenish` | `NTFY_VISITOR_REQUEST_LIMIT_REPLENISH` | *duration* | 10s | Rate limiting: Strongly related to `visitor-request-limit-burst`: The rate at which the bucket is refilled |
| `visitor-email-limit-burst` | `NTFY_VISITOR_EMAIL_LIMIT_BURST` | *number* | 16 | Rate limiting:Initial limit of e-mails per visitor |
| `visitor-email-limit-replenish` | `NTFY_VISITOR_EMAIL_LIMIT_REPLENISH` | *duration* | 1h | Rate limiting: Strongly related to `visitor-email-limit-burst`: The rate at which the bucket is refilled |
The format for a *duration* is: `<number>(smh)`, e.g. 30s, 20m or 1h.
The format for a *size* is: `<number>(GMK)`, e.g. 1G, 200M or 4000k.
## Command line options
```
@ -461,6 +542,10 @@ OPTIONS:
--firebase-key-file value, -F value Firebase credentials file; if set additionally publish to FCM topic [$NTFY_FIREBASE_KEY_FILE]
--cache-file value, -C value cache file used for message caching [$NTFY_CACHE_FILE]
--cache-duration since, -b since buffer messages for this time to allow since requests (default: 12h0m0s) [$NTFY_CACHE_DURATION]
--attachment-cache-dir value cache directory for attached files [$NTFY_ATTACHMENT_CACHE_DIR]
--attachment-total-size-limit value, -A value limit of the on-disk attachment cache (default: 5G) [$NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT]
--attachment-file-size-limit value, -Y value per-file attachment size limit (e.g. 300k, 2M, 100M) (default: 15M) [$NTFY_ATTACHMENT_FILE_SIZE_LIMIT]
--attachment-expiry-duration value, -X value duration after which uploaded attachments will be deleted (e.g. 3h, 20h) (default: 3h) [$NTFY_ATTACHMENT_EXPIRY_DURATION]
--keepalive-interval value, -k value interval of keepalive messages (default: 55s) [$NTFY_KEEPALIVE_INTERVAL]
--manager-interval value, -m value interval of for message pruning and stats printing (default: 1m0s) [$NTFY_MANAGER_INTERVAL]
--smtp-sender-addr value SMTP server address (host:port) for outgoing emails [$NTFY_SMTP_SENDER_ADDR]
@ -470,8 +555,10 @@ OPTIONS:
--smtp-server-listen value SMTP server address (ip:port) for incoming emails, e.g. :25 [$NTFY_SMTP_SERVER_LISTEN]
--smtp-server-domain value SMTP domain for incoming e-mail, e.g. ntfy.sh [$NTFY_SMTP_SERVER_DOMAIN]
--smtp-server-addr-prefix value SMTP email address prefix for topics to prevent spam (e.g. 'ntfy-') [$NTFY_SMTP_SERVER_ADDR_PREFIX]
--global-topic-limit value, -T value total number of topics allowed (default: 5000) [$NTFY_GLOBAL_TOPIC_LIMIT]
--global-topic-limit value, -T value total number of topics allowed (default: 15000) [$NTFY_GLOBAL_TOPIC_LIMIT]
--visitor-subscription-limit value number of subscriptions per visitor (default: 30) [$NTFY_VISITOR_SUBSCRIPTION_LIMIT]
--visitor-attachment-total-size-limit value total storage limit used for attachments per visitor (default: "100M") [$NTFY_VISITOR_ATTACHMENT_TOTAL_SIZE_LIMIT]
--visitor-attachment-daily-bandwidth-limit value total daily attachment download/upload bandwidth limit per visitor (default: "500M") [$NTFY_VISITOR_ATTACHMENT_DAILY_BANDWIDTH_LIMIT]
--visitor-request-limit-burst value initial limit of requests per visitor (default: 60) [$NTFY_VISITOR_REQUEST_LIMIT_BURST]
--visitor-request-limit-replenish value interval at which burst limit is replenished (one per x) (default: 10s) [$NTFY_VISITOR_REQUEST_LIMIT_REPLENISH]
--visitor-email-limit-burst value initial limit of e-mails per visitor (default: 16) [$NTFY_VISITOR_EMAIL_LIMIT_BURST]

View file

@ -659,6 +659,166 @@ Here's an example that will open Reddit when the notification is clicked:
]));
```
## Attachments
You can **send images and other files to your phone** as attachments to a notification. The attachments are then downloaded
onto your phone (depending on size and setting automatically), and can be used from the Downloads folder.
There are two different ways to send attachments:
* sending [a local file](#attach-local-file) via PUT, e.g. from `~/Flowers/flower.jpg` or `ringtone.mp3`
* or by [passing an external URL](#attach-file-from-a-url) as an attachment, e.g. `https://f-droid.org/F-Droid.apk`
### Attach local file
To **send a file from your computer** as an attachment, you can send it as the PUT request body. If a message is greater
than the maximum message size (4,096 bytes) or consists of non UTF-8 characters, the ntfy server will automatically
detect the mime type and size, and send the message as an attachment file. To send smaller text-only messages or files
as attachments, you must pass a filename by passing the `X-Filename` header or query parameter (or any of its aliases
`Filename`, `File` or `f`).
By default, and how ntfy.sh is configured, the **max attachment size is 15 MB** (with 100 MB total per visitor).
Attachments **expire after 3 hours**, which typically is plenty of time for the user to download it, or for the Android app
to auto-download it. Please also check out the [other limits below](#limitations).
Here's an example showing how to upload an image:
=== "Command line (curl)"
```
curl \
-T flower.jpg \
-H "Filename: flower.jpg" \
ntfy.sh/flowers
```
=== "ntfy CLI"
```
ntfy publish \
--file=flower.jpg \
flowers
```
=== "HTTP"
``` http
PUT /flowers HTTP/1.1
Host: ntfy.sh
Filename: flower.jpg
Content-Type: 52312
<binary JPEG data>
```
=== "JavaScript"
``` javascript
fetch('https://ntfy.sh/flowers', {
method: 'PUT',
body: document.getElementById("file").files[0],
headers: { 'Filename': 'flower.jpg' }
})
```
=== "Go"
``` go
file, _ := os.Open("flower.jpg")
req, _ := http.NewRequest("PUT", "https://ntfy.sh/flowers", file)
req.Header.Set("Filename", "flower.jpg")
http.DefaultClient.Do(req)
```
=== "Python"
``` python
requests.put("https://ntfy.sh/flowers",
data=open("flower.jpg", 'rb'),
headers={ "Filename": "flower.jpg" })
```
=== "PHP"
``` php-inline
file_get_contents('https://ntfy.sh/flowers', false, stream_context_create([
'http' => [
'method' => 'PUT',
'header' =>
"Content-Type: application/octet-stream\r\n" . // Does not matter
"Filename: flower.jpg",
'content' => file_get_contents('flower.jpg') // Dangerous for large files
]
]));
```
Here's what that looks like on Android:
<figure markdown>
![image attachment](static/img/android-screenshot-attachment-image.png){ width=500 }
<figcaption>Image attachment sent from a local file</figcaption>
</figure>
### Attach file from a URL
Instead of sending a local file to your phone, you can use **an external URL** to specify where the attachment is hosted.
This could be a Google Drive or Dropbox link, or any other publicly available URL. The ntfy server will briefly probe
the URL to retrieve type and size for you. Since the files are externally hosted, the expiration or size limits from
above do not apply here.
To attach an external file, simple pass the `X-Attach` header or query parameter (or any of its aliases `Attach` or `a`)
to specify the attachment URL. It can be any type of file. Here's an example showing how to upload an image:
=== "Command line (curl)"
```
curl \
-X POST \
-H "Attach: https://f-droid.org/F-Droid.apk" \
ntfy.sh/mydownloads
```
=== "ntfy CLI"
```
ntfy publish \
--attach="https://f-droid.org/F-Droid.apk" \
mydownloads
```
=== "HTTP"
``` http
POST /mydownloads HTTP/1.1
Host: ntfy.sh
Attach: https://f-droid.org/F-Droid.apk
```
=== "JavaScript"
``` javascript
fetch('https://ntfy.sh/mydownloads', {
method: 'POST',
headers: { 'Attach': 'https://f-droid.org/F-Droid.apk' }
})
```
=== "Go"
``` go
req, _ := http.NewRequest("POST", "https://ntfy.sh/mydownloads", file)
req.Header.Set("Attach", "https://f-droid.org/F-Droid.apk")
http.DefaultClient.Do(req)
```
=== "Python"
``` python
requests.put("https://ntfy.sh/mydownloads",
headers={ "Attach": "https://f-droid.org/F-Droid.apk" })
```
=== "PHP"
``` php-inline
file_get_contents('https://ntfy.sh/mydownloads', false, stream_context_create([
'http' => [
'method' => 'PUT',
'header' =>
"Content-Type: text/plain\r\n" . // Does not matter
"Attach: https://f-droid.org/F-Droid.apk",
]
]));
```
<figure markdown>
![file attachment](static/img/android-screenshot-attachment-file.png){ width=500 }
<figcaption>File attachment sent from an external URL</figcaption>
</figure>
## E-mail notifications
You can forward messages to e-mail by specifying an address in the header. This can be useful for messages that
you'd like to persist longer, or to blast-notify yourself on all possible channels.
@ -927,17 +1087,33 @@ to `no`. This will instruct the server not to forward messages to Firebase.
]));
```
### UnifiedPush
!!! info
This setting is not relevant to users, only to app developers and people interested in [UnifiedPush](https://unifiedpush.org).
[UnifiedPush](https://unifiedpush.org) is a standard for receiving push notifications without using the Google-owned
[Firebase Cloud Messaging (FCM)](https://firebase.google.com/docs/cloud-messaging) service. It puts push notifications
in the control of the user. ntfy can act as a **UnifiedPush distributor**, forwarding messages to apps that support it.
When publishing messages to a topic, apps using ntfy as a UnifiedPush distributor can set the `X-UnifiedPush` header or query
parameter (or any of its aliases `unifiedpush` or `up`) to `1` to [disable Firebase](#disable-firebase). As of today, this
option is equivalent to `Firebase: no`, but was introduced to allow future flexibility.
## Limitations
There are a few limitations to the API to prevent abuse and to keep the server healthy. Most of them you won't run into,
There are a few limitations to the API to prevent abuse and to keep the server healthy. Almost all of these settings
are configurable via the server side [rate limiting settings](config.md#rate-limiting). Most of these limits you won't run into,
but just in case, let's list them all:
| Limit | Description |
|---|---|
| **Message length** | Each message can be up to 4096 bytes long. Longer messages are truncated. |
| **Requests** | By default, the server is configured to allow 60 requests at once, and then refills the your allowed requests bucket at a rate of one request per 10 seconds. You can read more about this in the [rate limiting](config.md#rate-limiting) section. |
| **E-mails** | By default, the server is configured to allow sending 16 e-mails at once, and then refills the your allowed e-mail bucket at a rate of one per hour. You can read more about this in the [rate limiting](config.md#rate-limiting) section. |
| **Subscription limits** | By default, the server allows each visitor to keep 30 connections to the server open. |
| **Total number of topics** | By default, the server is configured to allow 5,000 topics. The ntfy.sh server has higher limits though. |
|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Message length** | Each message can be up to 4,096 bytes long. Longer messages are treated as [attachments](#attachments). |
| **Requests** | By default, the server is configured to allow 60 requests per visitor at once, and then refills the your allowed requests bucket at a rate of one request per 10 seconds. |
| **E-mails** | By default, the server is configured to allow sending 16 e-mails per visitor at once, and then refills the your allowed e-mail bucket at a rate of one per hour. |
| **Subscription limit** | By default, the server allows each visitor to keep 30 connections to the server open. |
| **Attachment size limit** | By default, the server allows attachments up to 15 MB in size, up to 100 MB in total per visitor and up to 5 GB across all visitors. |
| **Attachment expiry** | By default, the server deletes attachments after 3 hours and thereby frees up space from the total visitor attachment limit. |
| **Attachment bandwidth** | By default, the server allows 500 MB of GET/PUT/POST traffic for attachments per visitor in a 24 hour period. Traffic exceeding that is rejected. |
| **Total number of topics** | By default, the server is configured to allow 15,000 topics. The ntfy.sh server has higher limits though. |
## List of all parameters
The following is a list of all parameters that can be passed when publishing a message. Parameter names are **case-insensitive**,
@ -951,6 +1127,9 @@ and can be passed as **HTTP headers** or **query parameters in the URL**. They a
| `X-Tags` | `Tags`, `Tag`, `ta` | [Tags and emojis](#tags-emojis) |
| `X-Delay` | `Delay`, `X-At`, `At`, `X-In`, `In` | Timestamp or duration for [delayed delivery](#scheduled-delivery) |
| `X-Click` | `Click` | URL to open when [notification is clicked](#click-action) |
| `X-Attach` | `Attach`, `a` | URL to send as an [attachment](#attachments), as an alternative to PUT/POST-ing an attachment |
| `X-Filename` | `Filename`, `file`, `f` | Optional [attachment](#attachments) filename, as it appears in the client |
| `X-Email` | `X-E-Mail`, `Email`, `E-Mail`, `mail`, `e` | E-mail address for [e-mail notifications](#e-mail-notifications) |
| `X-Cache` | `Cache` | Allows disabling [message caching](#message-caching) |
| `X-Firebase` | `Firebase` | Allows disabling [sending to Firebase](#disable-firebase) |
| `X-UnifiedPush` | `UnifiedPush`, `up` | [UnifiedPush](#unifiedpush) publish option, currently equivalent to `Firebase: no` |

Binary file not shown.

After

Width:  |  Height:  |  Size: 52 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 156 KiB

1
go.mod
View file

@ -30,6 +30,7 @@ require (
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 // indirect
github.com/envoyproxy/go-control-plane v0.10.1 // indirect
github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect
github.com/gabriel-vasile/mimetype v1.4.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.6 // indirect

3
go.sum
View file

@ -106,6 +106,8 @@ github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPO
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.6.2 h1:JiO+kJTpmYGjEodY7O1Zk8oZcNz1+f30UtwtXoFUPzE=
github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws=
github.com/gabriel-vasile/mimetype v1.4.0 h1:Cn9dkdYsMIu56tGho+fqzh7XmvY2YyGU0FnbhiOsEro=
github.com/gabriel-vasile/mimetype v1.4.0/go.mod h1:fA8fi6KUiG7MgQQ+mEWotXoEOvmxRtOJlERCzSmRvr8=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@ -323,6 +325,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210505024714-0287a6fb4125/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=

View file

@ -4,8 +4,6 @@ set -e
# Restart systemd service if it was already running. Note that "deb-systemd-invoke try-restart" will
# only act if the service is already running. If it's not running, it's a no-op.
#
# TODO: This is only tested on Debian.
#
if [ "$1" = "configure" ] || [ "$1" -ge 1 ]; then
if [ -d /run/systemd/system ]; then
# Create ntfy user/group

View file

@ -20,4 +20,6 @@ type cache interface {
Topics() (map[string]*topic, error)
Prune(olderThan time.Time) error
MarkPublished(m *message) error
AttachmentsSize(owner string) (int64, error)
AttachmentsExpired() ([]string, error)
}

View file

@ -125,6 +125,35 @@ func (c *memCache) Prune(olderThan time.Time) error {
return nil
}
func (c *memCache) AttachmentsSize(owner string) (int64, error) {
c.mu.Lock()
defer c.mu.Unlock()
var size int64
for topic := range c.messages {
for _, m := range c.messages[topic] {
counted := m.Attachment != nil && m.Attachment.Owner == owner && m.Attachment.Expires > time.Now().Unix()
if counted {
size += m.Attachment.Size
}
}
}
return size, nil
}
func (c *memCache) AttachmentsExpired() ([]string, error) {
c.mu.Lock()
defer c.mu.Unlock()
ids := make([]string, 0)
for topic := range c.messages {
for _, m := range c.messages[topic] {
if m.Attachment != nil && m.Attachment.Expires > 0 && m.Attachment.Expires < time.Now().Unix() {
ids = append(ids, m.ID)
}
}
}
return ids, nil
}
func (c *memCache) pruneTopic(topic string, olderThan time.Time) {
messages := make([]*message, 0)
for _, m := range c.messages[topic] {

View file

@ -25,6 +25,10 @@ func TestMemCache_Prune(t *testing.T) {
testCachePrune(t, newMemCache())
}
func TestMemCache_Attachments(t *testing.T) {
testCacheAttachments(t, newMemCache())
}
func TestMemCache_NopCache(t *testing.T) {
c := newNopCache()
assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))

View file

@ -23,27 +23,36 @@ const (
priority INT NOT NULL,
tags TEXT NOT NULL,
click TEXT NOT NULL,
attachment_name TEXT NOT NULL,
attachment_type TEXT NOT NULL,
attachment_size INT NOT NULL,
attachment_expires INT NOT NULL,
attachment_url TEXT NOT NULL,
attachment_owner TEXT NOT NULL,
published INT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`
insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, click, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
insertMessageQuery = `
INSERT INTO messages (id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, published)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
selectMessagesSinceTimeQuery = `
SELECT id, time, topic, message, title, priority, tags, click
SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner
FROM messages
WHERE topic = ? AND time >= ? AND published = 1
ORDER BY time ASC
`
selectMessagesSinceTimeIncludeScheduledQuery = `
SELECT id, time, topic, message, title, priority, tags, click
SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner
FROM messages
WHERE topic = ? AND time >= ?
ORDER BY time ASC
`
selectMessagesDueQuery = `
SELECT id, time, topic, message, title, priority, tags, click
SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner
FROM messages
WHERE time <= ? AND published = 0
`
@ -51,6 +60,8 @@ const (
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
selectAttachmentsSizeQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE attachment_owner = ? AND attachment_expires >= ?`
selectAttachmentsExpiredQuery = `SELECT id FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
)
// Schema management queries
@ -82,7 +93,15 @@ const (
// 2 -> 3
migrate2To3AlterMessagesTableQuery = `
BEGIN;
ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN attachment_size INT NOT NULL DEFAULT('0');
ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL DEFAULT('0');
ALTER TABLE messages ADD COLUMN attachment_owner TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL DEFAULT('');
COMMIT;
`
)
@ -110,7 +129,35 @@ func (c *sqliteCache) AddMessage(m *message) error {
return errUnexpectedMessageType
}
published := m.Time <= time.Now().Unix()
_, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), m.Click, published)
tags := strings.Join(m.Tags, ",")
var attachmentName, attachmentType, attachmentURL, attachmentOwner string
var attachmentSize, attachmentExpires int64
if m.Attachment != nil {
attachmentName = m.Attachment.Name
attachmentType = m.Attachment.Type
attachmentSize = m.Attachment.Size
attachmentExpires = m.Attachment.Expires
attachmentURL = m.Attachment.URL
attachmentOwner = m.Attachment.Owner
}
_, err := c.db.Exec(
insertMessageQuery,
m.ID,
m.Time,
m.Topic,
m.Message,
m.Title,
m.Priority,
tags,
m.Click,
attachmentName,
attachmentType,
attachmentSize,
attachmentExpires,
attachmentURL,
attachmentOwner,
published,
)
return err
}
@ -187,20 +234,69 @@ func (c *sqliteCache) Prune(olderThan time.Time) error {
return err
}
func (c *sqliteCache) AttachmentsSize(owner string) (int64, error) {
rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix())
if err != nil {
return 0, err
}
defer rows.Close()
var size int64
if !rows.Next() {
return 0, errors.New("no rows found")
}
if err := rows.Scan(&size); err != nil {
return 0, err
} else if err := rows.Err(); err != nil {
return 0, err
}
return size, nil
}
func (c *sqliteCache) AttachmentsExpired() ([]string, error) {
rows, err := c.db.Query(selectAttachmentsExpiredQuery, time.Now().Unix())
if err != nil {
return nil, err
}
defer rows.Close()
ids := make([]string, 0)
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
ids = append(ids, id)
}
if err := rows.Err(); err != nil {
return nil, err
}
return ids, nil
}
func readMessages(rows *sql.Rows) ([]*message, error) {
defer rows.Close()
messages := make([]*message, 0)
for rows.Next() {
var timestamp int64
var timestamp, attachmentSize, attachmentExpires int64
var priority int
var id, topic, msg, title, tagsStr, click string
if err := rows.Scan(&id, &timestamp, &topic, &msg, &title, &priority, &tagsStr, &click); err != nil {
var id, topic, msg, title, tagsStr, click, attachmentName, attachmentType, attachmentURL, attachmentOwner string
if err := rows.Scan(&id, &timestamp, &topic, &msg, &title, &priority, &tagsStr, &click, &attachmentName, &attachmentType, &attachmentSize, &attachmentExpires, &attachmentURL, &attachmentOwner); err != nil {
return nil, err
}
var tags []string
if tagsStr != "" {
tags = strings.Split(tagsStr, ",")
}
var att *attachment
if attachmentName != "" && attachmentURL != "" {
att = &attachment{
Name: attachmentName,
Type: attachmentType,
Size: attachmentSize,
Expires: attachmentExpires,
URL: attachmentURL,
Owner: attachmentOwner,
}
}
messages = append(messages, &message{
ID: id,
Time: timestamp,
@ -211,6 +307,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
Priority: priority,
Tags: tags,
Click: click,
Attachment: att,
})
}
if err := rows.Err(); err != nil {

View file

@ -29,6 +29,10 @@ func TestSqliteCache_Prune(t *testing.T) {
testCachePrune(t, newSqliteTestCache(t))
}
func TestSqliteCache_Attachments(t *testing.T) {
testCacheAttachments(t, newSqliteTestCache(t))
}
func TestSqliteCache_Migration_From0(t *testing.T) {
filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename)

View file

@ -1,7 +1,7 @@
package server
import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)
@ -13,71 +13,71 @@ func testCacheMessages(t *testing.T, c cache) {
m2 := newDefaultMessage("mytopic", "my other message")
m2.Time = 2
assert.Nil(t, c.AddMessage(m1))
assert.Nil(t, c.AddMessage(newDefaultMessage("example", "my example message")))
assert.Nil(t, c.AddMessage(m2))
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(newDefaultMessage("example", "my example message")))
require.Nil(t, c.AddMessage(m2))
// Adding invalid
assert.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
assert.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added!
// mytopic: count
count, err := c.MessageCount("mytopic")
assert.Nil(t, err)
assert.Equal(t, 2, count)
require.Nil(t, err)
require.Equal(t, 2, count)
// mytopic: since all
messages, _ := c.Messages("mytopic", sinceAllMessages, false)
assert.Equal(t, 2, len(messages))
assert.Equal(t, "my message", messages[0].Message)
assert.Equal(t, "mytopic", messages[0].Topic)
assert.Equal(t, messageEvent, messages[0].Event)
assert.Equal(t, "", messages[0].Title)
assert.Equal(t, 0, messages[0].Priority)
assert.Nil(t, messages[0].Tags)
assert.Equal(t, "my other message", messages[1].Message)
require.Equal(t, 2, len(messages))
require.Equal(t, "my message", messages[0].Message)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, messageEvent, messages[0].Event)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
require.Equal(t, "my other message", messages[1].Message)
// mytopic: since none
messages, _ = c.Messages("mytopic", sinceNoMessages, false)
assert.Empty(t, messages)
require.Empty(t, messages)
// mytopic: since 2
messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0)), false)
assert.Equal(t, 1, len(messages))
assert.Equal(t, "my other message", messages[0].Message)
require.Equal(t, 1, len(messages))
require.Equal(t, "my other message", messages[0].Message)
// example: count
count, err = c.MessageCount("example")
assert.Nil(t, err)
assert.Equal(t, 1, count)
require.Nil(t, err)
require.Equal(t, 1, count)
// example: since all
messages, _ = c.Messages("example", sinceAllMessages, false)
assert.Equal(t, "my example message", messages[0].Message)
require.Equal(t, "my example message", messages[0].Message)
// non-existing: count
count, err = c.MessageCount("doesnotexist")
assert.Nil(t, err)
assert.Equal(t, 0, count)
require.Nil(t, err)
require.Equal(t, 0, count)
// non-existing: since all
messages, _ = c.Messages("doesnotexist", sinceAllMessages, false)
assert.Empty(t, messages)
require.Empty(t, messages)
}
func testCacheTopics(t *testing.T, c cache) {
assert.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
assert.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
assert.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
assert.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 3")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 3")))
topics, err := c.Topics()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(topics))
assert.Equal(t, "topic1", topics["topic1"].ID)
assert.Equal(t, "topic2", topics["topic2"].ID)
require.Equal(t, 2, len(topics))
require.Equal(t, "topic1", topics["topic1"].ID)
require.Equal(t, "topic2", topics["topic2"].ID)
}
func testCachePrune(t *testing.T, c cache) {
@ -90,23 +90,23 @@ func testCachePrune(t *testing.T, c cache) {
m3 := newDefaultMessage("another_topic", "and another one")
m3.Time = 1
assert.Nil(t, c.AddMessage(m1))
assert.Nil(t, c.AddMessage(m2))
assert.Nil(t, c.AddMessage(m3))
assert.Nil(t, c.Prune(time.Unix(2, 0)))
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(m2))
require.Nil(t, c.AddMessage(m3))
require.Nil(t, c.Prune(time.Unix(2, 0)))
count, err := c.MessageCount("mytopic")
assert.Nil(t, err)
assert.Equal(t, 1, count)
require.Nil(t, err)
require.Equal(t, 1, count)
count, err = c.MessageCount("another_topic")
assert.Nil(t, err)
assert.Equal(t, 0, count)
require.Nil(t, err)
require.Equal(t, 0, count)
messages, err := c.Messages("mytopic", sinceAllMessages, false)
assert.Nil(t, err)
assert.Equal(t, 1, len(messages))
assert.Equal(t, "my other message", messages[0].Message)
require.Nil(t, err)
require.Equal(t, 1, len(messages))
require.Equal(t, "my other message", messages[0].Message)
}
func testCacheMessagesTagsPrioAndTitle(t *testing.T, c cache) {
@ -114,12 +114,12 @@ func testCacheMessagesTagsPrioAndTitle(t *testing.T, c cache) {
m.Tags = []string{"tag1", "tag2"}
m.Priority = 5
m.Title = "some title"
assert.Nil(t, c.AddMessage(m))
require.Nil(t, c.AddMessage(m))
messages, _ := c.Messages("mytopic", sinceAllMessages, false)
assert.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags)
assert.Equal(t, 5, messages[0].Priority)
assert.Equal(t, "some title", messages[0].Title)
require.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags)
require.Equal(t, 5, messages[0].Priority)
require.Equal(t, "some title", messages[0].Title)
}
func testCacheMessagesScheduled(t *testing.T, c cache) {
@ -130,20 +130,93 @@ func testCacheMessagesScheduled(t *testing.T, c cache) {
m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2!
m4 := newDefaultMessage("mytopic2", "message 4")
m4.Time = time.Now().Add(time.Minute).Unix()
assert.Nil(t, c.AddMessage(m1))
assert.Nil(t, c.AddMessage(m2))
assert.Nil(t, c.AddMessage(m3))
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(m2))
require.Nil(t, c.AddMessage(m3))
messages, _ := c.Messages("mytopic", sinceAllMessages, false) // exclude scheduled
assert.Equal(t, 1, len(messages))
assert.Equal(t, "message 1", messages[0].Message)
require.Equal(t, 1, len(messages))
require.Equal(t, "message 1", messages[0].Message)
messages, _ = c.Messages("mytopic", sinceAllMessages, true) // include scheduled
assert.Equal(t, 3, len(messages))
assert.Equal(t, "message 1", messages[0].Message)
assert.Equal(t, "message 3", messages[1].Message) // Order!
assert.Equal(t, "message 2", messages[2].Message)
require.Equal(t, 3, len(messages))
require.Equal(t, "message 1", messages[0].Message)
require.Equal(t, "message 3", messages[1].Message) // Order!
require.Equal(t, "message 2", messages[2].Message)
messages, _ = c.MessagesDue()
assert.Empty(t, messages)
require.Empty(t, messages)
}
func testCacheAttachments(t *testing.T, c cache) {
expires1 := time.Now().Add(-4 * time.Hour).Unix()
m := newDefaultMessage("mytopic", "flower for you")
m.ID = "m1"
m.Attachment = &attachment{
Name: "flower.jpg",
Type: "image/jpeg",
Size: 5000,
Expires: expires1,
URL: "https://ntfy.sh/file/AbDeFgJhal.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
expires2 := time.Now().Add(2 * time.Hour).Unix() // Future
m = newDefaultMessage("mytopic", "sending you a car")
m.ID = "m2"
m.Attachment = &attachment{
Name: "car.jpg",
Type: "image/jpeg",
Size: 10000,
Expires: expires2,
URL: "https://ntfy.sh/file/aCaRURL.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
expires3 := time.Now().Add(1 * time.Hour).Unix() // Future
m = newDefaultMessage("another-topic", "sending you another car")
m.ID = "m3"
m.Attachment = &attachment{
Name: "another-car.jpg",
Type: "image/jpeg",
Size: 20000,
Expires: expires3,
URL: "https://ntfy.sh/file/zakaDHFW.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 2, len(messages))
require.Equal(t, "flower for you", messages[0].Message)
require.Equal(t, "flower.jpg", messages[0].Attachment.Name)
require.Equal(t, "image/jpeg", messages[0].Attachment.Type)
require.Equal(t, int64(5000), messages[0].Attachment.Size)
require.Equal(t, expires1, messages[0].Attachment.Expires)
require.Equal(t, "https://ntfy.sh/file/AbDeFgJhal.jpg", messages[0].Attachment.URL)
require.Equal(t, "1.2.3.4", messages[0].Attachment.Owner)
require.Equal(t, "sending you a car", messages[1].Message)
require.Equal(t, "car.jpg", messages[1].Attachment.Name)
require.Equal(t, "image/jpeg", messages[1].Attachment.Type)
require.Equal(t, int64(10000), messages[1].Attachment.Size)
require.Equal(t, expires2, messages[1].Attachment.Expires)
require.Equal(t, "https://ntfy.sh/file/aCaRURL.jpg", messages[1].Attachment.URL)
require.Equal(t, "1.2.3.4", messages[1].Attachment.Owner)
size, err := c.AttachmentsSize("1.2.3.4")
require.Nil(t, err)
require.Equal(t, int64(30000), size)
size, err = c.AttachmentsSize("5.6.7.8")
require.Nil(t, err)
require.Equal(t, int64(0), size)
ids, err := c.AttachmentsExpired()
require.Nil(t, err)
require.Equal(t, []string{"m1"}, ids)
}

View file

@ -4,7 +4,7 @@ import (
"time"
)
// Defines default config settings
// Defines default config settings (excluding limits, see below)
const (
DefaultListenHTTP = ":80"
DefaultCacheDuration = 12 * time.Hour
@ -13,22 +13,35 @@ const (
DefaultAtSenderInterval = 10 * time.Second
DefaultMinDelay = 10 * time.Second
DefaultMaxDelay = 3 * 24 * time.Hour
DefaultMessageLimit = 4096
DefaultFirebaseKeepaliveInterval = 3 * time.Hour // Not too frequently to save battery
)
// Defines all the limits
// - global topic limit: max number of topics overall
// Defines all global and per-visitor limits
// - message size limit: the max number of bytes for a message
// - total topic limit: max number of topics overall
// - various attachment limits
const (
DefaultMessageLengthLimit = 4096 // Bytes
DefaultTotalTopicLimit = 15000
DefaultAttachmentTotalSizeLimit = int64(5 * 1024 * 1024 * 1024) // 5 GB
DefaultAttachmentFileSizeLimit = int64(15 * 1024 * 1024) // 15 MB
DefaultAttachmentExpiryDuration = 3 * time.Hour
)
// Defines all per-visitor limits
// - per visitor subscription limit: max number of subscriptions (active HTTP connections) per per-visitor/IP
// - per visitor request limit: max number of PUT/GET/.. requests (here: 60 requests bucket, replenished at a rate of one per 10 seconds)
// - per visitor email limit: max number of emails (here: 16 email bucket, replenished at a rate of one per hour)
// - per visitor subscription limit: max number of subscriptions (active HTTP connections) per per-visitor/IP
// - per visitor attachment size limit: total per-visitor attachment size in bytes to be stored on the server
// - per visitor attachment daily bandwidth limit: number of bytes that can be transferred to/from the server
const (
DefaultGlobalTopicLimit = 5000
DefaultVisitorSubscriptionLimit = 30
DefaultVisitorRequestLimitBurst = 60
DefaultVisitorRequestLimitReplenish = 10 * time.Second
DefaultVisitorEmailLimitBurst = 16
DefaultVisitorEmailLimitReplenish = time.Hour
DefaultVisitorSubscriptionLimit = 30
DefaultVisitorAttachmentTotalSizeLimit = 100 * 1024 * 1024 // 100 MB
DefaultVisitorAttachmentDailyBandwidthLimit = 500 * 1024 * 1024 // 500 MB
)
// Config is the main config struct for the application. Use New to instantiate a default config struct.
@ -41,6 +54,10 @@ type Config struct {
FirebaseKeyFile string
CacheFile string
CacheDuration time.Duration
AttachmentCacheDir string
AttachmentTotalSizeLimit int64
AttachmentFileSizeLimit int64
AttachmentExpiryDuration time.Duration
KeepaliveInterval time.Duration
ManagerInterval time.Duration
AtSenderInterval time.Duration
@ -55,12 +72,15 @@ type Config struct {
MessageLimit int
MinDelay time.Duration
MaxDelay time.Duration
GlobalTopicLimit int
TotalTopicLimit int
TotalAttachmentSizeLimit int64
VisitorSubscriptionLimit int
VisitorAttachmentTotalSizeLimit int64
VisitorAttachmentDailyBandwidthLimit int
VisitorRequestLimitBurst int
VisitorRequestLimitReplenish time.Duration
VisitorEmailLimitBurst int
VisitorEmailLimitReplenish time.Duration
VisitorSubscriptionLimit int
BehindProxy bool
}
@ -75,19 +95,25 @@ func NewConfig() *Config {
FirebaseKeyFile: "",
CacheFile: "",
CacheDuration: DefaultCacheDuration,
AttachmentCacheDir: "",
AttachmentTotalSizeLimit: DefaultAttachmentTotalSizeLimit,
AttachmentFileSizeLimit: DefaultAttachmentFileSizeLimit,
AttachmentExpiryDuration: DefaultAttachmentExpiryDuration,
KeepaliveInterval: DefaultKeepaliveInterval,
ManagerInterval: DefaultManagerInterval,
MessageLimit: DefaultMessageLimit,
MessageLimit: DefaultMessageLengthLimit,
MinDelay: DefaultMinDelay,
MaxDelay: DefaultMaxDelay,
AtSenderInterval: DefaultAtSenderInterval,
FirebaseKeepaliveInterval: DefaultFirebaseKeepaliveInterval,
GlobalTopicLimit: DefaultGlobalTopicLimit,
TotalTopicLimit: DefaultTotalTopicLimit,
VisitorSubscriptionLimit: DefaultVisitorSubscriptionLimit,
VisitorAttachmentTotalSizeLimit: DefaultVisitorAttachmentTotalSizeLimit,
VisitorAttachmentDailyBandwidthLimit: DefaultVisitorAttachmentDailyBandwidthLimit,
VisitorRequestLimitBurst: DefaultVisitorRequestLimitBurst,
VisitorRequestLimitReplenish: DefaultVisitorRequestLimitReplenish,
VisitorEmailLimitBurst: DefaultVisitorEmailLimitBurst,
VisitorEmailLimitReplenish: DefaultVisitorEmailLimitReplenish,
VisitorSubscriptionLimit: DefaultVisitorSubscriptionLimit,
BehindProxy: false,
}
}

121
server/file_cache.go Normal file
View file

@ -0,0 +1,121 @@
package server
import (
"errors"
"heckel.io/ntfy/util"
"io"
"os"
"path/filepath"
"regexp"
"sync"
)
var (
fileIDRegex = regexp.MustCompile(`^[-_A-Za-z0-9]+$`)
errInvalidFileID = errors.New("invalid file ID")
errFileExists = errors.New("file exists")
)
type fileCache struct {
dir string
totalSizeCurrent int64
totalSizeLimit int64
fileSizeLimit int64
mu sync.Mutex
}
func newFileCache(dir string, totalSizeLimit int64, fileSizeLimit int64) (*fileCache, error) {
if err := os.MkdirAll(dir, 0700); err != nil {
return nil, err
}
size, err := dirSize(dir)
if err != nil {
return nil, err
}
return &fileCache{
dir: dir,
totalSizeCurrent: size,
totalSizeLimit: totalSizeLimit,
fileSizeLimit: fileSizeLimit,
}, nil
}
func (c *fileCache) Write(id string, in io.Reader, limiters ...util.Limiter) (int64, error) {
if !fileIDRegex.MatchString(id) {
return 0, errInvalidFileID
}
file := filepath.Join(c.dir, id)
if _, err := os.Stat(file); err == nil {
return 0, errFileExists
}
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return 0, err
}
defer f.Close()
limiters = append(limiters, util.NewFixedLimiter(c.Remaining()), util.NewFixedLimiter(c.fileSizeLimit))
limitWriter := util.NewLimitWriter(f, limiters...)
size, err := io.Copy(limitWriter, in)
if err != nil {
os.Remove(file)
return 0, err
}
if err := f.Close(); err != nil {
os.Remove(file)
return 0, err
}
c.mu.Lock()
c.totalSizeCurrent += size
c.mu.Unlock()
return size, nil
}
func (c *fileCache) Remove(ids ...string) error {
for _, id := range ids {
if !fileIDRegex.MatchString(id) {
return errInvalidFileID
}
file := filepath.Join(c.dir, id)
_ = os.Remove(file) // Best effort delete
}
size, err := dirSize(c.dir)
if err != nil {
return err
}
c.mu.Lock()
c.totalSizeCurrent = size
c.mu.Unlock()
return nil
}
func (c *fileCache) Size() int64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.totalSizeCurrent
}
func (c *fileCache) Remaining() int64 {
c.mu.Lock()
defer c.mu.Unlock()
remaining := c.totalSizeLimit - c.totalSizeCurrent
if remaining < 0 {
return 0
}
return remaining
}
func dirSize(dir string) (int64, error) {
entries, err := os.ReadDir(dir)
if err != nil {
return 0, err
}
var size int64
for _, e := range entries {
info, err := e.Info()
if err != nil {
return 0, err
}
size += info.Size()
}
return size, nil
}

83
server/file_cache_test.go Normal file
View file

@ -0,0 +1,83 @@
package server
import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"heckel.io/ntfy/util"
"os"
"strings"
"testing"
)
var (
oneKilobyteArray = make([]byte, 1024)
)
func TestFileCache_Write_Success(t *testing.T) {
dir, c := newTestFileCache(t)
size, err := c.Write("abc", strings.NewReader("normal file"), util.NewFixedLimiter(999))
require.Nil(t, err)
require.Equal(t, int64(11), size)
require.Equal(t, "normal file", readFile(t, dir+"/abc"))
require.Equal(t, int64(11), c.Size())
require.Equal(t, int64(10229), c.Remaining())
}
func TestFileCache_Write_Remove_Success(t *testing.T) {
dir, c := newTestFileCache(t) // max = 10k (10240), each = 1k (1024)
for i := 0; i < 10; i++ { // 10x999 = 9990
size, err := c.Write(fmt.Sprintf("abc%d", i), bytes.NewReader(make([]byte, 999)))
require.Nil(t, err)
require.Equal(t, int64(999), size)
}
require.Equal(t, int64(9990), c.Size())
require.Equal(t, int64(250), c.Remaining())
require.FileExists(t, dir+"/abc1")
require.FileExists(t, dir+"/abc5")
require.Nil(t, c.Remove("abc1", "abc5"))
require.NoFileExists(t, dir+"/abc1")
require.NoFileExists(t, dir+"/abc5")
require.Equal(t, int64(7992), c.Size())
require.Equal(t, int64(2248), c.Remaining())
}
func TestFileCache_Write_FailedTotalSizeLimit(t *testing.T) {
dir, c := newTestFileCache(t)
for i := 0; i < 10; i++ {
size, err := c.Write(fmt.Sprintf("abc%d", i), bytes.NewReader(oneKilobyteArray))
require.Nil(t, err)
require.Equal(t, int64(1024), size)
}
_, err := c.Write("abc11", bytes.NewReader(oneKilobyteArray))
require.Equal(t, util.ErrLimitReached, err)
require.NoFileExists(t, dir+"/abc11")
}
func TestFileCache_Write_FailedFileSizeLimit(t *testing.T) {
dir, c := newTestFileCache(t)
_, err := c.Write("abc", bytes.NewReader(make([]byte, 1025)))
require.Equal(t, util.ErrLimitReached, err)
require.NoFileExists(t, dir+"/abc")
}
func TestFileCache_Write_FailedAdditionalLimiter(t *testing.T) {
dir, c := newTestFileCache(t)
_, err := c.Write("abc", bytes.NewReader(make([]byte, 1001)), util.NewFixedLimiter(1000))
require.Equal(t, util.ErrLimitReached, err)
require.NoFileExists(t, dir+"/abc")
}
func newTestFileCache(t *testing.T) (dir string, cache *fileCache) {
dir = t.TempDir()
cache, err := newFileCache(dir, 10*1024, 1*1024)
require.Nil(t, err)
return dir, cache
}
func readFile(t *testing.T, f string) string {
b, err := os.ReadFile(f)
require.Nil(t, err)
return string(b)
}

View file

@ -25,10 +25,20 @@ type message struct {
Priority int `json:"priority,omitempty"`
Tags []string `json:"tags,omitempty"`
Click string `json:"click,omitempty"`
Attachment *attachment `json:"attachment,omitempty"`
Title string `json:"title,omitempty"`
Message string `json:"message,omitempty"`
}
type attachment struct {
Name string `json:"name"`
Type string `json:"type,omitempty"`
Size int64 `json:"size,omitempty"`
Expires int64 `json:"expires,omitempty"`
URL string `json:"url"`
Owner string `json:"-"` // IP address of uploader, used for rate limiting
}
// messageEncoder is a function that knows how to encode a message
type messageEncoder func(msg *message) (string, error)

View file

@ -18,11 +18,14 @@ import (
"net"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"
)
// TODO add "max messages in a topic" limit
@ -41,6 +44,7 @@ type Server struct {
mailer mailer
messages int64
cache cache
fileCache *fileCache
closeChan chan bool
mu sync.Mutex
}
@ -96,7 +100,9 @@ var (
staticRegex = regexp.MustCompile(`^/static/.+`)
docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
disallowedTopics = []string{"docs", "static"}
fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
disallowedTopics = []string{"docs", "static", "file"}
attachURLRegex = regexp.MustCompile(`^https?://`)
templateFnMap = template.FuncMap{
"durationToHuman": util.DurationToHuman,
@ -117,11 +123,6 @@ var (
docsStaticFs embed.FS
docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""}
errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitGlobalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPBadRequestEmailDisabled = &errHTTP{40001, http.StatusBadRequest, "e-mail notifications are not enabled", "https://ntfy.sh/docs/config/#e-mail-notifications"}
errHTTPBadRequestDelayNoCache = &errHTTP{40002, http.StatusBadRequest, "cannot disable cache for delayed message", ""}
errHTTPBadRequestDelayNoEmail = &errHTTP{40003, http.StatusBadRequest, "delayed e-mail notifications are not supported", ""}
@ -132,12 +133,27 @@ var (
errHTTPBadRequestSinceInvalid = &errHTTP{40008, http.StatusBadRequest, "invalid since parameter", "https://ntfy.sh/docs/subscribe/api/#fetch-cached-messages"}
errHTTPBadRequestTopicInvalid = &errHTTP{40009, http.StatusBadRequest, "invalid topic: path invalid", ""}
errHTTPBadRequestTopicDisallowed = &errHTTP{40010, http.StatusBadRequest, "invalid topic: topic name is disallowed", ""}
errHTTPBadRequestMessageNotUTF8 = &errHTTP{40011, http.StatusBadRequest, "invalid message: message must be UTF-8 encoded", ""}
errHTTPBadRequestAttachmentTooLarge = &errHTTP{40012, http.StatusBadRequest, "invalid request: attachment too large, or bandwidth limit reached", ""}
errHTTPBadRequestAttachmentURLInvalid = &errHTTP{40013, http.StatusBadRequest, "invalid request: attachment URL is invalid", ""}
errHTTPBadRequestAttachmentURLPeakGeneral = &errHTTP{40014, http.StatusBadRequest, "invalid request: attachment URL peak failed", ""}
errHTTPBadRequestAttachmentURLPeakNon2xx = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment URL peak failed with non-2xx status code", ""}
errHTTPBadRequestAttachmentsDisallowed = &errHTTP{40016, http.StatusBadRequest, "invalid request: attachments not allowed", ""}
errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40017, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", ""}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""}
errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "too many requests: daily bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations"}
errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""}
errHTTPInternalErrorInvalidFilePath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid file path", ""}
)
const (
firebaseControlTopic = "~control" // See Android if changed
emptyMessageBody = "triggered"
emptyMessageBody = "triggered" // Used if message body is empty
defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
fcmMessageLimit = 4000 // see maybeTruncateFCMMessage for details
)
@ -164,9 +180,17 @@ func New(conf *Config) (*Server, error) {
if err != nil {
return nil, err
}
var fileCache *fileCache
if conf.AttachmentCacheDir != "" {
fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, conf.AttachmentFileSizeLimit)
if err != nil {
return nil, err
}
}
return &Server{
config: conf,
cache: cache,
fileCache: fileCache,
firebase: firebaseSubscriber,
mailer: mailer,
topics: topics,
@ -214,6 +238,13 @@ func createFirebaseSubscriber(conf *Config) (subscriber, error) {
"title": m.Title,
"message": m.Message,
}
if m.Attachment != nil {
data["attachment_name"] = m.Attachment.Name
data["attachment_type"] = m.Attachment.Type
data["attachment_size"] = fmt.Sprintf("%d", m.Attachment.Size)
data["attachment_expires"] = fmt.Sprintf("%d", m.Attachment.Expires)
data["attachment_url"] = m.Attachment.URL
}
}
var androidConfig *messaging.AndroidConfig
if m.Priority >= 4 {
@ -270,7 +301,7 @@ func (s *Server) Run() error {
errChan <- s.httpServer.ListenAndServe()
}()
if s.config.ListenHTTPS != "" {
s.httpsServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
go func() {
errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
}()
@ -311,7 +342,7 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
if e, ok = err.(*errHTTP); !ok {
e = errHTTPInternalError
}
log.Printf("[%s] %s - %d - %s", r.RemoteAddr, r.Method, e.HTTPCode, err.Error())
log.Printf("[%s] %s - %d - %d - %s", r.RemoteAddr, r.Method, e.HTTPCode, e.Code, err.Error())
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
w.WriteHeader(e.HTTPCode)
@ -330,6 +361,8 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error {
return s.handleStatic(w, r)
} else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
return s.handleDocs(w, r)
} else if r.Method == http.MethodGet && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
return s.withRateLimit(w, r, s.handleFile)
} else if r.Method == http.MethodOptions {
return s.handleOptions(w, r)
} else if r.Method == http.MethodGet && topicPathRegex.MatchString(r.URL.Path) {
@ -385,28 +418,52 @@ func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request) error {
return nil
}
func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
if s.config.AttachmentCacheDir == "" {
return errHTTPInternalError
}
matches := fileRegex.FindStringSubmatch(r.URL.Path)
if len(matches) != 2 {
return errHTTPInternalErrorInvalidFilePath
}
messageID := matches[1]
file := filepath.Join(s.config.AttachmentCacheDir, messageID)
stat, err := os.Stat(file)
if err != nil {
return errHTTPNotFound
}
if err := v.BandwidthLimiter().Allow(stat.Size()); err != nil {
return errHTTPTooManyRequestsAttachmentBandwidthLimit
}
w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
f, err := os.Open(file)
if err != nil {
return err
}
defer f.Close()
_, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
return err
}
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
t, err := s.topicFromPath(r.URL.Path)
if err != nil {
return err
}
reader := io.LimitReader(r.Body, int64(s.config.MessageLimit))
b, err := io.ReadAll(reader)
body, err := util.Peak(r.Body, s.config.MessageLimit)
if err != nil {
return err
}
m := newDefaultMessage(t.ID, strings.TrimSpace(string(b)))
cache, firebase, email, err := s.parsePublishParams(r, m)
m := newDefaultMessage(t.ID, "")
cache, firebase, email, err := s.parsePublishParams(r, v, m)
if err != nil {
return err
}
if email != "" {
if err := v.EmailAllowed(); err != nil {
return errHTTPTooManyRequestsLimitEmails
if err := maybePeakAttachmentURL(m); err != nil {
return err
}
}
if s.mailer == nil && email != "" {
return errHTTPBadRequestEmailDisabled
if err := s.handlePublishBody(r, v, m, body); err != nil {
return err
}
if m.Message == "" {
m.Message = emptyMessageBody
@ -445,12 +502,34 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
return nil
}
func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email string, err error) {
func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, err error) {
cache = readParam(r, "x-cache", "cache") != "no"
firebase = readParam(r, "x-firebase", "firebase") != "no"
email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
m.Title = readParam(r, "x-title", "title", "t")
m.Click = readParam(r, "x-click", "click")
attach := readParam(r, "x-attach", "attach", "a")
filename := readParam(r, "x-filename", "filename", "file", "f")
if attach != "" || filename != "" {
m.Attachment = &attachment{}
}
if attach != "" {
if !attachURLRegex.MatchString(attach) {
return false, false, "", errHTTPBadRequestAttachmentURLInvalid
}
m.Attachment.URL = attach
}
if filename != "" {
m.Attachment.Name = filename
}
email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
if email != "" {
if err := v.EmailAllowed(); err != nil {
return false, false, "", errHTTPTooManyRequestsLimitEmails
}
}
if s.mailer == nil && email != "" {
return false, false, "", errHTTPBadRequestEmailDisabled
}
messageStr := readParam(r, "x-message", "message", "m")
if messageStr != "" {
m.Message = messageStr
@ -507,6 +586,81 @@ func readParam(r *http.Request, names ...string) string {
return ""
}
// handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
//
// 1. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
// Body must be a message, because we attached an external URL
// 2. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
// Body must be attachment, because we passed a filename
// 3. curl -T file.txt ntfy.sh/mytopic
// If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
// 4. curl -T file.txt ntfy.sh/mytopic
// If file.txt is > message limit, treat it as an attachment
func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeakedReadCloser) error {
if m.Attachment != nil && m.Attachment.URL != "" {
return s.handleBodyAsMessage(m, body) // Case 1
} else if m.Attachment != nil && m.Attachment.Name != "" {
return s.handleBodyAsAttachment(r, v, m, body) // Case 2
} else if !body.LimitReached && utf8.Valid(body.PeakedBytes) {
return s.handleBodyAsMessage(m, body) // Case 3
}
return s.handleBodyAsAttachment(r, v, m, body) // Case 4
}
func (s *Server) handleBodyAsMessage(m *message, body *util.PeakedReadCloser) error {
if !utf8.Valid(body.PeakedBytes) {
return errHTTPBadRequestMessageNotUTF8
}
if len(body.PeakedBytes) > 0 { // Empty body should not override message (publish via GET!)
m.Message = strings.TrimSpace(string(body.PeakedBytes)) // Truncates the message to the peak limit if required
}
if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
}
return nil
}
func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeakedReadCloser) error {
if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
return errHTTPBadRequestAttachmentsDisallowed
} else if m.Time > time.Now().Add(s.config.AttachmentExpiryDuration).Unix() {
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery
}
visitorAttachmentsSize, err := s.cache.AttachmentsSize(v.ip)
if err != nil {
return err
}
remainingVisitorAttachmentSize := s.config.VisitorAttachmentTotalSizeLimit - visitorAttachmentsSize
contentLengthStr := r.Header.Get("Content-Length")
if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
if err == nil && (contentLength > remainingVisitorAttachmentSize || contentLength > s.config.AttachmentFileSizeLimit) {
return errHTTPBadRequestAttachmentTooLarge
}
}
if m.Attachment == nil {
m.Attachment = &attachment{}
}
var ext string
m.Attachment.Owner = v.ip // Important for attachment rate limiting
m.Attachment.Expires = time.Now().Add(s.config.AttachmentExpiryDuration).Unix()
m.Attachment.Type, ext = util.DetectContentType(body.PeakedBytes, m.Attachment.Name)
m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
if m.Attachment.Name == "" {
m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
}
if m.Message == "" {
m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
}
m.Attachment.Size, err = s.fileCache.Write(m.ID, body, v.BandwidthLimiter(), util.NewFixedLimiter(remainingVisitorAttachmentSize))
if err == util.ErrLimitReached {
return errHTTPBadRequestAttachmentTooLarge
} else if err != nil {
return err
}
return nil
}
func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
encoder := func(msg *message) (string, error) {
var buf bytes.Buffer
@ -720,8 +874,8 @@ func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
return nil, errHTTPBadRequestTopicDisallowed
}
if _, ok := s.topics[id]; !ok {
if len(s.topics) >= s.config.GlobalTopicLimit {
return nil, errHTTPTooManyRequestsLimitGlobalTopics
if len(s.topics) >= s.config.TotalTopicLimit {
return nil, errHTTPTooManyRequestsLimitTotalTopics
}
s.topics[id] = newTopic(id)
}
@ -741,6 +895,18 @@ func (s *Server) updateStatsAndPrune() {
}
}
// Delete expired attachments
if s.fileCache != nil {
ids, err := s.cache.AttachmentsExpired()
if err == nil {
if err := s.fileCache.Remove(ids...); err != nil {
log.Printf("error while deleting attachments: %s", err.Error())
}
} else {
log.Printf("error retrieving expired attachments: %s", err.Error())
}
}
// Prune message cache
olderThan := time.Now().Add(-1 * s.config.CacheDuration)
if err := s.cache.Prune(olderThan); err != nil {
@ -857,13 +1023,12 @@ func (s *Server) sendDelayedMessages() error {
if err := t.Publish(m); err != nil {
log.Printf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error())
}
if s.firebase != nil {
}
if s.firebase != nil { // Firebase subscribers may not show up in topics map
if err := s.firebase(m); err != nil {
log.Printf("unable to publish to Firebase: %v", err.Error())
}
}
// TODO delayed email sending
}
if err := s.cache.MarkPublished(m); err != nil {
return err
}

View file

@ -36,7 +36,7 @@
#
# You can disable the cache entirely by setting this to 0.
#
# cache-duration: 12h
# cache-duration: "12h"
# If set, the X-Forwarded-For header is used to determine the visitor IP address
# instead of the remote address of the connection.
@ -46,6 +46,19 @@
#
# behind-proxy: false
# If enabled, clients can attach files to notifications as attachments. Minimum settings to enable attachments
# are "attachment-cache-dir" and "base-url".
#
# - attachment-cache-dir is the cache directory for attached files
# - attachment-total-size-limit is the limit of the on-disk attachment cache directory (total size)
# - attachment-file-size-limit is the per-file attachment size limit (e.g. 300k, 2M, 100M)
# - attachment-expiry-duration is the duration after which uploaded attachments will be deleted (e.g. 3h, 20h)
#
# attachment-cache-dir:
# attachment-total-size-limit: "5G"
# attachment-file-size-limit: "15M"
# attachment-expiry-duration: "3h"
# If enabled, allow outgoing e-mail notifications via the 'X-Email' header. If this header is set,
# messages will additionally be sent out as e-mail using an external SMTP server. As of today, only
# SMTP servers with plain text auth and STARTLS are supported. Please also refer to the rate limiting settings
@ -78,16 +91,16 @@
#
# Note that the Android app has a hardcoded timeout at 77s, so it should be less than that.
#
# keepalive-interval: 30s
# keepalive-interval: "30s"
# Interval in which the manager prunes old messages, deletes topics
# and prints the stats.
#
# manager-interval: 1m
# manager-interval: "1m"
# Rate limiting: Total number of topics before the server rejects new topics.
#
# global-topic-limit: 5000
# global-topic-limit: 15000
# Rate limiting: Number of subscriptions per visitor (IP address)
#
@ -98,11 +111,18 @@
# - visitor-request-limit-replenish is the rate at which the bucket is refilled
#
# visitor-request-limit-burst: 60
# visitor-request-limit-replenish: 10s
# visitor-request-limit-replenish: "10s"
# Rate limiting: Allowed emails per visitor:
# - visitor-email-limit-burst is the initial bucket of emails each visitor has
# - visitor-email-limit-replenish is the rate at which the bucket is refilled
#
# visitor-email-limit-burst: 16
# visitor-email-limit-replenish: 1h
# visitor-email-limit-replenish: "1h"
# Rate limiting: Attachment size and bandwidth limits per visitor:
# - visitor-attachment-total-size-limit is the total storage limit used for attachments per visitor
# - visitor-attachment-daily-bandwidth-limit is the total daily attachment download/upload traffic limit per visitor
#
# visitor-attachment-total-size-limit: "100M"
# visitor-attachment-daily-bandwidth-limit: "500M"

View file

@ -7,6 +7,7 @@ import (
"firebase.google.com/go/messaging"
"fmt"
"github.com/stretchr/testify/require"
"heckel.io/ntfy/util"
"net/http"
"net/http/httptest"
"os"
@ -163,20 +164,13 @@ func TestServer_StaticSites(t *testing.T) {
}
func TestServer_PublishLargeMessage(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
c := newTestConfig(t)
c.AttachmentCacheDir = "" // Disable attachments
s := newTestServer(t, c)
body := strings.Repeat("this is a large message", 5000)
truncated := body[0:4096]
response := request(t, s, "PUT", "/mytopic", body, nil)
msg := toMessage(t, response.Body.String())
require.NotEmpty(t, msg.ID)
require.Equal(t, truncated, msg.Message)
require.Equal(t, 4096, len(msg.Message))
response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, truncated, messages[0].Message)
require.Equal(t, 400, response.Code)
}
func TestServer_PublishPriority(t *testing.T) {
@ -205,6 +199,9 @@ func TestServer_PublishPriority(t *testing.T) {
response = request(t, s, "GET", "/mytopic/trigger?priority=urgent", "test", nil)
require.Equal(t, 5, toMessage(t, response.Body.String()).Priority)
response = request(t, s, "GET", "/mytopic/trigger?priority=INVALID", "test", nil)
require.Equal(t, 40007, toHTTPError(t, response.Body.String()).Code)
}
func TestServer_PublishNoCache(t *testing.T) {
@ -268,13 +265,28 @@ func TestServer_PublishAtTooShortDelay(t *testing.T) {
func TestServer_PublishAtTooLongDelay(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{
"In": "99999999h",
})
require.Equal(t, 400, response.Code)
}
func TestServer_PublishAtInvalidDelay(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic?delay=INVALID", "a message", nil)
err := toHTTPError(t, response.Body.String())
require.Equal(t, 400, response.Code)
require.Equal(t, 40004, err.Code)
}
func TestServer_PublishAtTooLarge(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic?x-in=99999h", "a message", nil)
err := toHTTPError(t, response.Body.String())
require.Equal(t, 400, response.Code)
require.Equal(t, 40006, err.Code)
}
func TestServer_PublishAtAndPrune(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
@ -356,6 +368,19 @@ func TestServer_PublishAndPollSince(t *testing.T) {
messages := toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, "test 2", messages[0].Message)
response = request(t, s, "GET", "/mytopic/json?poll=1&since=10s", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 2, len(messages))
require.Equal(t, "test 1", messages[0].Message)
response = request(t, s, "GET", "/mytopic/json?poll=1&since=100ms", "", nil)
messages = toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages))
require.Equal(t, "test 2", messages[0].Message)
response = request(t, s, "GET", "/mytopic/json?poll=1&since=INVALID", "", nil)
require.Equal(t, 40008, toHTTPError(t, response.Body.String()).Code)
}
func TestServer_PublishViaGET(t *testing.T) {
@ -396,6 +421,13 @@ func TestServer_PublishFirebase(t *testing.T) {
time.Sleep(500 * time.Millisecond) // Time for sends
}
func TestServer_PublishInvalidTopic(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
s.mailer = &testMailer{}
response := request(t, s, "PUT", "/docs", "fail", nil)
require.Equal(t, 40010, toHTTPError(t, response.Body.String()).Code)
}
func TestServer_PollWithQueryFilters(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
@ -649,9 +681,241 @@ func TestServer_MaybeTruncateFCMMessage_NotTooLong(t *testing.T) {
require.Equal(t, "", notTruncatedFCMMessage.Data["truncated"])
}
func TestServer_PublishAttachment(t *testing.T) {
content := util.RandomString(5000) // > 4096
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", content, nil)
msg := toMessage(t, response.Body.String())
require.Equal(t, "attachment.txt", msg.Attachment.Name)
require.Equal(t, "text/plain; charset=utf-8", msg.Attachment.Type)
require.Equal(t, int64(5000), msg.Attachment.Size)
require.GreaterOrEqual(t, msg.Attachment.Expires, time.Now().Add(3*time.Hour).Unix())
require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/")
require.Equal(t, "", msg.Attachment.Owner) // Should never be returned
require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, msg.ID))
path := strings.TrimPrefix(msg.Attachment.URL, "http://127.0.0.1:12345")
response = request(t, s, "GET", path, "", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, "5000", response.Header().Get("Content-Length"))
require.Equal(t, content, response.Body.String())
// Slightly unrelated cross-test: make sure we add an owner for internal attachments
size, err := s.cache.AttachmentsSize("9.9.9.9") // See request()
require.Nil(t, err)
require.Equal(t, int64(5000), size)
}
func TestServer_PublishAttachmentShortWithFilename(t *testing.T) {
c := newTestConfig(t)
c.BehindProxy = true
s := newTestServer(t, c)
content := "this is an ATTACHMENT"
response := request(t, s, "PUT", "/mytopic?f=myfile.txt", content, map[string]string{
"X-Forwarded-For": "1.2.3.4",
})
msg := toMessage(t, response.Body.String())
require.Equal(t, "myfile.txt", msg.Attachment.Name)
require.Equal(t, "text/plain; charset=utf-8", msg.Attachment.Type)
require.Equal(t, int64(21), msg.Attachment.Size)
require.GreaterOrEqual(t, msg.Attachment.Expires, time.Now().Add(3*time.Hour).Unix())
require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/")
require.Equal(t, "", msg.Attachment.Owner) // Should never be returned
require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, msg.ID))
path := strings.TrimPrefix(msg.Attachment.URL, "http://127.0.0.1:12345")
response = request(t, s, "GET", path, "", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, "21", response.Header().Get("Content-Length"))
require.Equal(t, content, response.Body.String())
// Slightly unrelated cross-test: make sure we add an owner for internal attachments
size, err := s.cache.AttachmentsSize("1.2.3.4")
require.Nil(t, err)
require.Equal(t, int64(21), size)
}
func TestServer_PublishAttachmentExternalWithoutFilename(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", "", map[string]string{
"Attach": "https://upload.wikimedia.org/wikipedia/commons/f/fd/Pink_flower.jpg",
})
msg := toMessage(t, response.Body.String())
require.Equal(t, "You received a file: Pink_flower.jpg", msg.Message)
require.Equal(t, "Pink_flower.jpg", msg.Attachment.Name)
require.Equal(t, "image/jpeg", msg.Attachment.Type)
require.Equal(t, int64(190173), msg.Attachment.Size)
require.Equal(t, int64(0), msg.Attachment.Expires)
require.Equal(t, "https://upload.wikimedia.org/wikipedia/commons/f/fd/Pink_flower.jpg", msg.Attachment.URL)
require.Equal(t, "", msg.Attachment.Owner)
// Slightly unrelated cross-test: make sure we don't add an owner for external attachments
size, err := s.cache.AttachmentsSize("127.0.0.1")
require.Nil(t, err)
require.Equal(t, int64(0), size)
}
func TestServer_PublishAttachmentExternalWithFilename(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", "This is a custom message", map[string]string{
"X-Attach": "https://upload.wikimedia.org/wikipedia/commons/f/fd/Pink_flower.jpg",
"File": "some file.jpg",
})
msg := toMessage(t, response.Body.String())
require.Equal(t, "This is a custom message", msg.Message)
require.Equal(t, "some file.jpg", msg.Attachment.Name)
require.Equal(t, "image/jpeg", msg.Attachment.Type)
require.Equal(t, int64(190173), msg.Attachment.Size)
require.Equal(t, int64(0), msg.Attachment.Expires)
require.Equal(t, "https://upload.wikimedia.org/wikipedia/commons/f/fd/Pink_flower.jpg", msg.Attachment.URL)
require.Equal(t, "", msg.Attachment.Owner)
}
func TestServer_PublishAttachmentBadURL(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic?a=not+a+URL", "", nil)
err := toHTTPError(t, response.Body.String())
require.Equal(t, 400, response.Code)
require.Equal(t, 400, err.HTTPCode)
require.Equal(t, 40013, err.Code)
}
func TestServer_PublishAttachmentTooLargeContentLength(t *testing.T) {
content := util.RandomString(5000) // > 4096
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic", content, map[string]string{
"Content-Length": "20000000",
})
err := toHTTPError(t, response.Body.String())
require.Equal(t, 400, response.Code)
require.Equal(t, 400, err.HTTPCode)
require.Equal(t, 40012, err.Code)
}
func TestServer_PublishAttachmentTooLargeBodyAttachmentFileSizeLimit(t *testing.T) {
content := util.RandomString(5001) // > 5000, see below
c := newTestConfig(t)
c.AttachmentFileSizeLimit = 5000
s := newTestServer(t, c)
response := request(t, s, "PUT", "/mytopic", content, nil)
err := toHTTPError(t, response.Body.String())
require.Equal(t, 400, response.Code)
require.Equal(t, 400, err.HTTPCode)
require.Equal(t, 40012, err.Code)
}
func TestServer_PublishAttachmentExpiryBeforeDelivery(t *testing.T) {
c := newTestConfig(t)
c.AttachmentExpiryDuration = 10 * time.Minute
s := newTestServer(t, c)
response := request(t, s, "PUT", "/mytopic", util.RandomString(5000), map[string]string{
"Delay": "11 min", // > AttachmentExpiryDuration
})
err := toHTTPError(t, response.Body.String())
require.Equal(t, 400, response.Code)
require.Equal(t, 400, err.HTTPCode)
require.Equal(t, 40017, err.Code)
}
func TestServer_PublishAttachmentTooLargeBodyVisitorAttachmentTotalSizeLimit(t *testing.T) {
c := newTestConfig(t)
c.VisitorAttachmentTotalSizeLimit = 10000
s := newTestServer(t, c)
response := request(t, s, "PUT", "/mytopic", util.RandomString(5000), nil)
msg := toMessage(t, response.Body.String())
require.Equal(t, 200, response.Code)
require.Equal(t, "You received a file: attachment.txt", msg.Message)
require.Equal(t, int64(5000), msg.Attachment.Size)
content := util.RandomString(5001) // 5000+5001 > , see below
response = request(t, s, "PUT", "/mytopic", content, nil)
err := toHTTPError(t, response.Body.String())
require.Equal(t, 400, response.Code)
require.Equal(t, 400, err.HTTPCode)
require.Equal(t, 40012, err.Code)
}
func TestServer_PublishAttachmentAndPrune(t *testing.T) {
content := util.RandomString(5000) // > 4096
c := newTestConfig(t)
c.AttachmentExpiryDuration = time.Millisecond // Hack
s := newTestServer(t, c)
// Publish and make sure we can retrieve it
response := request(t, s, "PUT", "/mytopic", content, nil)
msg := toMessage(t, response.Body.String())
require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/")
file := filepath.Join(s.config.AttachmentCacheDir, msg.ID)
require.FileExists(t, file)
path := strings.TrimPrefix(msg.Attachment.URL, "http://127.0.0.1:12345")
response = request(t, s, "GET", path, "", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, content, response.Body.String())
// Prune and makes sure it's gone
time.Sleep(time.Second) // Sigh ...
s.updateStatsAndPrune()
require.NoFileExists(t, file)
response = request(t, s, "GET", path, "", nil)
require.Equal(t, 404, response.Code)
}
func TestServer_PublishAttachmentBandwidthLimit(t *testing.T) {
content := util.RandomString(5000) // > 4096
c := newTestConfig(t)
c.VisitorAttachmentDailyBandwidthLimit = 5*5000 + 123 // A little more than 1 upload and 3 downloads
s := newTestServer(t, c)
// Publish attachment
response := request(t, s, "PUT", "/mytopic", content, nil)
msg := toMessage(t, response.Body.String())
require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/")
// Get it 4 times successfully
path := strings.TrimPrefix(msg.Attachment.URL, "http://127.0.0.1:12345")
for i := 1; i <= 4; i++ { // 4 successful downloads
response = request(t, s, "GET", path, "", nil)
require.Equal(t, 200, response.Code)
require.Equal(t, content, response.Body.String())
}
// And then fail with a 429
response = request(t, s, "GET", path, "", nil)
err := toHTTPError(t, response.Body.String())
require.Equal(t, 429, response.Code)
require.Equal(t, 42905, err.Code)
}
func TestServer_PublishAttachmentBandwidthLimitUploadOnly(t *testing.T) {
content := util.RandomString(5000) // > 4096
c := newTestConfig(t)
c.VisitorAttachmentDailyBandwidthLimit = 5*5000 + 500 // 5 successful uploads
s := newTestServer(t, c)
// 5 successful uploads
for i := 1; i <= 5; i++ {
response := request(t, s, "PUT", "/mytopic", content, nil)
msg := toMessage(t, response.Body.String())
require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/")
}
// And a failed one
response := request(t, s, "PUT", "/mytopic", content, nil)
err := toHTTPError(t, response.Body.String())
require.Equal(t, 400, response.Code)
require.Equal(t, 40012, err.Code)
}
func newTestConfig(t *testing.T) *Config {
conf := NewConfig()
conf.BaseURL = "http://127.0.0.1:12345"
conf.CacheFile = filepath.Join(t.TempDir(), "cache.db")
conf.AttachmentCacheDir = t.TempDir()
return conf
}
@ -669,6 +933,7 @@ func request(t *testing.T, s *Server, method, url, body string, headers map[stri
if err != nil {
t.Fatal(err)
}
req.RemoteAddr = "9.9.9.9" // Used for tests
for k, v := range headers {
req.Header.Set(k, v)
}

69
server/util.go Normal file
View file

@ -0,0 +1,69 @@
package server
import (
"fmt"
"heckel.io/ntfy/util"
"io"
"net/http"
"net/url"
"path"
"strconv"
"time"
)
const (
peakAttachmentTimeout = 2500 * time.Millisecond
peakAttachmentReadBytes = 128
)
func maybePeakAttachmentURL(m *message) error {
return maybePeakAttachmentURLInternal(m, peakAttachmentTimeout)
}
func maybePeakAttachmentURLInternal(m *message, timeout time.Duration) error {
if m.Attachment == nil || m.Attachment.URL == "" {
return nil
}
client := http.Client{
Timeout: timeout,
Transport: &http.Transport{
DisableCompression: true, // Disable "Accept-Encoding: gzip", otherwise we won't get the Content-Length
Proxy: http.ProxyFromEnvironment,
},
}
req, err := http.NewRequest(http.MethodGet, m.Attachment.URL, nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", "ntfy")
resp, err := client.Do(req)
if err != nil {
return errHTTPBadRequestAttachmentURLPeakGeneral
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return errHTTPBadRequestAttachmentURLPeakNon2xx
}
if size, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); err == nil {
m.Attachment.Size = size
}
buf := make([]byte, peakAttachmentReadBytes)
io.ReadFull(resp.Body, buf) // Best effort: We don't care about the error
mimeType, ext := util.DetectContentType(buf, m.Attachment.URL)
m.Attachment.Type = resp.Header.Get("Content-Type")
if m.Attachment.Type == "" {
m.Attachment.Type = mimeType
}
if m.Attachment.Name == "" {
u, err := url.Parse(m.Attachment.URL)
if err != nil {
m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
} else {
m.Attachment.Name = path.Base(u.Path)
if m.Attachment.Name == "." || m.Attachment.Name == "/" {
m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
}
}
}
return nil
}

19
server/util_test.go Normal file
View file

@ -0,0 +1,19 @@
package server
import (
"github.com/stretchr/testify/require"
"testing"
)
func TestMaybePeakAttachmentURL_Success(t *testing.T) {
m := &message{
Attachment: &attachment{
URL: "https://ntfy.sh/static/img/ntfy.png",
},
}
require.Nil(t, maybePeakAttachmentURL(m))
require.Equal(t, "ntfy.png", m.Attachment.Name)
require.Equal(t, int64(3627), m.Attachment.Size)
require.Equal(t, "image/png", m.Attachment.Type)
require.Equal(t, int64(0), m.Attachment.Expires)
}

View file

@ -25,7 +25,8 @@ type visitor struct {
ip string
requests *rate.Limiter
emails *rate.Limiter
subscriptions *util.Limiter
subscriptions util.Limiter
bandwidth util.Limiter
seen time.Time
mu sync.Mutex
}
@ -36,7 +37,8 @@ func newVisitor(conf *Config, ip string) *visitor {
ip: ip,
requests: rate.NewLimiter(rate.Every(conf.VisitorRequestLimitReplenish), conf.VisitorRequestLimitBurst),
emails: rate.NewLimiter(rate.Every(conf.VisitorEmailLimitReplenish), conf.VisitorEmailLimitBurst),
subscriptions: util.NewLimiter(int64(conf.VisitorSubscriptionLimit)),
subscriptions: util.NewFixedLimiter(int64(conf.VisitorSubscriptionLimit)),
bandwidth: util.NewBytesLimiter(conf.VisitorAttachmentDailyBandwidthLimit, 24*time.Hour),
seen: time.Now(),
}
}
@ -62,7 +64,7 @@ func (v *visitor) EmailAllowed() error {
func (v *visitor) SubscriptionAllowed() error {
v.mu.Lock()
defer v.mu.Unlock()
if err := v.subscriptions.Add(1); err != nil {
if err := v.subscriptions.Allow(1); err != nil {
return errVisitorLimitReached
}
return nil
@ -71,7 +73,7 @@ func (v *visitor) SubscriptionAllowed() error {
func (v *visitor) RemoveSubscription() {
v.mu.Lock()
defer v.mu.Unlock()
v.subscriptions.Sub(1)
v.subscriptions.Allow(-1)
}
func (v *visitor) Keepalive() {
@ -80,6 +82,10 @@ func (v *visitor) Keepalive() {
v.seen = time.Now()
}
func (v *visitor) BandwidthLimiter() util.Limiter {
return v.bandwidth
}
func (v *visitor) Stale() bool {
v.mu.Lock()
defer v.mu.Unlock()

View file

@ -0,0 +1,42 @@
package util
import (
"net/http"
"strings"
)
// ContentTypeWriter is an implementation of http.ResponseWriter that will detect the content type and set the
// Content-Type and (optionally) Content-Disposition headers accordingly.
//
// It will always set a Content-Type based on http.DetectContentType, but will never send the "text/html"
// content type.
type ContentTypeWriter struct {
w http.ResponseWriter
filename string
sniffed bool
}
// NewContentTypeWriter creates a new ContentTypeWriter
func NewContentTypeWriter(w http.ResponseWriter, filename string) *ContentTypeWriter {
return &ContentTypeWriter{w, filename, false}
}
func (w *ContentTypeWriter) Write(p []byte) (n int, err error) {
if w.sniffed {
return w.w.Write(p)
}
// Detect and set Content-Type header
// Fix content types that we don't want to inline-render in the browser. In particular,
// we don't want to render HTML in the browser for security reasons.
contentType, _ := DetectContentType(p, w.filename)
if strings.HasPrefix(contentType, "text/html") {
contentType = strings.ReplaceAll(contentType, "text/html", "text/plain")
} else if contentType == "application/octet-stream" {
contentType = "" // Reset to let downstream http.ResponseWriter take care of it
}
if contentType != "" {
w.w.Header().Set("Content-Type", contentType)
}
w.sniffed = true
return w.w.Write(p)
}

View file

@ -0,0 +1,57 @@
package util
import (
"crypto/rand"
"github.com/stretchr/testify/require"
"net/http/httptest"
"testing"
)
func TestSniffWriter_WriteHTML(t *testing.T) {
rr := httptest.NewRecorder()
sw := NewContentTypeWriter(rr, "")
sw.Write([]byte("<script>alert('hi')</script>"))
require.Equal(t, "text/plain; charset=utf-8", rr.Header().Get("Content-Type"))
}
func TestSniffWriter_WriteTwoWriteCalls(t *testing.T) {
rr := httptest.NewRecorder()
sw := NewContentTypeWriter(rr, "")
sw.Write([]byte{0x25, 0x50, 0x44, 0x46, 0x2d, 0x11, 0x22, 0x33})
sw.Write([]byte("<script>alert('hi')</script>"))
require.Equal(t, "application/pdf", rr.Header().Get("Content-Type"))
}
func TestSniffWriter_NoSniffWriterWriteHTML(t *testing.T) {
// This test just makes sure that without the sniff-w, we would get text/html
rr := httptest.NewRecorder()
rr.Write([]byte("<script>alert('hi')</script>"))
require.Equal(t, "text/html; charset=utf-8", rr.Header().Get("Content-Type"))
}
func TestSniffWriter_WriteHTMLSplitIntoTwoWrites(t *testing.T) {
// This test shows how splitting the HTML into two Write() calls will still yield text/plain
rr := httptest.NewRecorder()
sw := NewContentTypeWriter(rr, "")
sw.Write([]byte("<scr"))
sw.Write([]byte("ipt>alert('hi')</script>"))
require.Equal(t, "text/plain; charset=utf-8", rr.Header().Get("Content-Type"))
}
func TestSniffWriter_WriteUnknownMimeType(t *testing.T) {
rr := httptest.NewRecorder()
sw := NewContentTypeWriter(rr, "")
randomBytes := make([]byte, 199)
rand.Read(randomBytes)
sw.Write(randomBytes)
require.Equal(t, "application/octet-stream", rr.Header().Get("Content-Type"))
}
func TestSniffWriter_WriteWithFilenameAPK(t *testing.T) {
rr := httptest.NewRecorder()
sw := NewContentTypeWriter(rr, "https://example.com/ntfy.apk")
sw.Write([]byte{0x50, 0x4B, 0x03, 0x04})
require.Equal(t, "application/vnd.android.package-archive", rr.Header().Get("Content-Type"))
}

View file

@ -2,59 +2,109 @@ package util
import (
"errors"
"golang.org/x/time/rate"
"io"
"sync"
"time"
)
// ErrLimitReached is the error returned by the Limiter and LimitWriter when the predefined limit has been reached
var ErrLimitReached = errors.New("limit reached")
// Limiter is a helper that allows adding values up to a well-defined limit. Once the limit is reached
// ErrLimitReached will be returned. Limiter may be used by multiple goroutines.
type Limiter struct {
// Limiter is an interface that implements a rate limiting mechanism, e.g. based on time or a fixed value
type Limiter interface {
// Allow adds n to the limiters internal value, or returns ErrLimitReached if the limit has been reached
Allow(n int64) error
}
// FixedLimiter is a helper that allows adding values up to a well-defined limit. Once the limit is reached
// ErrLimitReached will be returned. FixedLimiter may be used by multiple goroutines.
type FixedLimiter struct {
value int64
limit int64
mu sync.Mutex
}
// NewLimiter creates a new Limiter
func NewLimiter(limit int64) *Limiter {
return &Limiter{
// NewFixedLimiter creates a new Limiter
func NewFixedLimiter(limit int64) *FixedLimiter {
return &FixedLimiter{
limit: limit,
}
}
// Add adds n to the limiters internal value, but only if the limit has not been reached. If the limit would be
// Allow adds n to the limiters internal value, but only if the limit has not been reached. If the limit was
// exceeded after adding n, ErrLimitReached is returned.
func (l *Limiter) Add(n int64) error {
func (l *FixedLimiter) Allow(n int64) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.limit == 0 {
l.value += n
return nil
} else if l.value+n <= l.limit {
l.value += n
return nil
} else {
if l.value+n > l.limit {
return ErrLimitReached
}
l.value += n
return nil
}
// RateLimiter is a Limiter that wraps a rate.Limiter, allowing a floating time-based limit.
type RateLimiter struct {
limiter *rate.Limiter
}
// NewRateLimiter creates a new RateLimiter
func NewRateLimiter(r rate.Limit, b int) *RateLimiter {
return &RateLimiter{
limiter: rate.NewLimiter(r, b),
}
}
// Sub subtracts a value from the limiters internal value
func (l *Limiter) Sub(n int64) {
l.Add(-n)
// NewBytesLimiter creates a RateLimiter that is meant to be used for a bytes-per-interval limit,
// e.g. 250 MB per day. And example of the underlying idea can be found here: https://go.dev/play/p/0ljgzIZQ6dJ
func NewBytesLimiter(bytes int, interval time.Duration) *RateLimiter {
return NewRateLimiter(rate.Limit(bytes)*rate.Every(interval), bytes)
}
// Set sets the value of the limiter to n. This function ignores the limit. It is meant to set the value
// based on reality.
func (l *Limiter) Set(n int64) {
l.mu.Lock()
l.value = n
l.mu.Unlock()
// Allow adds n to the limiters internal value, but only if the limit has not been reached. If the limit was
// exceeded after adding n, ErrLimitReached is returned.
func (l *RateLimiter) Allow(n int64) error {
if n <= 0 {
return nil // No-op. Can't take back bytes you're written!
}
if !l.limiter.AllowN(time.Now(), int(n)) {
return ErrLimitReached
}
return nil
}
// Value returns the internal value of the limiter
func (l *Limiter) Value() int64 {
l.mu.Lock()
defer l.mu.Unlock()
return l.value
// LimitWriter implements an io.Writer that will pass through all Write calls to the underlying
// writer w until any of the limiter's limit is reached, at which point a Write will return ErrLimitReached.
// Each limiter's value is increased with every write.
type LimitWriter struct {
w io.Writer
written int64
limiters []Limiter
mu sync.Mutex
}
// NewLimitWriter creates a new LimitWriter
func NewLimitWriter(w io.Writer, limiters ...Limiter) *LimitWriter {
return &LimitWriter{
w: w,
limiters: limiters,
}
}
// Write passes through all writes to the underlying writer until any of the given limiter's limit is reached
func (w *LimitWriter) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()
for i := 0; i < len(w.limiters); i++ {
if err := w.limiters[i].Allow(int64(len(p))); err != nil {
for j := i - 1; j >= 0; j-- {
w.limiters[j].Allow(-int64(len(p))) // Revert limiters limits if allowed
}
return 0, ErrLimitReached
}
}
n, err = w.w.Write(p)
w.written += int64(n)
return
}

View file

@ -1,30 +1,139 @@
package util
import (
"bytes"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestLimiter_Add(t *testing.T) {
l := NewLimiter(10)
if err := l.Add(5); err != nil {
func TestFixedLimiter_Add(t *testing.T) {
l := NewFixedLimiter(10)
if err := l.Allow(5); err != nil {
t.Fatal(err)
}
if err := l.Add(5); err != nil {
if err := l.Allow(5); err != nil {
t.Fatal(err)
}
if err := l.Add(5); err != ErrLimitReached {
if err := l.Allow(5); err != ErrLimitReached {
t.Fatalf("expected ErrLimitReached, got %#v", err)
}
}
func TestLimiter_AddSub(t *testing.T) {
l := NewLimiter(10)
l.Add(5)
if l.Value() != 5 {
t.Fatalf("expected value to be %d, got %d", 5, l.Value())
func TestFixedLimiter_AddSub(t *testing.T) {
l := NewFixedLimiter(10)
l.Allow(5)
if l.value != 5 {
t.Fatalf("expected value to be %d, got %d", 5, l.value)
}
l.Sub(2)
if l.Value() != 3 {
t.Fatalf("expected value to be %d, got %d", 3, l.Value())
l.Allow(-2)
if l.value != 3 {
t.Fatalf("expected value to be %d, got %d", 7, l.value)
}
}
func TestBytesLimiter_Add_Simple(t *testing.T) {
l := NewBytesLimiter(250*1024*1024, 24*time.Hour) // 250 MB per 24h
require.Nil(t, l.Allow(100*1024*1024))
require.Nil(t, l.Allow(100*1024*1024))
require.Equal(t, ErrLimitReached, l.Allow(300*1024*1024))
}
func TestBytesLimiter_Add_Wait(t *testing.T) {
l := NewBytesLimiter(250*1024*1024, 24*time.Hour) // 250 MB per 24h (~ 303 bytes per 100ms)
require.Nil(t, l.Allow(250*1024*1024))
require.Equal(t, ErrLimitReached, l.Allow(400))
time.Sleep(200 * time.Millisecond)
require.Nil(t, l.Allow(400))
}
func TestLimitWriter_WriteNoLimiter(t *testing.T) {
var buf bytes.Buffer
lw := NewLimitWriter(&buf)
if _, err := lw.Write(make([]byte, 10)); err != nil {
t.Fatal(err)
}
if _, err := lw.Write(make([]byte, 1)); err != nil {
t.Fatal(err)
}
if buf.Len() != 11 {
t.Fatalf("expected buffer length to be %d, got %d", 11, buf.Len())
}
}
func TestLimitWriter_WriteOneLimiter(t *testing.T) {
var buf bytes.Buffer
l := NewFixedLimiter(10)
lw := NewLimitWriter(&buf, l)
if _, err := lw.Write(make([]byte, 10)); err != nil {
t.Fatal(err)
}
if _, err := lw.Write(make([]byte, 1)); err != ErrLimitReached {
t.Fatalf("expected ErrLimitReached, got %#v", err)
}
if buf.Len() != 10 {
t.Fatalf("expected buffer length to be %d, got %d", 10, buf.Len())
}
if l.value != 10 {
t.Fatalf("expected limiter value to be %d, got %d", 10, l.value)
}
}
func TestLimitWriter_WriteTwoLimiters(t *testing.T) {
var buf bytes.Buffer
l1 := NewFixedLimiter(11)
l2 := NewFixedLimiter(9)
lw := NewLimitWriter(&buf, l1, l2)
if _, err := lw.Write(make([]byte, 8)); err != nil {
t.Fatal(err)
}
if _, err := lw.Write(make([]byte, 2)); err != ErrLimitReached {
t.Fatalf("expected ErrLimitReached, got %#v", err)
}
if buf.Len() != 8 {
t.Fatalf("expected buffer length to be %d, got %d", 8, buf.Len())
}
if l1.value != 8 {
t.Fatalf("expected limiter 1 value to be %d, got %d", 8, l1.value)
}
if l2.value != 8 {
t.Fatalf("expected limiter 2 value to be %d, got %d", 8, l2.value)
}
}
func TestLimitWriter_WriteTwoDifferentLimiters(t *testing.T) {
var buf bytes.Buffer
l1 := NewFixedLimiter(32)
l2 := NewBytesLimiter(8, 200*time.Millisecond)
lw := NewLimitWriter(&buf, l1, l2)
_, err := lw.Write(make([]byte, 8))
require.Nil(t, err)
_, err = lw.Write(make([]byte, 4))
require.Equal(t, ErrLimitReached, err)
}
func TestLimitWriter_WriteTwoDifferentLimiters_Wait(t *testing.T) {
var buf bytes.Buffer
l1 := NewFixedLimiter(32)
l2 := NewBytesLimiter(8, 200*time.Millisecond)
lw := NewLimitWriter(&buf, l1, l2)
_, err := lw.Write(make([]byte, 8))
require.Nil(t, err)
time.Sleep(250 * time.Millisecond)
_, err = lw.Write(make([]byte, 8))
require.Nil(t, err)
_, err = lw.Write(make([]byte, 4))
require.Equal(t, ErrLimitReached, err)
}
func TestLimitWriter_WriteTwoDifferentLimiters_Wait_FixedLimiterFail(t *testing.T) {
var buf bytes.Buffer
l1 := NewFixedLimiter(11) // <<< This fails below
l2 := NewBytesLimiter(8, 200*time.Millisecond)
lw := NewLimitWriter(&buf, l1, l2)
_, err := lw.Write(make([]byte, 8))
require.Nil(t, err)
time.Sleep(250 * time.Millisecond)
_, err = lw.Write(make([]byte, 8)) // <<< FixedLimiter fails
require.Equal(t, ErrLimitReached, err)
}

61
util/peak.go Normal file
View file

@ -0,0 +1,61 @@
package util
import (
"bytes"
"io"
"strings"
)
// PeakedReadCloser is a ReadCloser that allows peaking into a stream and buffering it in memory.
// It can be instantiated using the Peak function. After a stream has been peaked, it can still be fully
// read by reading the PeakedReadCloser. It first drained from the memory buffer, and then from the remaining
// underlying reader.
type PeakedReadCloser struct {
PeakedBytes []byte
LimitReached bool
peaked io.Reader
underlying io.ReadCloser
closed bool
}
// Peak reads the underlying ReadCloser into memory up until the limit and returns a PeakedReadCloser
func Peak(underlying io.ReadCloser, limit int) (*PeakedReadCloser, error) {
if underlying == nil {
underlying = io.NopCloser(strings.NewReader(""))
}
peaked := make([]byte, limit)
read, err := io.ReadFull(underlying, peaked)
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
return nil, err
}
return &PeakedReadCloser{
PeakedBytes: peaked[:read],
LimitReached: read == limit,
underlying: underlying,
peaked: bytes.NewReader(peaked[:read]),
closed: false,
}, nil
}
// Read reads from the peaked bytes and then from the underlying stream
func (r *PeakedReadCloser) Read(p []byte) (n int, err error) {
if r.closed {
return 0, io.EOF
}
n, err = r.peaked.Read(p)
if err == io.EOF {
return r.underlying.Read(p)
} else if err != nil {
return 0, err
}
return
}
// Close closes the underlying stream
func (r *PeakedReadCloser) Close() error {
if r.closed {
return io.EOF
}
r.closed = true
return r.underlying.Close()
}

55
util/peak_test.go Normal file
View file

@ -0,0 +1,55 @@
package util
import (
"github.com/stretchr/testify/require"
"io"
"strings"
"testing"
)
func TestPeak_LimitReached(t *testing.T) {
underlying := io.NopCloser(strings.NewReader("1234567890"))
peaked, err := Peak(underlying, 5)
if err != nil {
t.Fatal(err)
}
require.Equal(t, []byte("12345"), peaked.PeakedBytes)
require.Equal(t, true, peaked.LimitReached)
all, err := io.ReadAll(peaked)
if err != nil {
t.Fatal(err)
}
require.Equal(t, []byte("1234567890"), all)
require.Equal(t, []byte("12345"), peaked.PeakedBytes)
require.Equal(t, true, peaked.LimitReached)
}
func TestPeak_LimitNotReached(t *testing.T) {
underlying := io.NopCloser(strings.NewReader("1234567890"))
peaked, err := Peak(underlying, 15)
if err != nil {
t.Fatal(err)
}
all, err := io.ReadAll(peaked)
if err != nil {
t.Fatal(err)
}
require.Equal(t, []byte("1234567890"), all)
require.Equal(t, []byte("1234567890"), peaked.PeakedBytes)
require.Equal(t, false, peaked.LimitReached)
}
func TestPeak_Nil(t *testing.T) {
peaked, err := Peak(nil, 15)
if err != nil {
t.Fatal(err)
}
all, err := io.ReadAll(peaked)
if err != nil {
t.Fatal(err)
}
require.Equal(t, []byte(""), all)
require.Equal(t, []byte(""), peaked.PeakedBytes)
require.Equal(t, false, peaked.LimitReached)
}

View file

@ -3,8 +3,11 @@ package util
import (
"errors"
"fmt"
"github.com/gabriel-vasile/mimetype"
"math/rand"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
@ -17,7 +20,7 @@ const (
var (
random = rand.New(rand.NewSource(time.Now().UnixNano()))
randomMutex = sync.Mutex{}
sizeStrRegex = regexp.MustCompile(`(?i)^(\d+)([gmkb])?$`)
errInvalidPriority = errors.New("invalid priority")
)
@ -163,3 +166,39 @@ func ExpandHome(path string) string {
func ShortTopicURL(s string) string {
return strings.TrimPrefix(strings.TrimPrefix(s, "https://"), "http://")
}
// DetectContentType probes the byte array b and returns mime type and file extension.
// The filename is only used to override certain special cases.
func DetectContentType(b []byte, filename string) (mimeType string, ext string) {
if strings.HasSuffix(strings.ToLower(filename), ".apk") {
return "application/vnd.android.package-archive", ".apk"
}
m := mimetype.Detect(b)
mimeType, ext = m.String(), m.Extension()
if ext == "" {
ext = ".bin"
}
return
}
// ParseSize parses a size string like 2K or 2M into bytes. If no unit is found, e.g. 123, bytes is assumed.
func ParseSize(s string) (int64, error) {
matches := sizeStrRegex.FindStringSubmatch(s)
if matches == nil {
return -1, fmt.Errorf("invalid size %s", s)
}
value, err := strconv.Atoi(matches[1])
if err != nil {
return -1, fmt.Errorf("cannot convert number %s", matches[1])
}
switch strings.ToUpper(matches[2]) {
case "G":
return int64(value) * 1024 * 1024 * 1024, nil
case "M":
return int64(value) * 1024 * 1024, nil
case "K":
return int64(value) * 1024, nil
default:
return int64(value), nil
}
}

View file

@ -121,3 +121,34 @@ func TestShortTopicURL(t *testing.T) {
require.Equal(t, "ntfy.sh/mytopic", ShortTopicURL("http://ntfy.sh/mytopic"))
require.Equal(t, "lalala", ShortTopicURL("lalala"))
}
func TestParseSize_10GSuccess(t *testing.T) {
s, err := ParseSize("10G")
if err != nil {
t.Fatal(err)
}
require.Equal(t, int64(10*1024*1024*1024), s)
}
func TestParseSize_10MUpperCaseSuccess(t *testing.T) {
s, err := ParseSize("10M")
if err != nil {
t.Fatal(err)
}
require.Equal(t, int64(10*1024*1024), s)
}
func TestParseSize_10kLowerCaseSuccess(t *testing.T) {
s, err := ParseSize("10k")
if err != nil {
t.Fatal(err)
}
require.Equal(t, int64(10*1024), s)
}
func TestParseSize_FailureInvalid(t *testing.T) {
_, err := ParseSize("not a size")
if err == nil {
t.Fatalf("expected error, but got none")
}
}