From 846ee0fb2d421615c0cbc448f3e98c3b7cca9499 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sat, 15 Jan 2022 13:23:35 -0500 Subject: [PATCH 1/6] Websockets; working --- docs/config.md | 4 +- go.mod | 2 + go.sum | 3 ++ server/config.go | 2 +- server/server.go | 109 ++++++++++++++++++++++++++++++++++++++++++++++ server/server.yml | 2 +- 6 files changed, 118 insertions(+), 4 deletions(-) diff --git a/docs/config.md b/docs/config.md index 2f1c390..70a48fd 100644 --- a/docs/config.md +++ b/docs/config.md @@ -549,7 +549,7 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`). | `smtp-server-listen` | `NTFY_SMTP_SERVER_LISTEN` | `[ip]:port` | - | Defines the IP address and port the SMTP server will listen on, e.g. `:25` or `1.2.3.4:25` | | `smtp-server-domain` | `NTFY_SMTP_SERVER_DOMAIN` | *domain name* | - | SMTP server e-mail domain, e.g. `ntfy.sh` | | `smtp-server-addr-prefix` | `NTFY_SMTP_SERVER_ADDR_PREFIX` | `[ip]:port` | - | Optional prefix for the e-mail addresses to prevent spam, e.g. `ntfy-` | -| `keepalive-interval` | `NTFY_KEEPALIVE_INTERVAL` | *duration* | 55s | Interval in which keepalive messages are sent to the client. This is to prevent intermediaries closing the connection for inactivity. Note that the Android app has a hardcoded timeout at 77s, so it should be less than that. | +| `keepalive-interval` | `NTFY_KEEPALIVE_INTERVAL` | *duration* | 45s | Interval in which keepalive messages are sent to the client. This is to prevent intermediaries closing the connection for inactivity. Note that the Android app has a hardcoded timeout at 77s, so it should be less than that. | | `manager-interval` | `$NTFY_MANAGER_INTERVAL` | *duration* | 1m | Interval in which the manager prunes old messages, deletes topics and prints the stats. | | `global-topic-limit` | `NTFY_GLOBAL_TOPIC_LIMIT` | *number* | 15,000 | Rate limiting: Total number of topics before the server rejects new topics. | | `visitor-subscription-limit` | `NTFY_VISITOR_SUBSCRIPTION_LIMIT` | *number* | 30 | Rate limiting: Number of subscriptions per visitor (IP address) | @@ -597,7 +597,7 @@ OPTIONS: --attachment-total-size-limit value, -A value limit of the on-disk attachment cache (default: 5G) [$NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT] --attachment-file-size-limit value, -Y value per-file attachment size limit (e.g. 300k, 2M, 100M) (default: 15M) [$NTFY_ATTACHMENT_FILE_SIZE_LIMIT] --attachment-expiry-duration value, -X value duration after which uploaded attachments will be deleted (e.g. 3h, 20h) (default: 3h) [$NTFY_ATTACHMENT_EXPIRY_DURATION] - --keepalive-interval value, -k value interval of keepalive messages (default: 55s) [$NTFY_KEEPALIVE_INTERVAL] + --keepalive-interval value, -k value interval of keepalive messages (default: 45s) [$NTFY_KEEPALIVE_INTERVAL] --manager-interval value, -m value interval of for message pruning and stats printing (default: 1m0s) [$NTFY_MANAGER_INTERVAL] --smtp-sender-addr value SMTP server address (host:port) for outgoing emails [$NTFY_SMTP_SENDER_ADDR] --smtp-sender-user value SMTP user (if e-mail sending is enabled) [$NTFY_SMTP_SENDER_USER] diff --git a/go.mod b/go.mod index 816766c..05f44d6 100644 --- a/go.mod +++ b/go.mod @@ -35,11 +35,13 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect diff --git a/go.sum b/go.sum index ef752ff..a912729 100644 --- a/go.sum +++ b/go.sum @@ -189,6 +189,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= github.com/googleapis/gax-go/v2 v2.1.1 h1:dp3bWCh+PPO1zjRRiCSczJav13sBvG4UhNyVTa1KqdU= github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -356,6 +358,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/config.go b/server/config.go index 4f0c1ad..1e2b31c 100644 --- a/server/config.go +++ b/server/config.go @@ -8,7 +8,7 @@ import ( const ( DefaultListenHTTP = ":80" DefaultCacheDuration = 12 * time.Hour - DefaultKeepaliveInterval = 55 * time.Second // Not too frequently to save battery (Android read timeout is 77s!) + DefaultKeepaliveInterval = 45 * time.Second // Not too frequently to save battery (Android read timeout used to be 77s!) DefaultManagerInterval = time.Minute DefaultAtSenderInterval = 10 * time.Second DefaultMinDelay = 10 * time.Second diff --git a/server/server.go b/server/server.go index 14581da..1168c62 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,8 @@ import ( "firebase.google.com/go/messaging" "fmt" "github.com/emersion/go-smtp" + "github.com/gorilla/websocket" + "golang.org/x/sync/errgroup" "google.golang.org/api/option" "heckel.io/ntfy/util" "html/template" @@ -99,6 +101,7 @@ var ( jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`) ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`) rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`) + wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`) publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/(publish|send|trigger)$`) staticRegex = regexp.MustCompile(`^/static/.+`) @@ -156,6 +159,10 @@ const ( emptyMessageBody = "triggered" // Used if message body is empty defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment fcmMessageLimit = 4000 // see maybeTruncateFCMMessage for details + wsWriteWait = 2 * time.Second + wsBufferSize = 1024 + wsReadLimit = 64 // We only ever receive PINGs + wsPongWait = 15 * time.Second ) // New instantiates a new Server. It creates the cache and adds a Firebase @@ -404,6 +411,8 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { return s.withRateLimit(w, r, s.handleSubscribeSSE) } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) { return s.withRateLimit(w, r, s.handleSubscribeRaw) + } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) { + return s.withRateLimit(w, r, s.handleSubscribeWS) } return errHTTPNotFound } @@ -805,6 +814,106 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi } } +func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error { + if err := v.SubscriptionAllowed(); err != nil { + return errHTTPTooManyRequestsLimitSubscriptions + } + defer v.RemoveSubscription() + topicsStr := strings.TrimSuffix(r.URL.Path[1:], "/ws") // Hack + topicIDs := util.SplitNoEmpty(topicsStr, ",") + topics, err := s.topicsFromIDs(topicIDs...) + if err != nil { + return err + } + poll := readParam(r, "x-poll", "poll", "po") == "1" + scheduled := readParam(r, "x-scheduled", "scheduled", "sched") == "1" + since, err := parseSince(r, poll) + if err != nil { + return err + } + messageFilter, titleFilter, priorityFilter, tagsFilter, err := parseQueryFilters(r) + if err != nil { + return err + } + upgrader := &websocket.Upgrader{ + ReadBufferSize: wsBufferSize, + WriteBufferSize: wsBufferSize, + CheckOrigin: func(r *http.Request) bool { + return true // We're open for business! + }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return err + } + defer conn.Close() + g, ctx := errgroup.WithContext(context.Background()) + g.Go(func() error { + pongWait := s.config.KeepaliveInterval + wsPongWait + conn.SetReadLimit(wsReadLimit) + if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil { + return err + } + conn.SetPongHandler(func(appData string) error { + return conn.SetReadDeadline(time.Now().Add(pongWait)) + }) + for { + _, _, err := conn.NextReader() + if err != nil { + return err + } + } + }) + g.Go(func() error { + ping := func() error { + if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { + return err + } + return conn.WriteMessage(websocket.PingMessage, nil) + } + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(s.config.KeepaliveInterval): + v.Keepalive() + if err := ping(); err != nil { + return err + } + } + } + }) + sub := func(msg *message) error { + if !passesQueryFilter(msg, messageFilter, titleFilter, priorityFilter, tagsFilter) { + return nil + } + if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { + return err + } + return conn.WriteJSON(msg) + } + w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests + if poll { + return s.sendOldMessages(topics, since, scheduled, sub) + } + subscriberIDs := make([]int, 0) + for _, t := range topics { + subscriberIDs = append(subscriberIDs, t.Subscribe(sub)) + } + defer func() { + for i, subscriberID := range subscriberIDs { + topics[i].Unsubscribe(subscriberID) // Order! + } + }() + if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message + return err + } + if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil { + return err + } + return g.Wait() +} + func parseQueryFilters(r *http.Request) (messageFilter string, titleFilter string, priorityFilter []int, tagsFilter []string, err error) { messageFilter = readParam(r, "x-message", "message", "m") titleFilter = readParam(r, "x-title", "title", "t") diff --git a/server/server.yml b/server/server.yml index c167d03..c65abd7 100644 --- a/server/server.yml +++ b/server/server.yml @@ -98,7 +98,7 @@ # # Note that the Android app has a hardcoded timeout at 77s, so it should be less than that. # -# keepalive-interval: "30s" +# keepalive-interval: "45s" # Interval in which the manager prunes old messages, deletes topics # and prints the stats. From 707c58a120c5389889d713a1e0b512f018e22ef8 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sat, 15 Jan 2022 22:33:35 -0500 Subject: [PATCH 2/6] Do not print ugly WS error; tests --- client/client.go | 27 ++++++++++----- cmd/app_test.go | 4 +-- cmd/publish_test.go | 36 ++++++++++++++++++++ cmd/serve_test.go | 68 ++++++++++++++++++++++++++++++++++++++ docs/subscribe/api.md | 3 ++ server/errors.go | 49 ++++++++++++++++++++++++++++ server/server.go | 76 +++++++++++++------------------------------ server/server.yml | 3 +- test/server.go | 3 ++ 9 files changed, 204 insertions(+), 65 deletions(-) create mode 100644 cmd/serve_test.go create mode 100644 server/errors.go diff --git a/client/client.go b/client/client.go index b3bf7ab..c9cc4cd 100644 --- a/client/client.go +++ b/client/client.go @@ -36,14 +36,16 @@ type Client struct { // Message is a struct that represents a ntfy message type Message struct { // TODO combine with server.message - ID string - Event string - Time int64 - Topic string - Message string - Title string - Priority int - Tags []string + ID string + Event string + Time int64 + Topic string + Message string + Title string + Priority int + Tags []string + Click string + Attachment *Attachment // Additional fields TopicURL string @@ -51,6 +53,15 @@ type Message struct { // TODO combine with server.message Raw string } +type Attachment struct { + Name string `json:"name"` + Type string `json:"type,omitempty"` + Size int64 `json:"size,omitempty"` + Expires int64 `json:"expires,omitempty"` + URL string `json:"url"` + Owner string `json:"-"` // IP address of uploader, used for rate limiting +} + type subscription struct { ID string topicURL string diff --git a/cmd/app_test.go b/cmd/app_test.go index c02ef4f..9873dd0 100644 --- a/cmd/app_test.go +++ b/cmd/app_test.go @@ -5,8 +5,6 @@ import ( "encoding/json" "github.com/urfave/cli/v2" "heckel.io/ntfy/client" - "io" - "log" "os" "strings" "testing" @@ -15,7 +13,7 @@ import ( // This only contains helpers so far func TestMain(m *testing.M) { - log.SetOutput(io.Discard) + // log.SetOutput(io.Discard) os.Exit(m.Run()) } diff --git a/cmd/publish_test.go b/cmd/publish_test.go index 80d84f8..23d2d36 100644 --- a/cmd/publish_test.go +++ b/cmd/publish_test.go @@ -34,3 +34,39 @@ func TestCLI_Publish_Subscribe_Poll(t *testing.T) { m = toMessage(t, stdout.String()) require.Equal(t, "some message", m.Message) } + +func TestCLI_Publish_All_The_Things(t *testing.T) { + s, port := test.StartServer(t) + defer test.StopServer(t, s, port) + topic := fmt.Sprintf("http://127.0.0.1:%d/mytopic", port) + + app, _, stdout, _ := newTestApp() + require.Nil(t, app.Run([]string{ + "ntfy", "publish", + "--title", "this is a title", + "--priority", "high", + "--tags", "tag1,tag2", + // No --delay, --email + "--click", "https://ntfy.sh", + "--attach", "https://f-droid.org/F-Droid.apk", + "--filename", "fdroid.apk", + "--no-cache", + "--no-firebase", + topic, + "some message", + })) + m := toMessage(t, stdout.String()) + require.Equal(t, "message", m.Event) + require.Equal(t, "mytopic", m.Topic) + require.Equal(t, "some message", m.Message) + require.Equal(t, "this is a title", m.Title) + require.Equal(t, 4, m.Priority) + require.Equal(t, []string{"tag1", "tag2"}, m.Tags) + require.Equal(t, "https://ntfy.sh", m.Click) + require.Equal(t, "https://f-droid.org/F-Droid.apk", m.Attachment.URL) + require.Equal(t, "fdroid.apk", m.Attachment.Name) + require.Equal(t, int64(0), m.Attachment.Size) + require.Equal(t, "", m.Attachment.Owner) + require.Equal(t, int64(0), m.Attachment.Expires) + require.Equal(t, "", m.Attachment.Type) +} diff --git a/cmd/serve_test.go b/cmd/serve_test.go new file mode 100644 index 0000000..d49fbbb --- /dev/null +++ b/cmd/serve_test.go @@ -0,0 +1,68 @@ +package cmd + +import ( + "fmt" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "heckel.io/ntfy/client" + "heckel.io/ntfy/test" + "heckel.io/ntfy/util" + "math/rand" + "os/exec" + "path/filepath" + "testing" + "time" +) + +func init() { + rand.Seed(time.Now().UnixMilli()) +} + +func TestCLI_Serve_Unix_Curl(t *testing.T) { + sockFile := filepath.Join(t.TempDir(), "ntfy.sock") + go func() { + app, _, _, _ := newTestApp() + err := app.Run([]string{"ntfy", "serve", "--listen-http=-", "--listen-unix=" + sockFile}) + require.Nil(t, err) + }() + for i := 0; i < 40 && !util.FileExists(sockFile); i++ { + time.Sleep(50 * time.Millisecond) + } + require.True(t, util.FileExists(sockFile)) + + cmd := exec.Command("curl", "-s", "--unix-socket", sockFile, "-d", "this is a message", "localhost/mytopic") + out, err := cmd.Output() + require.Nil(t, err) + m := toMessage(t, string(out)) + require.Equal(t, "this is a message", m.Message) +} + +func TestCLI_Serve_WebSocket(t *testing.T) { + port := 10000 + rand.Intn(20000) + go func() { + app, _, _, _ := newTestApp() + err := app.Run([]string{"ntfy", "serve", fmt.Sprintf("--listen-http=:%d", port)}) + require.Nil(t, err) + }() + test.WaitForPortUp(t, port) + + ws, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("ws://127.0.0.1:%d/mytopic/ws", port), nil) + require.Nil(t, err) + + messageType, data, err := ws.ReadMessage() + require.Nil(t, err) + require.Equal(t, websocket.TextMessage, messageType) + require.Equal(t, "open", toMessage(t, string(data)).Event) + + c := client.New(client.NewConfig()) + _, err = c.Publish(fmt.Sprintf("http://127.0.0.1:%d/mytopic", port), "my message") + require.Nil(t, err) + + messageType, data, err = ws.ReadMessage() + require.Nil(t, err) + require.Equal(t, websocket.TextMessage, messageType) + + m := toMessage(t, string(data)) + require.Equal(t, "my message", m.Message) + require.Equal(t, "mytopic", m.Topic) +} diff --git a/docs/subscribe/api.md b/docs/subscribe/api.md index 0a4d690..70ef375 100644 --- a/docs/subscribe/api.md +++ b/docs/subscribe/api.md @@ -184,6 +184,9 @@ format. Keepalive messages are sent as empty lines. fclose($fp); ``` +### Subscribe via WebSockets +xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + ## Advanced features ### Poll for messages diff --git a/server/errors.go b/server/errors.go new file mode 100644 index 0000000..ad0d036 --- /dev/null +++ b/server/errors.go @@ -0,0 +1,49 @@ +package server + +import ( + "encoding/json" + "net/http" +) + +// errHTTP is a generic HTTP error for any non-200 HTTP error +type errHTTP struct { + Code int `json:"code,omitempty"` + HTTPCode int `json:"http"` + Message string `json:"error"` + Link string `json:"link,omitempty"` +} + +func (e errHTTP) Error() string { + return e.Message +} + +func (e errHTTP) JSON() string { + b, _ := json.Marshal(&e) + return string(b) +} + +var ( + errHTTPBadRequestEmailDisabled = &errHTTP{40001, http.StatusBadRequest, "e-mail notifications are not enabled", "https://ntfy.sh/docs/config/#e-mail-notifications"} + errHTTPBadRequestDelayNoCache = &errHTTP{40002, http.StatusBadRequest, "cannot disable cache for delayed message", ""} + errHTTPBadRequestDelayNoEmail = &errHTTP{40003, http.StatusBadRequest, "delayed e-mail notifications are not supported", ""} + errHTTPBadRequestDelayCannotParse = &errHTTP{40004, http.StatusBadRequest, "invalid delay parameter: unable to parse delay", "https://ntfy.sh/docs/publish/#scheduled-delivery"} + errHTTPBadRequestDelayTooSmall = &errHTTP{40005, http.StatusBadRequest, "invalid delay parameter: too small, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery"} + errHTTPBadRequestDelayTooLarge = &errHTTP{40006, http.StatusBadRequest, "invalid delay parameter: too large, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery"} + errHTTPBadRequestPriorityInvalid = &errHTTP{40007, http.StatusBadRequest, "invalid priority parameter", "https://ntfy.sh/docs/publish/#message-priority"} + errHTTPBadRequestSinceInvalid = &errHTTP{40008, http.StatusBadRequest, "invalid since parameter", "https://ntfy.sh/docs/subscribe/api/#fetch-cached-messages"} + errHTTPBadRequestTopicInvalid = &errHTTP{40009, http.StatusBadRequest, "invalid topic: path invalid", ""} + errHTTPBadRequestTopicDisallowed = &errHTTP{40010, http.StatusBadRequest, "invalid topic: topic name is disallowed", ""} + errHTTPBadRequestMessageNotUTF8 = &errHTTP{40011, http.StatusBadRequest, "invalid message: message must be UTF-8 encoded", ""} + errHTTPBadRequestAttachmentTooLarge = &errHTTP{40012, http.StatusBadRequest, "invalid request: attachment too large, or bandwidth limit reached", ""} + errHTTPBadRequestAttachmentURLInvalid = &errHTTP{40013, http.StatusBadRequest, "invalid request: attachment URL is invalid", ""} + errHTTPBadRequestAttachmentsDisallowed = &errHTTP{40014, http.StatusBadRequest, "invalid request: attachments not allowed", ""} + errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", ""} + errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""} + errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"} + errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"} + errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"} + errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"} + errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "too many requests: daily bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations"} + errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""} + errHTTPInternalErrorInvalidFilePath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid file path", ""} +) diff --git a/server/server.go b/server/server.go index 1168c62..57dd0f3 100644 --- a/server/server.go +++ b/server/server.go @@ -54,23 +54,6 @@ type Server struct { mu sync.Mutex } -// errHTTP is a generic HTTP error for any non-200 HTTP error -type errHTTP struct { - Code int `json:"code,omitempty"` - HTTPCode int `json:"http"` - Message string `json:"error"` - Link string `json:"link,omitempty"` -} - -func (e errHTTP) Error() string { - return e.Message -} - -func (e errHTTP) JSON() string { - b, _ := json.Marshal(&e) - return string(b) -} - type indexPage struct { Topic string CacheDuration time.Duration @@ -128,30 +111,6 @@ var ( //go:embed docs docsStaticFs embed.FS docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs} - - errHTTPBadRequestEmailDisabled = &errHTTP{40001, http.StatusBadRequest, "e-mail notifications are not enabled", "https://ntfy.sh/docs/config/#e-mail-notifications"} - errHTTPBadRequestDelayNoCache = &errHTTP{40002, http.StatusBadRequest, "cannot disable cache for delayed message", ""} - errHTTPBadRequestDelayNoEmail = &errHTTP{40003, http.StatusBadRequest, "delayed e-mail notifications are not supported", ""} - errHTTPBadRequestDelayCannotParse = &errHTTP{40004, http.StatusBadRequest, "invalid delay parameter: unable to parse delay", "https://ntfy.sh/docs/publish/#scheduled-delivery"} - errHTTPBadRequestDelayTooSmall = &errHTTP{40005, http.StatusBadRequest, "invalid delay parameter: too small, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery"} - errHTTPBadRequestDelayTooLarge = &errHTTP{40006, http.StatusBadRequest, "invalid delay parameter: too large, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery"} - errHTTPBadRequestPriorityInvalid = &errHTTP{40007, http.StatusBadRequest, "invalid priority parameter", "https://ntfy.sh/docs/publish/#message-priority"} - errHTTPBadRequestSinceInvalid = &errHTTP{40008, http.StatusBadRequest, "invalid since parameter", "https://ntfy.sh/docs/subscribe/api/#fetch-cached-messages"} - errHTTPBadRequestTopicInvalid = &errHTTP{40009, http.StatusBadRequest, "invalid topic: path invalid", ""} - errHTTPBadRequestTopicDisallowed = &errHTTP{40010, http.StatusBadRequest, "invalid topic: topic name is disallowed", ""} - errHTTPBadRequestMessageNotUTF8 = &errHTTP{40011, http.StatusBadRequest, "invalid message: message must be UTF-8 encoded", ""} - errHTTPBadRequestAttachmentTooLarge = &errHTTP{40012, http.StatusBadRequest, "invalid request: attachment too large, or bandwidth limit reached", ""} - errHTTPBadRequestAttachmentURLInvalid = &errHTTP{40013, http.StatusBadRequest, "invalid request: attachment URL is invalid", ""} - errHTTPBadRequestAttachmentsDisallowed = &errHTTP{40014, http.StatusBadRequest, "invalid request: attachments not allowed", ""} - errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", ""} - errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""} - errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"} - errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"} - errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"} - errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"} - errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "too many requests: daily bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations"} - errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""} - errHTTPInternalErrorInvalidFilePath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid file path", ""} ) const ( @@ -159,10 +118,14 @@ const ( emptyMessageBody = "triggered" // Used if message body is empty defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment fcmMessageLimit = 4000 // see maybeTruncateFCMMessage for details - wsWriteWait = 2 * time.Second - wsBufferSize = 1024 - wsReadLimit = 64 // We only ever receive PINGs - wsPongWait = 15 * time.Second +) + +// WebSocket constants +const ( + wsWriteWait = 2 * time.Second + wsBufferSize = 1024 + wsReadLimit = 64 // We only ever receive PINGs + wsPongWait = 15 * time.Second ) // New instantiates a new Server. It creates the cache and adds a Firebase @@ -371,16 +334,19 @@ func (s *Server) Stop() { func (s *Server) handle(w http.ResponseWriter, r *http.Request) { if err := s.handleInternal(w, r); err != nil { - var e *errHTTP - var ok bool - if e, ok = err.(*errHTTP); !ok { - e = errHTTPInternalError + if websocket.IsWebSocketUpgrade(r) { + log.Printf("[%s] WS %s %s - %s", r.RemoteAddr, r.Method, r.URL.Path, err.Error()) + return // Do not attempt to write to upgraded connection } - log.Printf("[%s] %s - %d - %d - %s", r.RemoteAddr, r.Method, e.HTTPCode, e.Code, err.Error()) + httpErr, ok := err.(*errHTTP) + if !ok { + httpErr = errHTTPInternalError + } + log.Printf("[%s] HTTP %s %s - %d - %d - %s", r.RemoteAddr, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error()) w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests - w.WriteHeader(e.HTTPCode) - io.WriteString(w, e.JSON()+"\n") + w.WriteHeader(httpErr.HTTPCode) + io.WriteString(w, httpErr.JSON()+"\n") } } @@ -911,7 +877,11 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil { return err } - return g.Wait() + err = g.Wait() + if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + return nil // Normal closures are not errors + } + return err } func parseQueryFilters(r *http.Request) (messageFilter string, titleFilter string, priorityFilter []int, tagsFilter []string, err error) { diff --git a/server/server.yml b/server/server.yml index c65abd7..736748b 100644 --- a/server/server.yml +++ b/server/server.yml @@ -6,8 +6,9 @@ # base-url: # Listen address for the HTTP & HTTPS web server. If "listen-https" is set, you must also -# set "key-file" and "cert-file". Format: : +# set "key-file" and "cert-file". Format: []:, e.g. "1.2.3.4:8080". # +# To listen on all interfaces, you may omit the IP address, e.g. ":443". # To disable HTTP, set "listen-http" to "-". # # listen-http: ":80" diff --git a/test/server.go b/test/server.go index 07382c5..0b9200a 100644 --- a/test/server.go +++ b/test/server.go @@ -5,6 +5,7 @@ import ( "heckel.io/ntfy/server" "math/rand" "net/http" + "path/filepath" "testing" "time" ) @@ -22,6 +23,8 @@ func StartServer(t *testing.T) (*server.Server, int) { func StartServerWithConfig(t *testing.T, conf *server.Config) (*server.Server, int) { port := 10000 + rand.Intn(20000) conf.ListenHTTP = fmt.Sprintf(":%d", port) + conf.AttachmentCacheDir = t.TempDir() + conf.CacheFile = filepath.Join(t.TempDir(), "cache.db") s, err := server.New(conf) if err != nil { t.Fatal(err) From 2b6363474e18f195108a10beca1305c198e71eaf Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sat, 15 Jan 2022 23:17:46 -0500 Subject: [PATCH 3/6] Combine things, move stuff --- server/server.go | 169 ++++++++------------------------ server/server_test.go | 58 ----------- server/{message.go => types.go} | 70 +++++++++++++ server/util.go | 55 +++++++++++ server/util_test.go | 66 +++++++++++++ 5 files changed, 231 insertions(+), 187 deletions(-) rename server/{message.go => types.go} (54%) create mode 100644 server/util.go create mode 100644 server/util_test.go diff --git a/server/server.go b/server/server.go index 57dd0f3..06097a2 100644 --- a/server/server.go +++ b/server/server.go @@ -32,9 +32,6 @@ import ( "unicode/utf8" ) -// TODO add "max messages in a topic" limit -// TODO implement "since=" - // Server is the main server, providing the UI and API for ntfy type Server struct { config *Config @@ -59,25 +56,6 @@ type indexPage struct { CacheDuration time.Duration } -type sinceTime time.Time - -func (t sinceTime) IsAll() bool { - return t == sinceAllMessages -} - -func (t sinceTime) IsNone() bool { - return t == sinceNoMessages -} - -func (t sinceTime) Time() time.Time { - return time.Time(t) -} - -var ( - sinceAllMessages = sinceTime(time.Unix(0, 0)) - sinceNoMessages = sinceTime(time.Unix(1, 0)) -) - var ( topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /! topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app! @@ -117,7 +95,6 @@ const ( firebaseControlTopic = "~control" // See Android if changed emptyMessageBody = "triggered" // Used if message body is empty defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment - fcmMessageLimit = 4000 // see maybeTruncateFCMMessage for details ) // WebSocket constants @@ -232,25 +209,6 @@ func createFirebaseSubscriber(conf *Config) (subscriber, error) { }, nil } -// maybeTruncateFCMMessage performs best-effort truncation of FCM messages. -// The docs say the limit is 4000 characters, but during testing it wasn't quite clear -// what fields matter; so we're just capping the serialized JSON to 4000 bytes. -func maybeTruncateFCMMessage(m *messaging.Message) *messaging.Message { - s, err := json.Marshal(m) - if err != nil { - return m - } - if len(s) > fcmMessageLimit { - over := len(s) - fcmMessageLimit + 16 // = len("truncated":"1",), sigh ... - message, ok := m.Data["message"] - if ok && len(message) > over { - m.Data["truncated"] = "1" - m.Data["message"] = message[:len(message)-over] - } - } - return m -} - // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts // a manager go routine to print stats and prune messages. func (s *Server) Run() error { @@ -391,7 +349,7 @@ func (s *Server) handleHome(w http.ResponseWriter, r *http.Request) error { } func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request) error { - unifiedpush := readParam(r, "x-unifiedpush", "unifiedpush", "up") == "1" // see PUT/POST too! + unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too! if unifiedpush { w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests @@ -497,13 +455,15 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito if err := json.NewEncoder(w).Encode(m); err != nil { return err } - s.inc(&s.messages) + s.mu.Lock() + s.messages++ + s.mu.Unlock() return nil } func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, err error) { - cache = readParam(r, "x-cache", "cache") != "no" - firebase = readParam(r, "x-firebase", "firebase") != "no" + cache = readBoolParam(r, true, "x-cache", "cache") + firebase = readBoolParam(r, true, "x-firebase", "firebase") m.Title = readParam(r, "x-title", "title", "t") m.Click = readParam(r, "x-click", "click") filename := readParam(r, "x-filename", "filename", "file", "f") @@ -574,29 +534,13 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca } m.Time = delay.Unix() } - unifiedpush := readParam(r, "x-unifiedpush", "unifiedpush", "up") == "1" // see GET too! + unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too! if unifiedpush { firebase = false } return cache, firebase, email, nil } -func readParam(r *http.Request, names ...string) string { - for _, name := range names { - value := r.Header.Get(name) - if value != "" { - return strings.TrimSpace(value) - } - } - for _, name := range names { - value := r.URL.Query().Get(strings.ToLower(name)) - if value != "" { - return strings.TrimSpace(value) - } - } - return "" -} - // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message. // // 1. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic @@ -680,7 +624,7 @@ func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v * } return buf.String(), nil } - return s.handleSubscribe(w, r, v, "json", "application/x-ndjson", encoder) + return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder) } func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error { @@ -694,7 +638,7 @@ func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *v } return fmt.Sprintf("data: %s\n", buf.String()), nil } - return s.handleSubscribe(w, r, v, "sse", "text/event-stream", encoder) + return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder) } func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error { @@ -704,33 +648,25 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v } return "\n", nil // "keepalive" and "open" events just send an empty line } - return s.handleSubscribe(w, r, v, "raw", "text/plain", encoder) + return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder) } -func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visitor, format string, contentType string, encoder messageEncoder) error { +func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error { if err := v.SubscriptionAllowed(); err != nil { return errHTTPTooManyRequestsLimitSubscriptions } defer v.RemoveSubscription() - topicsStr := strings.TrimSuffix(r.URL.Path[1:], "/"+format) // Hack - topicIDs := util.SplitNoEmpty(topicsStr, ",") - topics, err := s.topicsFromIDs(topicIDs...) + topics, topicsStr, err := s.topicsFromPath(r.URL.Path) if err != nil { return err } - poll := readParam(r, "x-poll", "poll", "po") == "1" - scheduled := readParam(r, "x-scheduled", "scheduled", "sched") == "1" - since, err := parseSince(r, poll) - if err != nil { - return err - } - messageFilter, titleFilter, priorityFilter, tagsFilter, err := parseQueryFilters(r) + poll, since, scheduled, filters, err := parseSubscribeParams(r) if err != nil { return err } var wlock sync.Mutex sub := func(msg *message) error { - if !passesQueryFilter(msg, messageFilter, titleFilter, priorityFilter, tagsFilter) { + if !filters.Pass(msg) { return nil } m, err := encoder(msg) @@ -785,19 +721,11 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi return errHTTPTooManyRequestsLimitSubscriptions } defer v.RemoveSubscription() - topicsStr := strings.TrimSuffix(r.URL.Path[1:], "/ws") // Hack - topicIDs := util.SplitNoEmpty(topicsStr, ",") - topics, err := s.topicsFromIDs(topicIDs...) + topics, topicsStr, err := s.topicsFromPath(r.URL.Path) if err != nil { return err } - poll := readParam(r, "x-poll", "poll", "po") == "1" - scheduled := readParam(r, "x-scheduled", "scheduled", "sched") == "1" - since, err := parseSince(r, poll) - if err != nil { - return err - } - messageFilter, titleFilter, priorityFilter, tagsFilter, err := parseQueryFilters(r) + poll, since, scheduled, filters, err := parseSubscribeParams(r) if err != nil { return err } @@ -850,7 +778,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi } }) sub := func(msg *message) error { - if !passesQueryFilter(msg, messageFilter, titleFilter, priorityFilter, tagsFilter) { + if !filters.Pass(msg) { return nil } if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { @@ -884,44 +812,20 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi return err } -func parseQueryFilters(r *http.Request) (messageFilter string, titleFilter string, priorityFilter []int, tagsFilter []string, err error) { - messageFilter = readParam(r, "x-message", "message", "m") - titleFilter = readParam(r, "x-title", "title", "t") - tagsFilter = util.SplitNoEmpty(readParam(r, "x-tags", "tags", "tag", "ta"), ",") - priorityFilter = make([]int, 0) - for _, p := range util.SplitNoEmpty(readParam(r, "x-priority", "priority", "prio", "p"), ",") { - priority, err := util.ParsePriority(p) - if err != nil { - return "", "", nil, nil, err - } - priorityFilter = append(priorityFilter, priority) +func parseSubscribeParams(r *http.Request) (poll bool, since sinceTime, scheduled bool, filters *queryFilter, err error) { + poll = readBoolParam(r, false, "x-poll", "poll", "po") + scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched") + since, err = parseSince(r, poll) + if err != nil { + return + } + filters, err = parseQueryFilters(r) + if err != nil { + return } return } -func passesQueryFilter(msg *message, messageFilter string, titleFilter string, priorityFilter []int, tagsFilter []string) bool { - if msg.Event != messageEvent { - return true // filters only apply to messages - } - if messageFilter != "" && msg.Message != messageFilter { - return false - } - if titleFilter != "" && msg.Title != titleFilter { - return false - } - messagePriority := msg.Priority - if messagePriority == 0 { - messagePriority = 3 // For query filters, default priority (3) is the same as "not set" (0) - } - if len(priorityFilter) > 0 && !util.InIntList(priorityFilter, messagePriority) { - return false - } - if len(tagsFilter) > 0 && !util.InStringListAll(msg.Tags, tagsFilter) { - return false - } - return true -} - func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error { if since.IsNone() { return nil @@ -980,6 +884,19 @@ func (s *Server) topicFromPath(path string) (*topic, error) { return topics[0], nil } +func (s *Server) topicsFromPath(path string) ([]*topic, string, error) { + parts := strings.Split(path, "/") + if len(parts) < 2 { + return nil, "", errHTTPBadRequestTopicInvalid + } + topicIDs := util.SplitNoEmpty(parts[1], ",") + topics, err := s.topicsFromIDs(topicIDs...) + if err != nil { + return nil, "", errHTTPBadRequestTopicInvalid + } + return topics, parts[1], nil +} + func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) { s.mu.Lock() defer s.mu.Unlock() @@ -1180,9 +1097,3 @@ func (s *Server) visitor(r *http.Request) *visitor { v.Keepalive() return v } - -func (s *Server) inc(counter *int64) { - s.mu.Lock() - defer s.mu.Unlock() - *counter++ -} diff --git a/server/server_test.go b/server/server_test.go index 492edf9..f888136 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -4,7 +4,6 @@ import ( "bufio" "context" "encoding/json" - "firebase.google.com/go/messaging" "fmt" "github.com/stretchr/testify/require" "heckel.io/ntfy/util" @@ -624,63 +623,6 @@ func TestServer_UnifiedPushDiscovery(t *testing.T) { require.Equal(t, `{"unifiedpush":{"version":1}}`+"\n", response.Body.String()) } -func TestServer_MaybeTruncateFCMMessage(t *testing.T) { - origMessage := strings.Repeat("this is a long string", 300) - origFCMMessage := &messaging.Message{ - Topic: "mytopic", - Data: map[string]string{ - "id": "abcdefg", - "time": "1641324761", - "event": "message", - "topic": "mytopic", - "priority": "0", - "tags": "", - "title": "", - "message": origMessage, - }, - Android: &messaging.AndroidConfig{ - Priority: "high", - }, - } - origMessageLength := len(origFCMMessage.Data["message"]) - serializedOrigFCMMessage, _ := json.Marshal(origFCMMessage) - require.Greater(t, len(serializedOrigFCMMessage), fcmMessageLimit) // Pre-condition - - truncatedFCMMessage := maybeTruncateFCMMessage(origFCMMessage) - truncatedMessageLength := len(truncatedFCMMessage.Data["message"]) - serializedTruncatedFCMMessage, _ := json.Marshal(truncatedFCMMessage) - require.Equal(t, fcmMessageLimit, len(serializedTruncatedFCMMessage)) - require.Equal(t, "1", truncatedFCMMessage.Data["truncated"]) - require.NotEqual(t, origMessageLength, truncatedMessageLength) -} - -func TestServer_MaybeTruncateFCMMessage_NotTooLong(t *testing.T) { - origMessage := "not really a long string" - origFCMMessage := &messaging.Message{ - Topic: "mytopic", - Data: map[string]string{ - "id": "abcdefg", - "time": "1641324761", - "event": "message", - "topic": "mytopic", - "priority": "0", - "tags": "", - "title": "", - "message": origMessage, - }, - } - origMessageLength := len(origFCMMessage.Data["message"]) - serializedOrigFCMMessage, _ := json.Marshal(origFCMMessage) - require.LessOrEqual(t, len(serializedOrigFCMMessage), fcmMessageLimit) // Pre-condition - - notTruncatedFCMMessage := maybeTruncateFCMMessage(origFCMMessage) - notTruncatedMessageLength := len(notTruncatedFCMMessage.Data["message"]) - serializedNotTruncatedFCMMessage, _ := json.Marshal(notTruncatedFCMMessage) - require.Equal(t, origMessageLength, notTruncatedMessageLength) - require.Equal(t, len(serializedOrigFCMMessage), len(serializedNotTruncatedFCMMessage)) - require.Equal(t, "", notTruncatedFCMMessage.Data["truncated"]) -} - func TestServer_PublishAttachment(t *testing.T) { content := util.RandomString(5000) // > 4096 s := newTestServer(t, newTestConfig(t)) diff --git a/server/message.go b/server/types.go similarity index 54% rename from server/message.go rename to server/types.go index 27695f1..357a378 100644 --- a/server/message.go +++ b/server/types.go @@ -2,6 +2,7 @@ package server import ( "heckel.io/ntfy/util" + "net/http" "time" ) @@ -70,3 +71,72 @@ func newKeepaliveMessage(topic string) *message { func newDefaultMessage(topic, msg string) *message { return newMessage(messageEvent, topic, msg) } + +type sinceTime time.Time + +func (t sinceTime) IsAll() bool { + return t == sinceAllMessages +} + +func (t sinceTime) IsNone() bool { + return t == sinceNoMessages +} + +func (t sinceTime) Time() time.Time { + return time.Time(t) +} + +var ( + sinceAllMessages = sinceTime(time.Unix(0, 0)) + sinceNoMessages = sinceTime(time.Unix(1, 0)) +) + +type queryFilter struct { + Message string + Title string + Tags []string + Priority []int +} + +func parseQueryFilters(r *http.Request) (*queryFilter, error) { + messageFilter := readParam(r, "x-message", "message", "m") + titleFilter := readParam(r, "x-title", "title", "t") + tagsFilter := util.SplitNoEmpty(readParam(r, "x-tags", "tags", "tag", "ta"), ",") + priorityFilter := make([]int, 0) + for _, p := range util.SplitNoEmpty(readParam(r, "x-priority", "priority", "prio", "p"), ",") { + priority, err := util.ParsePriority(p) + if err != nil { + return nil, err + } + priorityFilter = append(priorityFilter, priority) + } + return &queryFilter{ + Message: messageFilter, + Title: titleFilter, + Tags: tagsFilter, + Priority: priorityFilter, + }, nil +} + +func (q *queryFilter) Pass(msg *message) bool { + if msg.Event != messageEvent { + return true // filters only apply to messages + } + if q.Message != "" && msg.Message != q.Message { + return false + } + if q.Title != "" && msg.Title != q.Title { + return false + } + messagePriority := msg.Priority + if messagePriority == 0 { + messagePriority = 3 // For query filters, default priority (3) is the same as "not set" (0) + } + if len(q.Priority) > 0 && !util.InIntList(q.Priority, messagePriority) { + return false + } + if len(q.Tags) > 0 && !util.InStringListAll(msg.Tags, q.Tags) { + return false + } + return true +} diff --git a/server/util.go b/server/util.go new file mode 100644 index 0000000..4966cb0 --- /dev/null +++ b/server/util.go @@ -0,0 +1,55 @@ +package server + +import ( + "encoding/json" + "firebase.google.com/go/messaging" + "net/http" + "strings" +) + +const ( + fcmMessageLimit = 4000 +) + +// maybeTruncateFCMMessage performs best-effort truncation of FCM messages. +// The docs say the limit is 4000 characters, but during testing it wasn't quite clear +// what fields matter; so we're just capping the serialized JSON to 4000 bytes. +func maybeTruncateFCMMessage(m *messaging.Message) *messaging.Message { + s, err := json.Marshal(m) + if err != nil { + return m + } + if len(s) > fcmMessageLimit { + over := len(s) - fcmMessageLimit + 16 // = len("truncated":"1",), sigh ... + message, ok := m.Data["message"] + if ok && len(message) > over { + m.Data["truncated"] = "1" + m.Data["message"] = message[:len(message)-over] + } + } + return m +} + +func readBoolParam(r *http.Request, defaultValue bool, names ...string) bool { + value := strings.ToLower(readParam(r, names...)) + if value == "" { + return defaultValue + } + return value == "1" || value == "yes" || value == "true" +} + +func readParam(r *http.Request, names ...string) string { + for _, name := range names { + value := r.Header.Get(name) + if value != "" { + return strings.TrimSpace(value) + } + } + for _, name := range names { + value := r.URL.Query().Get(strings.ToLower(name)) + if value != "" { + return strings.TrimSpace(value) + } + } + return "" +} diff --git a/server/util_test.go b/server/util_test.go new file mode 100644 index 0000000..75a76a2 --- /dev/null +++ b/server/util_test.go @@ -0,0 +1,66 @@ +package server + +import ( + "encoding/json" + "firebase.google.com/go/messaging" + "github.com/stretchr/testify/require" + "strings" + "testing" +) + +func TestMaybeTruncateFCMMessage(t *testing.T) { + origMessage := strings.Repeat("this is a long string", 300) + origFCMMessage := &messaging.Message{ + Topic: "mytopic", + Data: map[string]string{ + "id": "abcdefg", + "time": "1641324761", + "event": "message", + "topic": "mytopic", + "priority": "0", + "tags": "", + "title": "", + "message": origMessage, + }, + Android: &messaging.AndroidConfig{ + Priority: "high", + }, + } + origMessageLength := len(origFCMMessage.Data["message"]) + serializedOrigFCMMessage, _ := json.Marshal(origFCMMessage) + require.Greater(t, len(serializedOrigFCMMessage), fcmMessageLimit) // Pre-condition + + truncatedFCMMessage := maybeTruncateFCMMessage(origFCMMessage) + truncatedMessageLength := len(truncatedFCMMessage.Data["message"]) + serializedTruncatedFCMMessage, _ := json.Marshal(truncatedFCMMessage) + require.Equal(t, fcmMessageLimit, len(serializedTruncatedFCMMessage)) + require.Equal(t, "1", truncatedFCMMessage.Data["truncated"]) + require.NotEqual(t, origMessageLength, truncatedMessageLength) +} + +func TestMaybeTruncateFCMMessage_NotTooLong(t *testing.T) { + origMessage := "not really a long string" + origFCMMessage := &messaging.Message{ + Topic: "mytopic", + Data: map[string]string{ + "id": "abcdefg", + "time": "1641324761", + "event": "message", + "topic": "mytopic", + "priority": "0", + "tags": "", + "title": "", + "message": origMessage, + }, + } + origMessageLength := len(origFCMMessage.Data["message"]) + serializedOrigFCMMessage, _ := json.Marshal(origFCMMessage) + require.LessOrEqual(t, len(serializedOrigFCMMessage), fcmMessageLimit) // Pre-condition + + notTruncatedFCMMessage := maybeTruncateFCMMessage(origFCMMessage) + notTruncatedMessageLength := len(notTruncatedFCMMessage.Data["message"]) + serializedNotTruncatedFCMMessage, _ := json.Marshal(notTruncatedFCMMessage) + require.Equal(t, origMessageLength, notTruncatedMessageLength) + require.Equal(t, len(serializedOrigFCMMessage), len(serializedNotTruncatedFCMMessage)) + require.Equal(t, "", notTruncatedFCMMessage.Data["truncated"]) +} From 91d40dcc91903ac76e2efa6e2732d19e4c085aeb Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sat, 15 Jan 2022 23:52:21 -0500 Subject: [PATCH 4/6] Docs --- docs/config.md | 4 +++ docs/subscribe/api.md | 58 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/docs/config.md b/docs/config.md index 70a48fd..5710540 100644 --- a/docs/config.md +++ b/docs/config.md @@ -243,6 +243,8 @@ or the root domain: proxy_redirect off; proxy_set_header Host $http_host; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_connect_timeout 3m; @@ -274,6 +276,8 @@ or the root domain: proxy_redirect off; proxy_set_header Host $http_host; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_connect_timeout 3m; diff --git a/docs/subscribe/api.md b/docs/subscribe/api.md index 70ef375..d16a3f9 100644 --- a/docs/subscribe/api.md +++ b/docs/subscribe/api.md @@ -3,7 +3,11 @@ You can create and subscribe to a topic in the [web UI](web.md), via the [phone or in your own app or script by subscribing the API. This page describes how to subscribe via API. You may also want to check out the page that describes how to [publish messages](../publish.md). -The subscription API relies on a simple HTTP GET request with a streaming HTTP response, i.e **you open a GET request and +You can consume the subscription API as either a **[simple HTTP stream (JSON, SSE or raw)](#http-stream)**, or +**[via WebSockets](#websockets)**. Both are incredibly simple to use. + +## HTTP stream +The HTTP stream-based API relies on a simple GET request with a streaming HTTP response, i.e **you open a GET request and the connection stays open forever**, sending messages back as they come in. There are three different API endpoints, which only differ in the response format: @@ -12,7 +16,7 @@ only differ in the response format: can be used with [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) * [Raw stream](#subscribe-as-raw-stream): `/raw` returns messages as raw text, with one line per message -## Subscribe as JSON stream +### Subscribe as JSON stream Here are a few examples of how to consume the JSON endpoint (`/json`). For almost all languages, **this is the recommended way to subscribe to a topic**. The notable exception is JavaScript, for which the [SSE/EventSource stream](#subscribe-as-sse-stream) is much easier to work with. @@ -80,7 +84,7 @@ recommended way to subscribe to a topic**. The notable exception is JavaScript, fclose($fp); ``` -## Subscribe as SSE stream +### Subscribe as SSE stream Using [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) in JavaScript, you can consume notifications via a [Server-Sent Events (SSE)](https://en.wikipedia.org/wiki/Server-sent_events) stream. It's incredibly easy to use. Here's what it looks like. You may also want to check out the [live example](/example.html). @@ -125,7 +129,7 @@ easy to use. Here's what it looks like. You may also want to check out the [live }; ``` -## Subscribe as raw stream +### Subscribe as raw stream The `/raw` endpoint will output one line per message, and **will only include the message body**. It's useful for extremely simple scripts, and doesn't include all the data. Additional fields such as [priority](../publish.md#message-priority), [tags](../publish.md#tags--emojis--) or [message title](../publish.md#message-title) are not included in this output @@ -184,8 +188,50 @@ format. Keepalive messages are sent as empty lines. fclose($fp); ``` -### Subscribe via WebSockets -xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +## WebSockets +You may also subscribe to topics via [WebSockets](https://en.wikipedia.org/wiki/WebSocket), which is also widely +supported in many languages. Most notably, WebSockets are natively supported in JavaScript. On the command line, +I recommend [websocat](https://github.com/vi/websocat), a fantastic tool similar to `socat` or `curl`, but specifically +for WebSockets. + +The WebSockets endpoint is available at `/ws` and returns messages as JSON objects similar to the +[JSON stream endpoint](#subscribe-as-json-stream). + +=== "Command line (websocat)" + ``` + $ websocat wss://ntfy.sh/mytopic/ws + {"id":"qRHUCCvjj8","time":1642307388,"event":"open","topic":"mytopic"} + {"id":"eOWoUBJ14x","time":1642307754,"event":"message","topic":"mytopic","message":"hi there"} + ``` + +=== "HTTP" + ``` http + GET /disk-alerts/ws HTTP/1.1 + Host: ntfy.sh + Upgrade: websocket + Connection: Upgrade + + HTTP/1.1 101 Switching Protocols + Upgrade: websocket + Connection: Upgrade + ... + ``` + +=== "Go" + ``` go + import "github.com/gorilla/websocket" + ws, _, _ := websocket.DefaultDialer.Dial("wss://ntfy.sh/mytopic/ws", nil) + messageType, data, err := ws.ReadMessage() + ... + ``` + +=== "JavaScript" + ``` javascript + const socket = new WebSocket('wss://ntfy.sh/mytopic/ws'); + socket.addEventListener('message', function (event) { + console.log(event.data); + }); + ``` ## Advanced features From df4585af6b3abc7e473cf4620d60ee5703550190 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sat, 15 Jan 2022 23:53:40 -0500 Subject: [PATCH 5/6] Attachment comment --- client/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/client.go b/client/client.go index c9cc4cd..575c285 100644 --- a/client/client.go +++ b/client/client.go @@ -53,6 +53,7 @@ type Message struct { // TODO combine with server.message Raw string } +// Attachment represents a message attachment type Attachment struct { Name string `json:"name"` Type string `json:"type,omitempty"` From 9b0e7eedb2e38ee2ac6f5a41d8d98a64304fd295 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sun, 16 Jan 2022 00:07:32 -0500 Subject: [PATCH 6/6] Fix data race --- server/server.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/server.go b/server/server.go index 06097a2..d31588e 100644 --- a/server/server.go +++ b/server/server.go @@ -741,6 +741,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi return err } defer conn.Close() + var wlock sync.Mutex g, ctx := errgroup.WithContext(context.Background()) g.Go(func() error { pongWait := s.config.KeepaliveInterval + wsPongWait @@ -760,6 +761,8 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi }) g.Go(func() error { ping := func() error { + wlock.Lock() + defer wlock.Unlock() if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { return err } @@ -781,6 +784,8 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi if !filters.Pass(msg) { return nil } + wlock.Lock() + defer wlock.Unlock() if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { return err }