From b6120cf6d725593db0a3755e39cf2e7165fdd2b0 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Wed, 22 Dec 2021 09:44:16 +0100 Subject: [PATCH] Message filtering tests --- docs/config.md | 4 +- docs/publish.md | 8 +-- docs/subscribe/api.md | 155 ++++++++++++++++++++++++------------------ server/server.go | 41 ++++++----- server/server_test.go | 92 +++++++++++++++++++++++++ 5 files changed, 211 insertions(+), 89 deletions(-) diff --git a/docs/config.md b/docs/config.md index 3917825..3cd6cb4 100644 --- a/docs/config.md +++ b/docs/config.md @@ -32,8 +32,8 @@ You can also entirely disable the cache by setting `cache-duration` to `0`. When passed on to the connected subscribers, but never stored on disk or even kept in memory longer than is needed to forward the message to the subscribers. -Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#polling-for-messages), as well as the -[`since=` parameter](subscribe/api.md#fetching-cached-messages). +Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#poll-for-messages), as well as the +[`since=` parameter](subscribe/api.md#fetch-cached-messages). ## Behind a proxy (TLS, etc.) !!! warning diff --git a/docs/publish.md b/docs/publish.md index c7d01de..bf033db 100644 --- a/docs/publish.md +++ b/docs/publish.md @@ -606,8 +606,8 @@ client-side network disruptions, but arguably this feature also may raise privac To avoid messages being cached server-side entirely, you can set `X-Cache` header (or its alias: `Cache`) to `no`. This will make sure that your message is not cached on the server, even if server-side caching is enabled. Messages -are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fetching-cached-messages) and -[`poll=1`](subscribe/api.md#polling-for-messages) won't return the message anymore. +are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fetch-cached-messages) and +[`poll=1`](subscribe/api.md#poll-for-messages) won't return the message anymore. === "Command line (curl)" ``` @@ -752,14 +752,14 @@ but just in case, let's list them all: ## List of all parameters The following is a list of all parameters that can be passed when publishing a message. Parameter names are **case-insensitive**, -and can be passed as **HTTP headers** or **query parameters in the URL**. +and can be passed as **HTTP headers** or **query parameters in the URL**. They are listed in the table in their canonical form. | Parameter | Aliases (case-insensitive) | Description | |---|---|---| | `X-Message` | `Message`, `m` | Main body of the message as shown in the notification | | `X-Title` | `Title`, `t` | [Message title](#message-title) | | `X-Priority` | `Priority`, `prio`, `p` | [Message priority](#message-priority) | -| `X-Tags` | `Tags`, `ta` | [Tags and emojis](#tags-emojis) | +| `X-Tags` | `Tags`, `Tag`, `ta` | [Tags and emojis](#tags-emojis) | | `X-Delay` | `Delay`, `X-At`, `At`, `X-In`, `In` | Timestamp or duration for [delayed delivery](#scheduled-delivery) | | `X-Cache` | `Cache` | Allows disabling [message caching](#message-caching) | | `X-Firebase` | `Firebase` | Allows disabling [sending to Firebase](#disable-firebase) | diff --git a/docs/subscribe/api.md b/docs/subscribe/api.md index c11f37b..92c171d 100644 --- a/docs/subscribe/api.md +++ b/docs/subscribe/api.md @@ -184,6 +184,60 @@ format. Keepalive messages are sent as empty lines. fclose($fp); ``` +## Advanced features + +### Poll for messages +You can also just poll for messages if you don't like the long-standing connection using the `poll=1` +query parameter. The connection will end after all available messages have been read. This parameter can be +combined with `since=` (defaults to `since=all`). + +``` +curl -s "ntfy.sh/mytopic/json?poll=1" +``` + +### Fetch cached messages +Messages may be cached for a couple of hours (see [message caching](../config.md#message-cache)) to account for network +interruptions of subscribers. If the server has configured message caching, you can read back what you missed by using +the `since=` query parameter. It takes either a duration (e.g. `10m` or `30s`), a Unix timestamp (e.g. `1635528757`) +or `all` (all cached messages). + +``` +curl -s "ntfy.sh/mytopic/json?since=10m" +``` + +### Fetch scheduled messages +Messages that are [scheduled to be delivered](../publish.md#scheduled-delivery) at a later date are not typically +returned when subscribing via the API, which makes sense, because after all, the messages have technically not been +delivered yet. To also return scheduled messages from the API, you can use the `scheduled=1` (alias: `sched=1`) +parameter (makes most sense with the `poll=1` parameter): + +``` +curl -s "ntfy.sh/mytopic/json?poll=1&sched=1" +``` + +### Filter messages +You can filter which messages are returned based on the well-known message fields `message`, `title`, `priority` and +`tags`. Currently, only exact matches are supported. Here's an example that only returns messages of high priority +with the tag "zfs-error": + +``` +$ curl "ntfy.sh/alerts/json?priority=high&tags=zfs-error" +{"id":"0TIkJpBcxR","time":1640122627,"event":"open","topic":"alerts"} +{"id":"X3Uzz9O1sM","time":1640122674,"event":"message","topic":"alerts","priority":4,"tags":["zfs-error"], + "message":"ZFS pool corruption detected"} +``` + +### Subscribe to multiple topics +It's possible to subscribe to multiple topics in one HTTP call by providing a comma-separated list of topics +in the URL. This allows you to reduce the number of connections you have to maintain: + +``` +$ curl -s ntfy.sh/mytopic1,mytopic2/json +{"id":"0OkXIryH3H","time":1637182619,"event":"open","topic":"mytopic1,mytopic2,mytopic3"} +{"id":"dzJJm7BCWs","time":1637182634,"event":"message","topic":"mytopic1","message":"for topic 1"} +{"id":"Cm02DsxUHb","time":1637182643,"event":"message","topic":"mytopic2","message":"for topic 2"} +``` + ## JSON message format Both the [`/json` endpoint](#subscribe-as-json-stream) and the [`/sse` endpoint](#subscribe-as-sse-stream) return a JSON format of the message. It's very straight forward: @@ -204,17 +258,17 @@ Here's an example for each message type: === "Notification message" ``` json { - "id": "wze9zgqK41", - "time": 1638542110, - "event": "message", - "topic": "phil_alerts", - "priority": 5, - "tags": [ - "warning", - "skull" - ], - "title": "Unauthorized access detected", - "message": "Remote access to phils-laptop detected. Act right away." + "id": "wze9zgqK41", + "time": 1638542110, + "event": "message", + "topic": "phil_alerts", + "priority": 5, + "tags": [ + "warning", + "skull" + ], + "title": "Unauthorized access detected", + "message": "Remote access to phils-laptop detected. Act right away." } ``` @@ -222,72 +276,43 @@ Here's an example for each message type: === "Notification message (minimal)" ``` json { - "id": "wze9zgqK41", - "time": 1638542110, - "event": "message", - "topic": "phil_alerts", - "message": "Remote access to phils-laptop detected. Act right away." + "id": "wze9zgqK41", + "time": 1638542110, + "event": "message", + "topic": "phil_alerts", + "message": "Remote access to phils-laptop detected. Act right away." } ``` === "Open message" ``` json { - "id": "2pgIAaGrQ8", - "time": 1638542215, - "event": "open", - "topic": "phil_alerts" + "id": "2pgIAaGrQ8", + "time": 1638542215, + "event": "open", + "topic": "phil_alerts" } ``` -=== "Keepalive message" +=== "Keepalive message" ``` json { - "id": "371sevb0pD", - "time": 1638542275, - "event": "keepalive", - "topic": "phil_alerts" + "id": "371sevb0pD", + "time": 1638542275, + "event": "keepalive", + "topic": "phil_alerts" } ``` -## Advanced features +## List of all parameters +The following is a list of all parameters that can be passed when subscribing to a message. Parameter names are **case-insensitive**, +and can be passed as **HTTP headers** or **query parameters in the URL**. They are listed in the table in their canonical form. -### Fetching cached messages -Messages may be cached for a couple of hours (see [message caching](../config.md#message-cache)) to account for network -interruptions of subscribers. If the server has configured message caching, you can read back what you missed by using -the `since=` query parameter. It takes either a duration (e.g. `10m` or `30s`), a Unix timestamp (e.g. `1635528757`) -or `all` (all cached messages). - -``` -curl -s "ntfy.sh/mytopic/json?since=10m" -``` - -### Polling for messages -You can also just poll for messages if you don't like the long-standing connection using the `poll=1` -query parameter. The connection will end after all available messages have been read. This parameter can be -combined with `since=` (defaults to `since=all`). - -``` -curl -s "ntfy.sh/mytopic/json?poll=1" -``` - -### Fetching scheduled messages -Messages that are [scheduled to be delivered](../publish.md#scheduled-delivery) at a later date are not typically -returned when subscribing via the API, which makes sense, because after all, the messages have technically not been -delivered yet. To also return scheduled messages from the API, you can use the `scheduled=1` (alias: `sched=1`) -parameter (makes most sense with the `poll=1` parameter): - -``` -curl -s "ntfy.sh/mytopic/json?poll=1&sched=1" -``` - -### Subscribing to multiple topics -It's possible to subscribe to multiple topics in one HTTP call by providing a -comma-separated list of topics in the URL. This allows you to reduce the number of connections you have to maintain: - -``` -$ curl -s ntfy.sh/mytopic1,mytopic2/json -{"id":"0OkXIryH3H","time":1637182619,"event":"open","topic":"mytopic1,mytopic2,mytopic3"} -{"id":"dzJJm7BCWs","time":1637182634,"event":"message","topic":"mytopic1","message":"for topic 1"} -{"id":"Cm02DsxUHb","time":1637182643,"event":"message","topic":"mytopic2","message":"for topic 2"} -``` +| Parameter | Aliases (case-insensitive) | Description | +|---|---|---| +| `poll` | `X-Poll`, `po` | Return cached messages and close connection | +| `scheduled` | `X-Scheduled`, `sched` | Include scheduled/delayed messages in message list | +| `message` | `X-Message`, `m` | Filter: Only return messages that match this exact message string | +| `title` | `X-Title`, `t` | Filter: Only return messages that match this exact title string | +| `priority` | `X-Priority`, `prio`, `p` | Filter: Only return messages that match this priority | +| `tags` | `X-Tags`, `tag`, `ta` | Filter: Only return messages that all listed tags (comma-separated) | diff --git a/server/server.go b/server/server.go index e852c77..87d8f40 100644 --- a/server/server.go +++ b/server/server.go @@ -322,7 +322,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito func (s *Server) parseParams(r *http.Request, m *message) (cache bool, firebase bool, err error) { cache = readParam(r, "x-cache", "cache") != "no" firebase = readParam(r, "x-firebase", "firebase") != "no" - m.Title = readParam(r, "x-title", "title", "ti", "t") + m.Title = readParam(r, "x-title", "title", "t") messageStr := readParam(r, "x-message", "message", "m") if messageStr != "" { m.Message = messageStr @@ -331,7 +331,7 @@ func (s *Server) parseParams(r *http.Request, m *message) (cache bool, firebase if err != nil { return false, false, errHTTPBadRequest } - tagsStr := readParam(r, "x-tags", "tag", "tags", "ta") + tagsStr := readParam(r, "x-tags", "tags", "tag", "ta") if tagsStr != "" { m.Tags = make([]string, 0) for _, s := range util.SplitNoEmpty(tagsStr, ",") { @@ -418,17 +418,17 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi if err != nil { return err } - since, err := parseSince(r) + poll := readParam(r, "x-poll", "poll", "po") == "1" + scheduled := readParam(r, "x-scheduled", "scheduled", "sched") == "1" + since, err := parseSince(r, poll) if err != nil { return err } - var wlock sync.Mutex - poll := r.URL.Query().Has("poll") - scheduled := r.URL.Query().Has("scheduled") || r.URL.Query().Has("sched") messageFilter, titleFilter, priorityFilter, tagsFilter, err := parseQueryFilters(r) if err != nil { return err } + var wlock sync.Mutex sub := func(msg *message) error { if !passesQueryFilter(msg, messageFilter, titleFilter, priorityFilter, tagsFilter) { return nil @@ -481,11 +481,11 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi } func parseQueryFilters(r *http.Request) (messageFilter string, titleFilter string, priorityFilter int, tagsFilter []string, err error) { - messageFilter = r.URL.Query().Get("message") - titleFilter = r.URL.Query().Get("title") - tagsFilter = util.SplitNoEmpty(r.URL.Query().Get("tags"), ",") - priorityFilter, err = util.ParsePriority(r.URL.Query().Get("priority")) - return + messageFilter = readParam(r, "x-message", "message", "m") + titleFilter = readParam(r, "x-title", "title", "t") + tagsFilter = util.SplitNoEmpty(readParam(r, "x-tags", "tags", "tag", "ta"), ",") + priorityFilter, err = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p")) + return // may be err! } func passesQueryFilter(msg *message, messageFilter string, titleFilter string, priorityFilter int, tagsFilter []string) bool { @@ -498,7 +498,11 @@ func passesQueryFilter(msg *message, messageFilter string, titleFilter string, p if titleFilter != "" && msg.Title != titleFilter { return false } - if priorityFilter > 0 && (msg.Priority != priorityFilter || (msg.Priority == 0 && priorityFilter != 3)) { + messagePriority := msg.Priority + if messagePriority == 0 { + messagePriority = 3 // For query filters, default priority (3) is the same as "not set" (0) + } + if priorityFilter > 0 && messagePriority != priorityFilter { return false } if len(tagsFilter) > 0 && !util.InStringListAll(msg.Tags, tagsFilter) { @@ -529,18 +533,19 @@ func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled boo // // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or // "all" for all messages. -func parseSince(r *http.Request) (sinceTime, error) { - if !r.URL.Query().Has("since") { - if r.URL.Query().Has("poll") { +func parseSince(r *http.Request, poll bool) (sinceTime, error) { + since := readParam(r, "x-since", "since", "si") + if since == "" { + if poll { return sinceAllMessages, nil } return sinceNoMessages, nil } - if r.URL.Query().Get("since") == "all" { + if since == "all" { return sinceAllMessages, nil - } else if s, err := strconv.ParseInt(r.URL.Query().Get("since"), 10, 64); err == nil { + } else if s, err := strconv.ParseInt(since, 10, 64); err == nil { return sinceTime(time.Unix(s, 0)), nil - } else if d, err := time.ParseDuration(r.URL.Query().Get("since")); err == nil { + } else if d, err := time.ParseDuration(since); err == nil { return sinceTime(time.Now().Add(-1 * d)), nil } return sinceNoMessages, errHTTPBadRequest diff --git a/server/server_test.go b/server/server_test.go index 3abeb47..a1c995d 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -392,6 +392,98 @@ func TestServer_PublishFirebase(t *testing.T) { time.Sleep(500 * time.Millisecond) // Time for sends } +func TestServer_PollWithQueryFilters(t *testing.T) { + s := newTestServer(t, newTestConfig(t)) + + response := request(t, s, "PUT", "/mytopic?priority=1&tags=tag1,tag2", "my first message", nil) + msg := toMessage(t, response.Body.String()) + require.NotEmpty(t, msg.ID) + + response = request(t, s, "PUT", "/mytopic?title=a+title", "my second message", map[string]string{ + "Tags": "tag2,tag3", + }) + msg = toMessage(t, response.Body.String()) + require.NotEmpty(t, msg.ID) + + queriesThatShouldReturnMessageOne := []string{ + "/mytopic/json?poll=1&priority=1", + "/mytopic/json?poll=1&priority=min", + "/mytopic/json?poll=1&tags=tag1", + "/mytopic/json?poll=1&tags=tag1,tag2", + "/mytopic/json?poll=1&message=my+first+message", + } + for _, query := range queriesThatShouldReturnMessageOne { + response = request(t, s, "GET", query, "", nil) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages), "Query failed: "+query) + require.Equal(t, "my first message", messages[0].Message, "Query failed: "+query) + } + + queriesThatShouldReturnMessageTwo := []string{ + "/mytopic/json?poll=1&x-priority=3", // ! + "/mytopic/json?poll=1&priority=3", + "/mytopic/json?poll=1&priority=default", + "/mytopic/json?poll=1&p=3", + "/mytopic/json?poll=1&x-tags=tag2,tag3", + "/mytopic/json?poll=1&tags=tag2,tag3", + "/mytopic/json?poll=1&tag=tag2,tag3", + "/mytopic/json?poll=1&ta=tag2,tag3", + "/mytopic/json?poll=1&x-title=a+title", + "/mytopic/json?poll=1&title=a+title", + "/mytopic/json?poll=1&t=a+title", + "/mytopic/json?poll=1&x-message=my+second+message", + "/mytopic/json?poll=1&message=my+second+message", + "/mytopic/json?poll=1&m=my+second+message", + "/mytopic/json?x-poll=1&m=my+second+message", + "/mytopic/json?po=1&m=my+second+message", + } + for _, query := range queriesThatShouldReturnMessageTwo { + response = request(t, s, "GET", query, "", nil) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages), "Query failed: "+query) + require.Equal(t, "my second message", messages[0].Message, "Query failed: "+query) + } + + queriesThatShouldReturnNoMessages := []string{ + "/mytopic/json?poll=1&priority=4", + "/mytopic/json?poll=1&tags=tag1,tag2,tag3", + "/mytopic/json?poll=1&title=another+title", + "/mytopic/json?poll=1&message=my+third+message", + "/mytopic/json?poll=1&message=my+third+message", + } + for _, query := range queriesThatShouldReturnNoMessages { + response = request(t, s, "GET", query, "", nil) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 0, len(messages), "Query failed: "+query) + } +} + +func TestServer_SubscribeWithQueryFilters(t *testing.T) { + c := newTestConfig(t) + c.KeepaliveInterval = 800 * time.Millisecond + s := newTestServer(t, c) + + subscribeResponse := httptest.NewRecorder() + subscribeCancel := subscribe(t, s, "/mytopic/json?tags=zfs-issue", subscribeResponse) + + response := request(t, s, "PUT", "/mytopic", "my first message", nil) + require.Equal(t, 200, response.Code) + response = request(t, s, "PUT", "/mytopic", "ZFS scrub failed", map[string]string{ + "Tags": "zfs-issue,zfs-scrub", + }) + require.Equal(t, 200, response.Code) + + time.Sleep(850 * time.Millisecond) + subscribeCancel() + + messages := toMessages(t, subscribeResponse.Body.String()) + require.Equal(t, 3, len(messages)) + require.Equal(t, openEvent, messages[0].Event) + require.Equal(t, messageEvent, messages[1].Event) + require.Equal(t, "ZFS scrub failed", messages[1].Message) + require.Equal(t, keepaliveEvent, messages[2].Event) +} + func newTestConfig(t *testing.T) *Config { conf := NewConfig(":80") conf.CacheFile = filepath.Join(t.TempDir(), "cache.db")