Message filtering tests

This commit is contained in:
Philipp Heckel 2021-12-22 09:44:16 +01:00
parent 09bf13bd70
commit b6120cf6d7
5 changed files with 211 additions and 89 deletions

View file

@ -32,8 +32,8 @@ You can also entirely disable the cache by setting `cache-duration` to `0`. When
passed on to the connected subscribers, but never stored on disk or even kept in memory longer than is needed to forward passed on to the connected subscribers, but never stored on disk or even kept in memory longer than is needed to forward
the message to the subscribers. the message to the subscribers.
Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#polling-for-messages), as well as the Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#poll-for-messages), as well as the
[`since=` parameter](subscribe/api.md#fetching-cached-messages). [`since=` parameter](subscribe/api.md#fetch-cached-messages).
## Behind a proxy (TLS, etc.) ## Behind a proxy (TLS, etc.)
!!! warning !!! warning

View file

@ -606,8 +606,8 @@ client-side network disruptions, but arguably this feature also may raise privac
To avoid messages being cached server-side entirely, you can set `X-Cache` header (or its alias: `Cache`) to `no`. To avoid messages being cached server-side entirely, you can set `X-Cache` header (or its alias: `Cache`) to `no`.
This will make sure that your message is not cached on the server, even if server-side caching is enabled. Messages This will make sure that your message is not cached on the server, even if server-side caching is enabled. Messages
are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fetching-cached-messages) and are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fetch-cached-messages) and
[`poll=1`](subscribe/api.md#polling-for-messages) won't return the message anymore. [`poll=1`](subscribe/api.md#poll-for-messages) won't return the message anymore.
=== "Command line (curl)" === "Command line (curl)"
``` ```
@ -752,14 +752,14 @@ but just in case, let's list them all:
## List of all parameters ## List of all parameters
The following is a list of all parameters that can be passed when publishing a message. Parameter names are **case-insensitive**, The following is a list of all parameters that can be passed when publishing a message. Parameter names are **case-insensitive**,
and can be passed as **HTTP headers** or **query parameters in the URL**. and can be passed as **HTTP headers** or **query parameters in the URL**. They are listed in the table in their canonical form.
| Parameter | Aliases (case-insensitive) | Description | | Parameter | Aliases (case-insensitive) | Description |
|---|---|---| |---|---|---|
| `X-Message` | `Message`, `m` | Main body of the message as shown in the notification | | `X-Message` | `Message`, `m` | Main body of the message as shown in the notification |
| `X-Title` | `Title`, `t` | [Message title](#message-title) | | `X-Title` | `Title`, `t` | [Message title](#message-title) |
| `X-Priority` | `Priority`, `prio`, `p` | [Message priority](#message-priority) | | `X-Priority` | `Priority`, `prio`, `p` | [Message priority](#message-priority) |
| `X-Tags` | `Tags`, `ta` | [Tags and emojis](#tags-emojis) | | `X-Tags` | `Tags`, `Tag`, `ta` | [Tags and emojis](#tags-emojis) |
| `X-Delay` | `Delay`, `X-At`, `At`, `X-In`, `In` | Timestamp or duration for [delayed delivery](#scheduled-delivery) | | `X-Delay` | `Delay`, `X-At`, `At`, `X-In`, `In` | Timestamp or duration for [delayed delivery](#scheduled-delivery) |
| `X-Cache` | `Cache` | Allows disabling [message caching](#message-caching) | | `X-Cache` | `Cache` | Allows disabling [message caching](#message-caching) |
| `X-Firebase` | `Firebase` | Allows disabling [sending to Firebase](#disable-firebase) | | `X-Firebase` | `Firebase` | Allows disabling [sending to Firebase](#disable-firebase) |

View file

@ -184,6 +184,60 @@ format. Keepalive messages are sent as empty lines.
fclose($fp); fclose($fp);
``` ```
## Advanced features
### Poll for messages
You can also just poll for messages if you don't like the long-standing connection using the `poll=1`
query parameter. The connection will end after all available messages have been read. This parameter can be
combined with `since=` (defaults to `since=all`).
```
curl -s "ntfy.sh/mytopic/json?poll=1"
```
### Fetch cached messages
Messages may be cached for a couple of hours (see [message caching](../config.md#message-cache)) to account for network
interruptions of subscribers. If the server has configured message caching, you can read back what you missed by using
the `since=` query parameter. It takes either a duration (e.g. `10m` or `30s`), a Unix timestamp (e.g. `1635528757`)
or `all` (all cached messages).
```
curl -s "ntfy.sh/mytopic/json?since=10m"
```
### Fetch scheduled messages
Messages that are [scheduled to be delivered](../publish.md#scheduled-delivery) at a later date are not typically
returned when subscribing via the API, which makes sense, because after all, the messages have technically not been
delivered yet. To also return scheduled messages from the API, you can use the `scheduled=1` (alias: `sched=1`)
parameter (makes most sense with the `poll=1` parameter):
```
curl -s "ntfy.sh/mytopic/json?poll=1&sched=1"
```
### Filter messages
You can filter which messages are returned based on the well-known message fields `message`, `title`, `priority` and
`tags`. Currently, only exact matches are supported. Here's an example that only returns messages of high priority
with the tag "zfs-error":
```
$ curl "ntfy.sh/alerts/json?priority=high&tags=zfs-error"
{"id":"0TIkJpBcxR","time":1640122627,"event":"open","topic":"alerts"}
{"id":"X3Uzz9O1sM","time":1640122674,"event":"message","topic":"alerts","priority":4,"tags":["zfs-error"],
"message":"ZFS pool corruption detected"}
```
### Subscribe to multiple topics
It's possible to subscribe to multiple topics in one HTTP call by providing a comma-separated list of topics
in the URL. This allows you to reduce the number of connections you have to maintain:
```
$ curl -s ntfy.sh/mytopic1,mytopic2/json
{"id":"0OkXIryH3H","time":1637182619,"event":"open","topic":"mytopic1,mytopic2,mytopic3"}
{"id":"dzJJm7BCWs","time":1637182634,"event":"message","topic":"mytopic1","message":"for topic 1"}
{"id":"Cm02DsxUHb","time":1637182643,"event":"message","topic":"mytopic2","message":"for topic 2"}
```
## JSON message format ## JSON message format
Both the [`/json` endpoint](#subscribe-as-json-stream) and the [`/sse` endpoint](#subscribe-as-sse-stream) return a JSON Both the [`/json` endpoint](#subscribe-as-json-stream) and the [`/sse` endpoint](#subscribe-as-sse-stream) return a JSON
format of the message. It's very straight forward: format of the message. It's very straight forward:
@ -250,44 +304,15 @@ Here's an example for each message type:
} }
``` ```
## Advanced features ## List of all parameters
The following is a list of all parameters that can be passed when subscribing to a message. Parameter names are **case-insensitive**,
and can be passed as **HTTP headers** or **query parameters in the URL**. They are listed in the table in their canonical form.
### Fetching cached messages | Parameter | Aliases (case-insensitive) | Description |
Messages may be cached for a couple of hours (see [message caching](../config.md#message-cache)) to account for network |---|---|---|
interruptions of subscribers. If the server has configured message caching, you can read back what you missed by using | `poll` | `X-Poll`, `po` | Return cached messages and close connection |
the `since=` query parameter. It takes either a duration (e.g. `10m` or `30s`), a Unix timestamp (e.g. `1635528757`) | `scheduled` | `X-Scheduled`, `sched` | Include scheduled/delayed messages in message list |
or `all` (all cached messages). | `message` | `X-Message`, `m` | Filter: Only return messages that match this exact message string |
| `title` | `X-Title`, `t` | Filter: Only return messages that match this exact title string |
``` | `priority` | `X-Priority`, `prio`, `p` | Filter: Only return messages that match this priority |
curl -s "ntfy.sh/mytopic/json?since=10m" | `tags` | `X-Tags`, `tag`, `ta` | Filter: Only return messages that all listed tags (comma-separated) |
```
### Polling for messages
You can also just poll for messages if you don't like the long-standing connection using the `poll=1`
query parameter. The connection will end after all available messages have been read. This parameter can be
combined with `since=` (defaults to `since=all`).
```
curl -s "ntfy.sh/mytopic/json?poll=1"
```
### Fetching scheduled messages
Messages that are [scheduled to be delivered](../publish.md#scheduled-delivery) at a later date are not typically
returned when subscribing via the API, which makes sense, because after all, the messages have technically not been
delivered yet. To also return scheduled messages from the API, you can use the `scheduled=1` (alias: `sched=1`)
parameter (makes most sense with the `poll=1` parameter):
```
curl -s "ntfy.sh/mytopic/json?poll=1&sched=1"
```
### Subscribing to multiple topics
It's possible to subscribe to multiple topics in one HTTP call by providing a
comma-separated list of topics in the URL. This allows you to reduce the number of connections you have to maintain:
```
$ curl -s ntfy.sh/mytopic1,mytopic2/json
{"id":"0OkXIryH3H","time":1637182619,"event":"open","topic":"mytopic1,mytopic2,mytopic3"}
{"id":"dzJJm7BCWs","time":1637182634,"event":"message","topic":"mytopic1","message":"for topic 1"}
{"id":"Cm02DsxUHb","time":1637182643,"event":"message","topic":"mytopic2","message":"for topic 2"}
```

View file

@ -322,7 +322,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito
func (s *Server) parseParams(r *http.Request, m *message) (cache bool, firebase bool, err error) { func (s *Server) parseParams(r *http.Request, m *message) (cache bool, firebase bool, err error) {
cache = readParam(r, "x-cache", "cache") != "no" cache = readParam(r, "x-cache", "cache") != "no"
firebase = readParam(r, "x-firebase", "firebase") != "no" firebase = readParam(r, "x-firebase", "firebase") != "no"
m.Title = readParam(r, "x-title", "title", "ti", "t") m.Title = readParam(r, "x-title", "title", "t")
messageStr := readParam(r, "x-message", "message", "m") messageStr := readParam(r, "x-message", "message", "m")
if messageStr != "" { if messageStr != "" {
m.Message = messageStr m.Message = messageStr
@ -331,7 +331,7 @@ func (s *Server) parseParams(r *http.Request, m *message) (cache bool, firebase
if err != nil { if err != nil {
return false, false, errHTTPBadRequest return false, false, errHTTPBadRequest
} }
tagsStr := readParam(r, "x-tags", "tag", "tags", "ta") tagsStr := readParam(r, "x-tags", "tags", "tag", "ta")
if tagsStr != "" { if tagsStr != "" {
m.Tags = make([]string, 0) m.Tags = make([]string, 0)
for _, s := range util.SplitNoEmpty(tagsStr, ",") { for _, s := range util.SplitNoEmpty(tagsStr, ",") {
@ -418,17 +418,17 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
if err != nil { if err != nil {
return err return err
} }
since, err := parseSince(r) poll := readParam(r, "x-poll", "poll", "po") == "1"
scheduled := readParam(r, "x-scheduled", "scheduled", "sched") == "1"
since, err := parseSince(r, poll)
if err != nil { if err != nil {
return err return err
} }
var wlock sync.Mutex
poll := r.URL.Query().Has("poll")
scheduled := r.URL.Query().Has("scheduled") || r.URL.Query().Has("sched")
messageFilter, titleFilter, priorityFilter, tagsFilter, err := parseQueryFilters(r) messageFilter, titleFilter, priorityFilter, tagsFilter, err := parseQueryFilters(r)
if err != nil { if err != nil {
return err return err
} }
var wlock sync.Mutex
sub := func(msg *message) error { sub := func(msg *message) error {
if !passesQueryFilter(msg, messageFilter, titleFilter, priorityFilter, tagsFilter) { if !passesQueryFilter(msg, messageFilter, titleFilter, priorityFilter, tagsFilter) {
return nil return nil
@ -481,11 +481,11 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
} }
func parseQueryFilters(r *http.Request) (messageFilter string, titleFilter string, priorityFilter int, tagsFilter []string, err error) { func parseQueryFilters(r *http.Request) (messageFilter string, titleFilter string, priorityFilter int, tagsFilter []string, err error) {
messageFilter = r.URL.Query().Get("message") messageFilter = readParam(r, "x-message", "message", "m")
titleFilter = r.URL.Query().Get("title") titleFilter = readParam(r, "x-title", "title", "t")
tagsFilter = util.SplitNoEmpty(r.URL.Query().Get("tags"), ",") tagsFilter = util.SplitNoEmpty(readParam(r, "x-tags", "tags", "tag", "ta"), ",")
priorityFilter, err = util.ParsePriority(r.URL.Query().Get("priority")) priorityFilter, err = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
return return // may be err!
} }
func passesQueryFilter(msg *message, messageFilter string, titleFilter string, priorityFilter int, tagsFilter []string) bool { func passesQueryFilter(msg *message, messageFilter string, titleFilter string, priorityFilter int, tagsFilter []string) bool {
@ -498,7 +498,11 @@ func passesQueryFilter(msg *message, messageFilter string, titleFilter string, p
if titleFilter != "" && msg.Title != titleFilter { if titleFilter != "" && msg.Title != titleFilter {
return false return false
} }
if priorityFilter > 0 && (msg.Priority != priorityFilter || (msg.Priority == 0 && priorityFilter != 3)) { messagePriority := msg.Priority
if messagePriority == 0 {
messagePriority = 3 // For query filters, default priority (3) is the same as "not set" (0)
}
if priorityFilter > 0 && messagePriority != priorityFilter {
return false return false
} }
if len(tagsFilter) > 0 && !util.InStringListAll(msg.Tags, tagsFilter) { if len(tagsFilter) > 0 && !util.InStringListAll(msg.Tags, tagsFilter) {
@ -529,18 +533,19 @@ func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled boo
// //
// Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
// "all" for all messages. // "all" for all messages.
func parseSince(r *http.Request) (sinceTime, error) { func parseSince(r *http.Request, poll bool) (sinceTime, error) {
if !r.URL.Query().Has("since") { since := readParam(r, "x-since", "since", "si")
if r.URL.Query().Has("poll") { if since == "" {
if poll {
return sinceAllMessages, nil return sinceAllMessages, nil
} }
return sinceNoMessages, nil return sinceNoMessages, nil
} }
if r.URL.Query().Get("since") == "all" { if since == "all" {
return sinceAllMessages, nil return sinceAllMessages, nil
} else if s, err := strconv.ParseInt(r.URL.Query().Get("since"), 10, 64); err == nil { } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
return sinceTime(time.Unix(s, 0)), nil return sinceTime(time.Unix(s, 0)), nil
} else if d, err := time.ParseDuration(r.URL.Query().Get("since")); err == nil { } else if d, err := time.ParseDuration(since); err == nil {
return sinceTime(time.Now().Add(-1 * d)), nil return sinceTime(time.Now().Add(-1 * d)), nil
} }
return sinceNoMessages, errHTTPBadRequest return sinceNoMessages, errHTTPBadRequest

View file

@ -392,6 +392,98 @@ func TestServer_PublishFirebase(t *testing.T) {
time.Sleep(500 * time.Millisecond) // Time for sends time.Sleep(500 * time.Millisecond) // Time for sends
} }
func TestServer_PollWithQueryFilters(t *testing.T) {
s := newTestServer(t, newTestConfig(t))
response := request(t, s, "PUT", "/mytopic?priority=1&tags=tag1,tag2", "my first message", nil)
msg := toMessage(t, response.Body.String())
require.NotEmpty(t, msg.ID)
response = request(t, s, "PUT", "/mytopic?title=a+title", "my second message", map[string]string{
"Tags": "tag2,tag3",
})
msg = toMessage(t, response.Body.String())
require.NotEmpty(t, msg.ID)
queriesThatShouldReturnMessageOne := []string{
"/mytopic/json?poll=1&priority=1",
"/mytopic/json?poll=1&priority=min",
"/mytopic/json?poll=1&tags=tag1",
"/mytopic/json?poll=1&tags=tag1,tag2",
"/mytopic/json?poll=1&message=my+first+message",
}
for _, query := range queriesThatShouldReturnMessageOne {
response = request(t, s, "GET", query, "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages), "Query failed: "+query)
require.Equal(t, "my first message", messages[0].Message, "Query failed: "+query)
}
queriesThatShouldReturnMessageTwo := []string{
"/mytopic/json?poll=1&x-priority=3", // !
"/mytopic/json?poll=1&priority=3",
"/mytopic/json?poll=1&priority=default",
"/mytopic/json?poll=1&p=3",
"/mytopic/json?poll=1&x-tags=tag2,tag3",
"/mytopic/json?poll=1&tags=tag2,tag3",
"/mytopic/json?poll=1&tag=tag2,tag3",
"/mytopic/json?poll=1&ta=tag2,tag3",
"/mytopic/json?poll=1&x-title=a+title",
"/mytopic/json?poll=1&title=a+title",
"/mytopic/json?poll=1&t=a+title",
"/mytopic/json?poll=1&x-message=my+second+message",
"/mytopic/json?poll=1&message=my+second+message",
"/mytopic/json?poll=1&m=my+second+message",
"/mytopic/json?x-poll=1&m=my+second+message",
"/mytopic/json?po=1&m=my+second+message",
}
for _, query := range queriesThatShouldReturnMessageTwo {
response = request(t, s, "GET", query, "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 1, len(messages), "Query failed: "+query)
require.Equal(t, "my second message", messages[0].Message, "Query failed: "+query)
}
queriesThatShouldReturnNoMessages := []string{
"/mytopic/json?poll=1&priority=4",
"/mytopic/json?poll=1&tags=tag1,tag2,tag3",
"/mytopic/json?poll=1&title=another+title",
"/mytopic/json?poll=1&message=my+third+message",
"/mytopic/json?poll=1&message=my+third+message",
}
for _, query := range queriesThatShouldReturnNoMessages {
response = request(t, s, "GET", query, "", nil)
messages := toMessages(t, response.Body.String())
require.Equal(t, 0, len(messages), "Query failed: "+query)
}
}
func TestServer_SubscribeWithQueryFilters(t *testing.T) {
c := newTestConfig(t)
c.KeepaliveInterval = 800 * time.Millisecond
s := newTestServer(t, c)
subscribeResponse := httptest.NewRecorder()
subscribeCancel := subscribe(t, s, "/mytopic/json?tags=zfs-issue", subscribeResponse)
response := request(t, s, "PUT", "/mytopic", "my first message", nil)
require.Equal(t, 200, response.Code)
response = request(t, s, "PUT", "/mytopic", "ZFS scrub failed", map[string]string{
"Tags": "zfs-issue,zfs-scrub",
})
require.Equal(t, 200, response.Code)
time.Sleep(850 * time.Millisecond)
subscribeCancel()
messages := toMessages(t, subscribeResponse.Body.String())
require.Equal(t, 3, len(messages))
require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, messageEvent, messages[1].Event)
require.Equal(t, "ZFS scrub failed", messages[1].Message)
require.Equal(t, keepaliveEvent, messages[2].Event)
}
func newTestConfig(t *testing.T) *Config { func newTestConfig(t *testing.T) *Config {
conf := NewConfig(":80") conf := NewConfig(":80")
conf.CacheFile = filepath.Join(t.TempDir(), "cache.db") conf.CacheFile = filepath.Join(t.TempDir(), "cache.db")