From 196c86d12bd424fb2360a4c33fa86526f2862c8f Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Fri, 10 Dec 2021 11:31:42 -0500 Subject: [PATCH 1/5] WIP_ Add 'At:'/'Delay:' headers to support scheduled messages --- config/config.go | 3 + server/cache.go | 4 +- server/cache_mem.go | 115 ++++++++++++++++++++----------- server/cache_mem_test.go | 6 +- server/cache_sqlite.go | 130 ++++++++++++++++++++++++++---------- server/cache_sqlite_test.go | 12 ++-- server/cache_test.go | 40 +++++++++-- server/server.go | 113 ++++++++++++++++++++++++------- 8 files changed, 311 insertions(+), 112 deletions(-) diff --git a/config/config.go b/config/config.go index 2dbed00..7fcea45 100644 --- a/config/config.go +++ b/config/config.go @@ -11,6 +11,7 @@ const ( DefaultCacheDuration = 12 * time.Hour DefaultKeepaliveInterval = 30 * time.Second DefaultManagerInterval = time.Minute + DefaultAtSenderInterval = 10 * time.Second ) // Defines all the limits @@ -35,6 +36,7 @@ type Config struct { CacheDuration time.Duration KeepaliveInterval time.Duration ManagerInterval time.Duration + AtSenderInterval time.Duration GlobalTopicLimit int VisitorRequestLimitBurst int VisitorRequestLimitReplenish time.Duration @@ -54,6 +56,7 @@ func New(listenHTTP string) *Config { CacheDuration: DefaultCacheDuration, KeepaliveInterval: DefaultKeepaliveInterval, ManagerInterval: DefaultManagerInterval, + AtSenderInterval: DefaultAtSenderInterval, GlobalTopicLimit: DefaultGlobalTopicLimit, VisitorRequestLimitBurst: DefaultVisitorRequestLimitBurst, VisitorRequestLimitReplenish: DefaultVisitorRequestLimitReplenish, diff --git a/server/cache.go b/server/cache.go index b355791..64d517d 100644 --- a/server/cache.go +++ b/server/cache.go @@ -14,8 +14,10 @@ var ( // i.e. message structs with the Event messageEvent. type cache interface { AddMessage(m *message) error - Messages(topic string, since sinceTime) ([]*message, error) + Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) + MessagesDue() ([]*message, error) MessageCount(topic string) (int, error) Topics() (map[string]*topic, error) Prune(olderThan time.Time) error + MarkPublished(m *message) error } diff --git a/server/cache_mem.go b/server/cache_mem.go index 9272ebd..31c7bb9 100644 --- a/server/cache_mem.go +++ b/server/cache_mem.go @@ -1,14 +1,16 @@ package server import ( + "sort" "sync" "time" ) type memCache struct { - messages map[string][]*message - nop bool - mu sync.Mutex + messages map[string][]*message + scheduled map[string]*message // Message ID -> message + nop bool + mu sync.Mutex } var _ cache = (*memCache)(nil) @@ -16,8 +18,9 @@ var _ cache = (*memCache)(nil) // newMemCache creates an in-memory cache func newMemCache() *memCache { return &memCache{ - messages: make(map[string][]*message), - nop: false, + messages: make(map[string][]*message), + scheduled: make(map[string]*message), + nop: false, } } @@ -25,77 +28,109 @@ func newMemCache() *memCache { // it is always empty and can be used if caching is entirely disabled func newNopCache() *memCache { return &memCache{ - messages: make(map[string][]*message), - nop: true, + messages: make(map[string][]*message), + scheduled: make(map[string]*message), + nop: true, } } -func (s *memCache) AddMessage(m *message) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.nop { +func (c *memCache) AddMessage(m *message) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.nop { return nil } if m.Event != messageEvent { return errUnexpectedMessageType } - if _, ok := s.messages[m.Topic]; !ok { - s.messages[m.Topic] = make([]*message, 0) + if _, ok := c.messages[m.Topic]; !ok { + c.messages[m.Topic] = make([]*message, 0) } - s.messages[m.Topic] = append(s.messages[m.Topic], m) + delayed := m.Time > time.Now().Unix() + if delayed { + c.scheduled[m.ID] = m + } + c.messages[m.Topic] = append(c.messages[m.Topic], m) return nil } -func (s *memCache) Messages(topic string, since sinceTime) ([]*message, error) { - s.mu.Lock() - defer s.mu.Unlock() - if _, ok := s.messages[topic]; !ok || since.IsNone() { +func (c *memCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.messages[topic]; !ok || since.IsNone() { return make([]*message, 0), nil } - messages := make([]*message, 0) // copy! - for _, m := range s.messages[topic] { - msgTime := time.Unix(m.Time, 0) - if msgTime == since.Time() || msgTime.After(since.Time()) { + messages := make([]*message, 0) + for _, m := range c.messages[topic] { + _, messageScheduled := c.scheduled[m.ID] + include := m.Time >= since.Time().Unix() && (!messageScheduled || scheduled) + if include { messages = append(messages, m) } } + sort.Slice(messages, func(i, j int) bool { + return messages[i].Time < messages[j].Time + }) return messages, nil } -func (s *memCache) MessageCount(topic string) (int, error) { - s.mu.Lock() - defer s.mu.Unlock() - if _, ok := s.messages[topic]; !ok { - return 0, nil +func (c *memCache) MessagesDue() ([]*message, error) { + c.mu.Lock() + defer c.mu.Unlock() + messages := make([]*message, 0) + for _, m := range c.scheduled { + due := time.Now().Unix() >= m.Time + if due { + messages = append(messages, m) + } } - return len(s.messages[topic]), nil + sort.Slice(messages, func(i, j int) bool { + return messages[i].Time < messages[j].Time + }) + return messages, nil } -func (s *memCache) Topics() (map[string]*topic, error) { - s.mu.Lock() - defer s.mu.Unlock() +func (c *memCache) MarkPublished(m *message) error { + c.mu.Lock() + delete(c.scheduled, m.ID) + c.mu.Unlock() + return nil +} + +func (c *memCache) MessageCount(topic string) (int, error) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.messages[topic]; !ok { + return 0, nil + } + return len(c.messages[topic]), nil +} + +func (c *memCache) Topics() (map[string]*topic, error) { + c.mu.Lock() + defer c.mu.Unlock() topics := make(map[string]*topic) - for topic := range s.messages { + for topic := range c.messages { topics[topic] = newTopic(topic) } return topics, nil } -func (s *memCache) Prune(olderThan time.Time) error { - s.mu.Lock() - defer s.mu.Unlock() - for topic := range s.messages { - s.pruneTopic(topic, olderThan) +func (c *memCache) Prune(olderThan time.Time) error { + c.mu.Lock() + defer c.mu.Unlock() + for topic := range c.messages { + c.pruneTopic(topic, olderThan) } return nil } -func (s *memCache) pruneTopic(topic string, olderThan time.Time) { +func (c *memCache) pruneTopic(topic string, olderThan time.Time) { messages := make([]*message, 0) - for _, m := range s.messages[topic] { + for _, m := range c.messages[topic] { if m.Time >= olderThan.Unix() { messages = append(messages, m) } } - s.messages[topic] = messages + c.messages[topic] = messages } diff --git a/server/cache_mem_test.go b/server/cache_mem_test.go index a1c854d..831703a 100644 --- a/server/cache_mem_test.go +++ b/server/cache_mem_test.go @@ -9,6 +9,10 @@ func TestMemCache_Messages(t *testing.T) { testCacheMessages(t, newMemCache()) } +func TestMemCache_MessagesScheduled(t *testing.T) { + testCacheMessagesScheduled(t, newMemCache()) +} + func TestMemCache_Topics(t *testing.T) { testCacheTopics(t, newMemCache()) } @@ -25,7 +29,7 @@ func TestMemCache_NopCache(t *testing.T) { c := newNopCache() assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message"))) - messages, err := c.Messages("mytopic", sinceAllMessages) + messages, err := c.Messages("mytopic", sinceAllMessages, false) assert.Nil(t, err) assert.Empty(t, messages) diff --git a/server/cache_sqlite.go b/server/cache_sqlite.go index 3c3564d..19eddee 100644 --- a/server/cache_sqlite.go +++ b/server/cache_sqlite.go @@ -21,19 +21,32 @@ const ( message VARCHAR(512) NOT NULL, title VARCHAR(256) NOT NULL, priority INT NOT NULL, - tags VARCHAR(256) NOT NULL + tags VARCHAR(256) NOT NULL, + published INT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); COMMIT; ` - insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)` + insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?)` pruneMessagesQuery = `DELETE FROM messages WHERE time < ?` selectMessagesSinceTimeQuery = ` - SELECT id, time, message, title, priority, tags + SELECT id, time, topic, message, title, priority, tags + FROM messages + WHERE topic = ? AND time >= ? AND published = 1 + ORDER BY time ASC + ` + selectMessagesSinceTimeIncludeScheduledQuery = ` + SELECT id, time, topic, message, title, priority, tags FROM messages WHERE topic = ? AND time >= ? ORDER BY time ASC ` + selectMessagesDueQuery = ` + SELECT id, time, topic, message, title, priority, tags + FROM messages + WHERE time <= ? AND published = 0 + ` + updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?` selectMessagesCountQuery = `SELECT COUNT(*) FROM messages` selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?` selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` @@ -41,7 +54,7 @@ const ( // Schema management queries const ( - currentSchemaVersion = 1 + currentSchemaVersion = 2 createSchemaVersionTableQuery = ` CREATE TABLE IF NOT EXISTS schemaVersion ( id INT PRIMARY KEY, @@ -49,6 +62,7 @@ const ( ); ` insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)` + updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1` selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1` // 0 -> 1 @@ -59,6 +73,13 @@ const ( ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT(''); COMMIT; ` + + // 1 -> 2 + migrate1To2AlterMessagesTableQuery = ` + BEGIN; + ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1); + COMMIT; + ` ) type sqliteCache struct { @@ -84,46 +105,39 @@ func (c *sqliteCache) AddMessage(m *message) error { if m.Event != messageEvent { return errUnexpectedMessageType } - _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ",")) + published := m.Time <= time.Now().Unix() + _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), published) return err } -func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) { +func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) { if since.IsNone() { return make([]*message, 0), nil } - rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix()) + var rows *sql.Rows + var err error + if scheduled { + rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix()) + } else { + rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix()) + } if err != nil { return nil, err } - defer rows.Close() - messages := make([]*message, 0) - for rows.Next() { - var timestamp int64 - var priority int - var id, msg, title, tagsStr string - if err := rows.Scan(&id, ×tamp, &msg, &title, &priority, &tagsStr); err != nil { - return nil, err - } - var tags []string - if tagsStr != "" { - tags = strings.Split(tagsStr, ",") - } - messages = append(messages, &message{ - ID: id, - Time: timestamp, - Event: messageEvent, - Topic: topic, - Message: msg, - Title: title, - Priority: priority, - Tags: tags, - }) - } - if err := rows.Err(); err != nil { + return readMessages(rows) +} + +func (c *sqliteCache) MessagesDue() ([]*message, error) { + rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix()) + if err != nil { return nil, err } - return messages, nil + return readMessages(rows) +} + +func (c *sqliteCache) MarkPublished(m *message) error { + _, err := c.db.Exec(updateMessagePublishedQuery, m.ID) + return err } func (c *sqliteCache) MessageCount(topic string) (int, error) { @@ -169,6 +183,37 @@ func (c *sqliteCache) Prune(olderThan time.Time) error { return err } +func readMessages(rows *sql.Rows) ([]*message, error) { + defer rows.Close() + messages := make([]*message, 0) + for rows.Next() { + var timestamp int64 + var priority int + var id, topic, msg, title, tagsStr string + if err := rows.Scan(&id, ×tamp, &topic, &msg, &title, &priority, &tagsStr); err != nil { + return nil, err + } + var tags []string + if tagsStr != "" { + tags = strings.Split(tagsStr, ",") + } + messages = append(messages, &message{ + ID: id, + Time: timestamp, + Event: messageEvent, + Topic: topic, + Message: msg, + Title: title, + Priority: priority, + Tags: tags, + }) + } + if err := rows.Err(); err != nil { + return nil, err + } + return messages, nil +} + func setupDB(db *sql.DB) error { // If 'messages' table does not exist, this must be a new database rowsMC, err := db.Query(selectMessagesCountQuery) @@ -194,7 +239,9 @@ func setupDB(db *sql.DB) error { if schemaVersion == currentSchemaVersion { return nil } else if schemaVersion == 0 { - return migrateFrom0To1(db) + return migrateFrom0(db) + } else if schemaVersion == 1 { + return migrateFrom1(db) } return fmt.Errorf("unexpected schema version found: %d", schemaVersion) } @@ -212,7 +259,7 @@ func setupNewDB(db *sql.DB) error { return nil } -func migrateFrom0To1(db *sql.DB) error { +func migrateFrom0(db *sql.DB) error { log.Print("Migrating cache database schema: from 0 to 1") if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil { return err @@ -223,5 +270,16 @@ func migrateFrom0To1(db *sql.DB) error { if _, err := db.Exec(insertSchemaVersion, 1); err != nil { return err } - return nil + return migrateFrom1(db) +} + +func migrateFrom1(db *sql.DB) error { + log.Print("Migrating cache database schema: from 1 to 2") + if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil { + return err + } + if _, err := db.Exec(updateSchemaVersion, 2); err != nil { + return err + } + return nil // Update this when a new version is added } diff --git a/server/cache_sqlite_test.go b/server/cache_sqlite_test.go index 0f6c430..eb7b64a 100644 --- a/server/cache_sqlite_test.go +++ b/server/cache_sqlite_test.go @@ -9,10 +9,14 @@ import ( "time" ) -func TestSqliteCache_AddMessage(t *testing.T) { +func TestSqliteCache_Messages(t *testing.T) { testCacheMessages(t, newSqliteTestCache(t)) } +func TestSqliteCache_MessagesScheduled(t *testing.T) { + testCacheMessagesScheduled(t, newSqliteTestCache(t)) +} + func TestSqliteCache_Topics(t *testing.T) { testCacheTopics(t, newSqliteTestCache(t)) } @@ -25,7 +29,7 @@ func TestSqliteCache_Prune(t *testing.T) { testCachePrune(t, newSqliteTestCache(t)) } -func TestSqliteCache_Migration_0to1(t *testing.T) { +func TestSqliteCache_Migration_From0(t *testing.T) { filename := newSqliteTestCacheFile(t) db, err := sql.Open("sqlite3", filename) assert.Nil(t, err) @@ -53,7 +57,7 @@ func TestSqliteCache_Migration_0to1(t *testing.T) { // Create cache to trigger migration c := newSqliteTestCacheFromFile(t, filename) - messages, err := c.Messages("mytopic", sinceAllMessages) + messages, err := c.Messages("mytopic", sinceAllMessages, false) assert.Nil(t, err) assert.Equal(t, 10, len(messages)) assert.Equal(t, "some message 5", messages[5].Message) @@ -67,7 +71,7 @@ func TestSqliteCache_Migration_0to1(t *testing.T) { var schemaVersion int assert.Nil(t, rows.Scan(&schemaVersion)) - assert.Equal(t, 1, schemaVersion) + assert.Equal(t, 2, schemaVersion) } func newSqliteTestCache(t *testing.T) *sqliteCache { diff --git a/server/cache_test.go b/server/cache_test.go index ab65b06..1eae091 100644 --- a/server/cache_test.go +++ b/server/cache_test.go @@ -27,7 +27,7 @@ func testCacheMessages(t *testing.T, c cache) { assert.Equal(t, 2, count) // mytopic: since all - messages, _ := c.Messages("mytopic", sinceAllMessages) + messages, _ := c.Messages("mytopic", sinceAllMessages, false) assert.Equal(t, 2, len(messages)) assert.Equal(t, "my message", messages[0].Message) assert.Equal(t, "mytopic", messages[0].Topic) @@ -38,11 +38,11 @@ func testCacheMessages(t *testing.T, c cache) { assert.Equal(t, "my other message", messages[1].Message) // mytopic: since none - messages, _ = c.Messages("mytopic", sinceNoMessages) + messages, _ = c.Messages("mytopic", sinceNoMessages, false) assert.Empty(t, messages) // mytopic: since 2 - messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0))) + messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0)), false) assert.Equal(t, 1, len(messages)) assert.Equal(t, "my other message", messages[0].Message) @@ -52,7 +52,7 @@ func testCacheMessages(t *testing.T, c cache) { assert.Equal(t, 1, count) // example: since all - messages, _ = c.Messages("example", sinceAllMessages) + messages, _ = c.Messages("example", sinceAllMessages, false) assert.Equal(t, "my example message", messages[0].Message) // non-existing: count @@ -61,7 +61,7 @@ func testCacheMessages(t *testing.T, c cache) { assert.Equal(t, 0, count) // non-existing: since all - messages, _ = c.Messages("doesnotexist", sinceAllMessages) + messages, _ = c.Messages("doesnotexist", sinceAllMessages, false) assert.Empty(t, messages) } @@ -103,7 +103,7 @@ func testCachePrune(t *testing.T, c cache) { assert.Nil(t, err) assert.Equal(t, 0, count) - messages, err := c.Messages("mytopic", sinceAllMessages) + messages, err := c.Messages("mytopic", sinceAllMessages, false) assert.Nil(t, err) assert.Equal(t, 1, len(messages)) assert.Equal(t, "my other message", messages[0].Message) @@ -116,8 +116,34 @@ func testCacheMessagesTagsPrioAndTitle(t *testing.T, c cache) { m.Title = "some title" assert.Nil(t, c.AddMessage(m)) - messages, _ := c.Messages("mytopic", sinceAllMessages) + messages, _ := c.Messages("mytopic", sinceAllMessages, false) assert.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags) assert.Equal(t, 5, messages[0].Priority) assert.Equal(t, "some title", messages[0].Title) } + +func testCacheMessagesScheduled(t *testing.T, c cache) { + m1 := newDefaultMessage("mytopic", "message 1") + m2 := newDefaultMessage("mytopic", "message 2") + m2.Time = time.Now().Add(time.Hour).Unix() + m3 := newDefaultMessage("mytopic", "message 3") + m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2! + m4 := newDefaultMessage("mytopic2", "message 4") + m4.Time = time.Now().Add(time.Minute).Unix() + assert.Nil(t, c.AddMessage(m1)) + assert.Nil(t, c.AddMessage(m2)) + assert.Nil(t, c.AddMessage(m3)) + + messages, _ := c.Messages("mytopic", sinceAllMessages, false) // exclude scheduled + assert.Equal(t, 1, len(messages)) + assert.Equal(t, "message 1", messages[0].Message) + + messages, _ = c.Messages("mytopic", sinceAllMessages, true) // include scheduled + assert.Equal(t, 3, len(messages)) + assert.Equal(t, "message 1", messages[0].Message) + assert.Equal(t, "message 3", messages[1].Message) // Order! + assert.Equal(t, "message 2", messages[2].Message) + + messages, _ = c.MessagesDue() + assert.Empty(t, messages) +} diff --git a/server/server.go b/server/server.go index 7ee039c..73727f2 100644 --- a/server/server.go +++ b/server/server.go @@ -73,6 +73,7 @@ var ( const ( messageLimit = 512 + minDelay = 10 * time.Second ) var ( @@ -183,6 +184,15 @@ func (s *Server) Run() error { s.updateStatsAndExpire() } }() + go func() { + ticker := time.NewTicker(s.config.AtSenderInterval) + for { + <-ticker.C + if err := s.sendDelayedMessages(); err != nil { + log.Printf("error sending scheduled messages: %s", err.Error()) + } + } + }() listenStr := fmt.Sprintf("%s/http", s.config.ListenHTTP) if s.config.ListenHTTPS != "" { listenStr += fmt.Sprintf(" %s/https", s.config.ListenHTTPS) @@ -279,14 +289,17 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito if m.Message == "" { return errHTTPBadRequest } - title, priority, tags, cache, firebase := parseHeaders(r.Header) - m.Title = title - m.Priority = priority - m.Tags = tags - if err := t.Publish(m); err != nil { + cache, firebase, err := parseHeaders(r.Header, m) + if err != nil { return err } - if s.firebase != nil && firebase { + delayed := m.Time > time.Now().Unix() + if !delayed { + if err := t.Publish(m); err != nil { + return err + } + } + if s.firebase != nil && firebase && !delayed { go func() { if err := s.firebase(m); err != nil { log.Printf("Unable to publish to Firebase: %v", err.Error()) @@ -308,35 +321,62 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito return nil } -func parseHeaders(header http.Header) (title string, priority int, tags []string, cache bool, firebase bool) { - title = readHeader(header, "x-title", "title", "ti", "t") +func parseHeaders(header http.Header, m *message) (cache bool, firebase bool, err error) { + cache = readHeader(header, "x-cache", "cache") != "no" + firebase = readHeader(header, "x-firebase", "firebase") != "no" + m.Title = readHeader(header, "x-title", "title", "ti", "t") priorityStr := readHeader(header, "x-priority", "priority", "prio", "p") if priorityStr != "" { switch strings.ToLower(priorityStr) { case "1", "min": - priority = 1 + m.Priority = 1 case "2", "low": - priority = 2 + m.Priority = 2 case "3", "default": - priority = 3 + m.Priority = 3 case "4", "high": - priority = 4 + m.Priority = 4 case "5", "max", "urgent": - priority = 5 + m.Priority = 5 default: - priority = 0 + return false, false, errHTTPBadRequest } } tagsStr := readHeader(header, "x-tags", "tag", "tags", "ta") if tagsStr != "" { - tags = make([]string, 0) + m.Tags = make([]string, 0) for _, s := range strings.Split(tagsStr, ",") { - tags = append(tags, strings.TrimSpace(s)) + m.Tags = append(m.Tags, strings.TrimSpace(s)) } } - cache = readHeader(header, "x-cache", "cache") != "no" - firebase = readHeader(header, "x-firebase", "firebase") != "no" - return title, priority, tags, cache, firebase + atStr := readHeader(header, "x-at", "at", "x-schedule", "schedule", "sched") + if atStr != "" { + if !cache { + return false, false, errHTTPBadRequest + } + at, err := strconv.Atoi(atStr) + if err != nil { + return false, false, errHTTPBadRequest + } else if int64(at) < time.Now().Add(minDelay).Unix() { + return false, false, errHTTPBadRequest + } + m.Time = int64(at) + } else { + delayStr := readHeader(header, "x-delay", "delay", "x-in", "in") + if delayStr != "" { + if !cache { + return false, false, errHTTPBadRequest + } + delay, err := time.ParseDuration(delayStr) + if err != nil { + return false, false, errHTTPBadRequest + } else if delay < minDelay { + return false, false, errHTTPBadRequest + } + m.Time = time.Now().Add(delay).Unix() + } + } + return cache, firebase, nil } func readHeader(header http.Header, names ...string) string { @@ -401,6 +441,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi } var wlock sync.Mutex poll := r.URL.Query().Has("poll") + scheduled := r.URL.Query().Has("scheduled") || r.URL.Query().Has("sched") sub := func(msg *message) error { wlock.Lock() defer wlock.Unlock() @@ -419,7 +460,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset! if poll { - return s.sendOldMessages(topics, since, sub) + return s.sendOldMessages(topics, since, scheduled, sub) } subscriberIDs := make([]int, 0) for _, t := range topics { @@ -433,7 +474,7 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message return err } - if err := s.sendOldMessages(topics, since, sub); err != nil { + if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil { return err } for { @@ -449,12 +490,12 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi } } -func (s *Server) sendOldMessages(topics []*topic, since sinceTime, sub subscriber) error { +func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error { if since.IsNone() { return nil } for _, t := range topics { - messages, err := s.cache.Messages(t.ID, since) + messages, err := s.cache.Messages(t.ID, since, scheduled) if err != nil { return err } @@ -560,6 +601,32 @@ func (s *Server) updateStatsAndExpire() { s.messages, len(s.topics), subscribers, messages, len(s.visitors)) } +func (s *Server) sendDelayedMessages() error { + s.mu.Lock() + defer s.mu.Unlock() + messages, err := s.cache.MessagesDue() + if err != nil { + return err + } + for _, m := range messages { + t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published + if ok { + if err := t.Publish(m); err != nil { + log.Printf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error()) + } + if s.firebase != nil { + if err := s.firebase(m); err != nil { + log.Printf("unable to publish to Firebase: %v", err.Error()) + } + } + } + if err := s.cache.MarkPublished(m); err != nil { + return err + } + } + return nil +} + func (s *Server) withRateLimit(w http.ResponseWriter, r *http.Request, handler func(w http.ResponseWriter, r *http.Request, v *visitor) error) error { v := s.visitor(r) if err := v.RequestAllowed(); err != nil { From 06b4d9c83b94b71bf3d1ec563405ed7499106316 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Fri, 10 Dec 2021 19:59:51 -0500 Subject: [PATCH 2/5] Natural language --- go.mod | 13 +++++-- go.sum | 24 +++++++++--- server/server.go | 24 +++--------- util/time.go | 95 +++++++++++++++++++++++++++++++++++++++++++++++ util/time_test.go | 60 ++++++++++++++++++++++++++++++ 5 files changed, 188 insertions(+), 28 deletions(-) create mode 100644 util/time.go create mode 100644 util/time_test.go diff --git a/go.mod b/go.mod index 9987eeb..2d4db26 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module heckel.io/ntfy go 1.17 +replace github.com/olebedev/when => github.com/binwiederhier/when v0.0.1-binwiederhier2 + require ( cloud.google.com/go/firestore v1.6.1 // indirect cloud.google.com/go/storage v1.18.2 // indirect @@ -9,36 +11,39 @@ require ( github.com/BurntSushi/toml v0.4.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/mattn/go-sqlite3 v1.14.9 + github.com/olebedev/when v0.0.0-20190311101825-c3b538a97254 github.com/stretchr/testify v1.7.0 github.com/urfave/cli/v2 v2.3.0 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 - google.golang.org/api v0.61.0 + google.golang.org/api v0.62.0 gopkg.in/yaml.v2 v2.4.0 // indirect ) require ( cloud.google.com/go v0.99.0 // indirect + github.com/AlekSi/pointer v1.0.0 // indirect github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490 // indirect - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/envoyproxy/go-control-plane v0.10.1 // indirect github.com/envoyproxy/protoc-gen-validate v0.6.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect - golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 // indirect + golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f // indirect + google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect google.golang.org/grpc v1.42.0 // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect diff --git a/go.sum b/go.sum index 48252af..e04bb27 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,7 @@ cloud.google.com/go v0.90.0/go.mod h1:kRX0mNRHe0e2rC6oNakvwQqzyDmg57xJ+SZU1eT2aD cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI= cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4= cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc= +cloud.google.com/go v0.98.0/go.mod h1:ua6Ush4NALrHk5QXDWnjvZHN93OuF0HfuEPq9I1X0cM= cloud.google.com/go v0.99.0 h1:y/cM2iqGgGi5D5DQZl6D9STN/3dR/Vx5Mp8s752oJTY= cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= @@ -51,12 +52,16 @@ cloud.google.com/go/storage v1.18.2/go.mod h1:AiIj7BWXyhO5gGVmYJ+S8tbkCx3yb0IMju dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= +github.com/AlekSi/pointer v1.0.0 h1:KWCWzsvFxNLcmM5XmiqHsGTTsuwZMsLFwWF9Y+//bNE= +github.com/AlekSi/pointer v1.0.0/go.mod h1:1kjywbfcPFCmncIxtk6fIEub6LKrfMz3gc5QKVOSOA8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/binwiederhier/when v0.0.1-binwiederhier2 h1:BjQC7OQI4MK0vXeltn2BEuf0Tdh/M6YNh1JrepnVr2I= +github.com/binwiederhier/when v0.0.1-binwiederhier2/go.mod h1:DPucAeQGDPUzYUt+NaWw6qsF5SFapWWToxEiVDh2aV0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -84,8 +89,9 @@ github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWH github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -199,6 +205,8 @@ github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc8 github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA= github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -213,6 +221,7 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -390,8 +399,9 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 h1:TyHqChC80pFkXWraUUf6RuB5IqFdQieMLwwCJokV2pc= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211205182925-97ca703d548d h1:FjkYO/PPp4Wi0EAUOVLxePm7qVW4r4ctbWpURyuOD0E= +golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -495,8 +505,9 @@ google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqiv google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI= google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E= google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU= -google.golang.org/api v0.61.0 h1:TXXKS1slM3b2bZNJwD5DV/Tp6/M2cLzLOLh9PjDhrw8= google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I= +google.golang.org/api v0.62.0 h1:PhGymJMXfGBzc4lBRmrx9+1w4w2wEzURHNGF/sD/xGc= +google.golang.org/api v0.62.0/go.mod h1:dKmwPCydfsad4qCH08MSdgWjfHOyfpd4VtDGgRFdavw= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -566,9 +577,11 @@ google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ6 google.golang.org/genproto v0.0.0-20211016002631-37fc39342514/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211028162531-8db9c33dc351/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211203200212-54befc351ae9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f h1:QH7+Ym+7e2XV1dZIHapkXoeqHyNaCzn6MNp3JBaYYUc= -google.golang.org/genproto v0.0.0-20211206220100-3cb06788ce7f/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa h1:I0YcKz0I7OAhddo7ya8kMnvprhcWM045PmkBdMO9zN0= +google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -594,6 +607,7 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= diff --git a/server/server.go b/server/server.go index 73727f2..fa7574c 100644 --- a/server/server.go +++ b/server/server.go @@ -349,32 +349,18 @@ func parseHeaders(header http.Header, m *message) (cache bool, firebase bool, er m.Tags = append(m.Tags, strings.TrimSpace(s)) } } - atStr := readHeader(header, "x-at", "at", "x-schedule", "schedule", "sched") - if atStr != "" { + whenStr := readHeader(header, "x-at", "at", "x-in", "in") + if whenStr != "" { if !cache { return false, false, errHTTPBadRequest } - at, err := strconv.Atoi(atStr) + at, err := util.ParseFutureTime(whenStr, time.Now()) if err != nil { return false, false, errHTTPBadRequest - } else if int64(at) < time.Now().Add(minDelay).Unix() { + } else if at.Unix() < time.Now().Add(minDelay).Unix() { return false, false, errHTTPBadRequest } - m.Time = int64(at) - } else { - delayStr := readHeader(header, "x-delay", "delay", "x-in", "in") - if delayStr != "" { - if !cache { - return false, false, errHTTPBadRequest - } - delay, err := time.ParseDuration(delayStr) - if err != nil { - return false, false, errHTTPBadRequest - } else if delay < minDelay { - return false, false, errHTTPBadRequest - } - m.Time = time.Now().Add(delay).Unix() - } + m.Time = at.Unix() } return cache, firebase, nil } diff --git a/util/time.go b/util/time.go new file mode 100644 index 0000000..cd8fa9f --- /dev/null +++ b/util/time.go @@ -0,0 +1,95 @@ +package util + +import ( + "errors" + "github.com/olebedev/when" + "regexp" + "strconv" + "strings" + "time" +) + +var ( + errUnparsableTime = errors.New("unable to parse time") + durationStrRegex = regexp.MustCompile(`(?i)^(\d+)\s*(d|days?|h|hours?|m|mins?|minutes?|s|secs?|seconds?)$`) +) + +func ParseFutureTime(s string, now time.Time) (time.Time, error) { + s = strings.TrimSpace(s) + t, err := parseUnixTime(s, now) + if err == nil { + return t, nil + } + t, err = parseFromDuration(s, now) + if err == nil { + return t, nil + } + t, err = parseNaturalTime(s, now) + if err == nil { + return t, nil + } + return time.Time{}, errUnparsableTime +} + +func parseFromDuration(s string, now time.Time) (time.Time, error) { + d, err := parseDuration(s) + if err == nil { + return now.Add(d), nil + } + return time.Time{}, errUnparsableTime +} + +func parseDuration(s string) (time.Duration, error) { + d, err := time.ParseDuration(s) + if err == nil { + return d, nil + } + matches := durationStrRegex.FindStringSubmatch(s) + if matches != nil { + number, err := strconv.Atoi(matches[1]) + if err != nil { + return 0, errUnparsableTime + } + switch unit := matches[2][0:1]; unit { + case "d": + return time.Duration(number) * 24 * time.Hour, nil + case "h": + return time.Duration(number) * time.Hour, nil + case "m": + return time.Duration(number) * time.Minute, nil + case "s": + return time.Duration(number) * time.Second, nil + default: + return 0, errUnparsableTime + } + } + return 0, errUnparsableTime +} + +func parseUnixTime(s string, now time.Time) (time.Time, error) { + t, err := strconv.Atoi(s) + if err != nil { + return time.Time{}, err + } else if int64(t) < now.Unix() { + return time.Time{}, errUnparsableTime + } + return time.Unix(int64(t), 0), nil +} + +func parseNaturalTime(s string, now time.Time) (time.Time, error) { + r, err := when.EN.Parse(s, now) // returns "nil, nil" if no matches! + if err != nil || r == nil { + return time.Time{}, errUnparsableTime + } else if r.Time.After(now) { + return r.Time, nil + } + // Hack: If the time is parsable, but not in the future, + // simply append "tomorrow, " to it. + r, err = when.EN.Parse("tomorrow, "+s, now) // returns "nil, nil" if no matches! + if err != nil || r == nil { + return time.Time{}, errUnparsableTime + } else if r.Time.After(now) { + return r.Time, nil + } + return time.Time{}, errUnparsableTime +} diff --git a/util/time_test.go b/util/time_test.go new file mode 100644 index 0000000..5575a62 --- /dev/null +++ b/util/time_test.go @@ -0,0 +1,60 @@ +package util + +import ( + "github.com/stretchr/testify/require" + "testing" + "time" +) + +var ( + // 2021-12-10 10:17:23 (Friday) + base = time.Date(2021, 12, 10, 10, 17, 23, 0, time.UTC) +) + +func TestParseFutureTime_11am_FutureTime(t *testing.T) { + d, err := ParseFutureTime("11am", base) + require.Nil(t, err) + require.Equal(t, time.Date(2021, 12, 10, 11, 0, 0, 0, time.UTC), d) // Same day +} + +func TestParseFutureTime_9am_PastTime(t *testing.T) { + d, err := ParseFutureTime("9am", base) + require.Nil(t, err) + require.Equal(t, time.Date(2021, 12, 11, 9, 0, 0, 0, time.UTC), d) // Next day +} + +func TestParseFutureTime_Monday_10_30pm_FutureTime(t *testing.T) { + d, err := ParseFutureTime("Monday, 10:30pm", base) + require.Nil(t, err) + require.Equal(t, time.Date(2021, 12, 13, 22, 30, 0, 0, time.UTC), d) +} + +func TestParseFutureTime_30m(t *testing.T) { + d, err := ParseFutureTime("30m", base) + require.Nil(t, err) + require.Equal(t, time.Date(2021, 12, 10, 10, 47, 23, 0, time.UTC), d) +} + +func TestParseFutureTime_30min(t *testing.T) { + d, err := ParseFutureTime("30min", base) + require.Nil(t, err) + require.Equal(t, time.Date(2021, 12, 10, 10, 47, 23, 0, time.UTC), d) +} + +func TestParseFutureTime_3h(t *testing.T) { + d, err := ParseFutureTime("3h", base) + require.Nil(t, err) + require.Equal(t, time.Date(2021, 12, 10, 13, 17, 23, 0, time.UTC), d) +} + +func TestParseFutureTime_1day(t *testing.T) { + d, err := ParseFutureTime("1 day", base) + require.Nil(t, err) + require.Equal(t, time.Date(2021, 12, 11, 10, 17, 23, 0, time.UTC), d) +} + +func TestParseFutureTime_UnixTime(t *testing.T) { + d, err := ParseFutureTime("1639183911", base) + require.Nil(t, err) + require.Equal(t, time.Date(2021, 12, 10, 19, 51, 51, 0, time.Local), d) +} From 5ef83a7ba09e02717aa888082f69bdade737f36d Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Fri, 10 Dec 2021 20:28:56 -0500 Subject: [PATCH 3/5] Test DB migration --- server/cache_sqlite.go | 5 +- server/cache_sqlite_test.go | 91 ++++++++++++++++++++++++++++++------- 2 files changed, 77 insertions(+), 19 deletions(-) diff --git a/server/cache_sqlite.go b/server/cache_sqlite.go index 19eddee..c6813d5 100644 --- a/server/cache_sqlite.go +++ b/server/cache_sqlite.go @@ -76,9 +76,7 @@ const ( // 1 -> 2 migrate1To2AlterMessagesTableQuery = ` - BEGIN; ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1); - COMMIT; ` ) @@ -220,7 +218,7 @@ func setupDB(db *sql.DB) error { if err != nil { return setupNewDB(db) } - defer rowsMC.Close() + rowsMC.Close() // If 'messages' table exists, check 'schemaVersion' table schemaVersion := 0 @@ -233,6 +231,7 @@ func setupDB(db *sql.DB) error { if err := rowsSV.Scan(&schemaVersion); err != nil { return err } + rowsSV.Close() } // Do migrations diff --git a/server/cache_sqlite_test.go b/server/cache_sqlite_test.go index eb7b64a..384da25 100644 --- a/server/cache_sqlite_test.go +++ b/server/cache_sqlite_test.go @@ -3,7 +3,7 @@ package server import ( "database/sql" "fmt" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "path/filepath" "testing" "time" @@ -32,7 +32,7 @@ func TestSqliteCache_Prune(t *testing.T) { func TestSqliteCache_Migration_From0(t *testing.T) { filename := newSqliteTestCacheFile(t) db, err := sql.Open("sqlite3", filename) - assert.Nil(t, err) + require.Nil(t, err) // Create "version 0" schema _, err = db.Exec(` @@ -46,32 +46,91 @@ func TestSqliteCache_Migration_From0(t *testing.T) { CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); COMMIT; `) - assert.Nil(t, err) + require.Nil(t, err) // Insert a bunch of messages for i := 0; i < 10; i++ { _, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`, fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i)) - assert.Nil(t, err) + require.Nil(t, err) } + require.Nil(t, db.Close()) // Create cache to trigger migration c := newSqliteTestCacheFromFile(t, filename) - messages, err := c.Messages("mytopic", sinceAllMessages, false) - assert.Nil(t, err) - assert.Equal(t, 10, len(messages)) - assert.Equal(t, "some message 5", messages[5].Message) - assert.Equal(t, "", messages[5].Title) - assert.Nil(t, messages[5].Tags) - assert.Equal(t, 0, messages[5].Priority) + checkSchemaVersion(t, c.db) - rows, err := c.db.Query(`SELECT version FROM schemaVersion`) - assert.Nil(t, err) - assert.True(t, rows.Next()) + messages, err := c.Messages("mytopic", sinceAllMessages, false) + require.Nil(t, err) + require.Equal(t, 10, len(messages)) + require.Equal(t, "some message 5", messages[5].Message) + require.Equal(t, "", messages[5].Title) + require.Nil(t, messages[5].Tags) + require.Equal(t, 0, messages[5].Priority) +} + +func TestSqliteCache_Migration_From1(t *testing.T) { + filename := newSqliteTestCacheFile(t) + db, err := sql.Open("sqlite3", filename) + require.Nil(t, err) + + // Create "version 1" schema + _, err = db.Exec(` + CREATE TABLE IF NOT EXISTS messages ( + id VARCHAR(20) PRIMARY KEY, + time INT NOT NULL, + topic VARCHAR(64) NOT NULL, + message VARCHAR(512) NOT NULL, + title VARCHAR(256) NOT NULL, + priority INT NOT NULL, + tags VARCHAR(256) NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); + CREATE TABLE IF NOT EXISTS schemaVersion ( + id INT PRIMARY KEY, + version INT NOT NULL + ); + INSERT INTO schemaVersion (id, version) VALUES (1, 1); + `) + require.Nil(t, err) + + // Insert a bunch of messages + for i := 0; i < 10; i++ { + _, err = db.Exec(`INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`, + fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i), "", 0, "") + require.Nil(t, err) + } + require.Nil(t, db.Close()) + + // Create cache to trigger migration + c := newSqliteTestCacheFromFile(t, filename) + checkSchemaVersion(t, c.db) + + // Add delayed message + delayedMessage := newDefaultMessage("mytopic", "some delayed message") + delayedMessage.Time = time.Now().Add(time.Minute).Unix() + require.Nil(t, c.AddMessage(delayedMessage)) + + // 10, not 11! + messages, err := c.Messages("mytopic", sinceAllMessages, false) + require.Nil(t, err) + require.Equal(t, 10, len(messages)) + + // 11! + messages, err = c.Messages("mytopic", sinceAllMessages, true) + require.Nil(t, err) + require.Equal(t, 11, len(messages)) +} + +func checkSchemaVersion(t *testing.T, db *sql.DB) { + rows, err := db.Query(`SELECT version FROM schemaVersion`) + require.Nil(t, err) + require.True(t, rows.Next()) var schemaVersion int - assert.Nil(t, rows.Scan(&schemaVersion)) - assert.Equal(t, 2, schemaVersion) + require.Nil(t, rows.Scan(&schemaVersion)) + require.Equal(t, currentSchemaVersion, schemaVersion) + require.Nil(t, rows.Close()) } func newSqliteTestCache(t *testing.T) *sqliteCache { From e8688fed4b2b752ab3ec3bd724d0a6eceb7ef7f7 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Fri, 10 Dec 2021 22:57:01 -0500 Subject: [PATCH 4/5] Lots more tests --- config/config.go | 9 ++ server/cache_sqlite.go | 2 +- server/server.go | 25 ++--- server/server_test.go | 244 +++++++++++++++++++++++++++-------------- util/time.go | 2 +- util/time_test.go | 2 +- 6 files changed, 186 insertions(+), 98 deletions(-) diff --git a/config/config.go b/config/config.go index 7fcea45..9e1640a 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,9 @@ const ( DefaultKeepaliveInterval = 30 * time.Second DefaultManagerInterval = time.Minute DefaultAtSenderInterval = 10 * time.Second + DefaultMinDelay = 10 * time.Second + DefaultMaxDelay = 3 * 24 * time.Hour + DefaultMessageLimit = 512 ) // Defines all the limits @@ -37,6 +40,9 @@ type Config struct { KeepaliveInterval time.Duration ManagerInterval time.Duration AtSenderInterval time.Duration + MessageLimit int + MinDelay time.Duration + MaxDelay time.Duration GlobalTopicLimit int VisitorRequestLimitBurst int VisitorRequestLimitReplenish time.Duration @@ -56,6 +62,9 @@ func New(listenHTTP string) *Config { CacheDuration: DefaultCacheDuration, KeepaliveInterval: DefaultKeepaliveInterval, ManagerInterval: DefaultManagerInterval, + MessageLimit: DefaultMessageLimit, + MinDelay: DefaultMinDelay, + MaxDelay: DefaultMaxDelay, AtSenderInterval: DefaultAtSenderInterval, GlobalTopicLimit: DefaultGlobalTopicLimit, VisitorRequestLimitBurst: DefaultVisitorRequestLimitBurst, diff --git a/server/cache_sqlite.go b/server/cache_sqlite.go index c6813d5..82d0907 100644 --- a/server/cache_sqlite.go +++ b/server/cache_sqlite.go @@ -28,7 +28,7 @@ const ( COMMIT; ` insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?)` - pruneMessagesQuery = `DELETE FROM messages WHERE time < ?` + pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1` selectMessagesSinceTimeQuery = ` SELECT id, time, topic, message, title, priority, tags FROM messages diff --git a/server/server.go b/server/server.go index fa7574c..b5bd31e 100644 --- a/server/server.go +++ b/server/server.go @@ -71,11 +71,6 @@ var ( sinceNoMessages = sinceTime(time.Unix(1, 0)) ) -const ( - messageLimit = 512 - minDelay = 10 * time.Second -) - var ( topicRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app! jsonRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`) @@ -181,7 +176,7 @@ func (s *Server) Run() error { ticker := time.NewTicker(s.config.ManagerInterval) for { <-ticker.C - s.updateStatsAndExpire() + s.updateStatsAndPrune() } }() go func() { @@ -280,7 +275,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito if err != nil { return err } - reader := io.LimitReader(r.Body, messageLimit) + reader := io.LimitReader(r.Body, int64(s.config.MessageLimit)) b, err := io.ReadAll(reader) if err != nil { return err @@ -289,7 +284,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito if m.Message == "" { return errHTTPBadRequest } - cache, firebase, err := parseHeaders(r.Header, m) + cache, firebase, err := s.parseHeaders(r.Header, m) if err != nil { return err } @@ -321,7 +316,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, _ *visito return nil } -func parseHeaders(header http.Header, m *message) (cache bool, firebase bool, err error) { +func (s *Server) parseHeaders(header http.Header, m *message) (cache bool, firebase bool, err error) { cache = readHeader(header, "x-cache", "cache") != "no" firebase = readHeader(header, "x-firebase", "firebase") != "no" m.Title = readHeader(header, "x-title", "title", "ti", "t") @@ -349,7 +344,7 @@ func parseHeaders(header http.Header, m *message) (cache bool, firebase bool, er m.Tags = append(m.Tags, strings.TrimSpace(s)) } } - whenStr := readHeader(header, "x-at", "at", "x-in", "in") + whenStr := readHeader(header, "x-at", "at", "x-in", "in", "x-delay", "delay") if whenStr != "" { if !cache { return false, false, errHTTPBadRequest @@ -357,7 +352,9 @@ func parseHeaders(header http.Header, m *message) (cache bool, firebase bool, er at, err := util.ParseFutureTime(whenStr, time.Now()) if err != nil { return false, false, errHTTPBadRequest - } else if at.Unix() < time.Now().Add(minDelay).Unix() { + } else if at.Unix() < time.Now().Add(s.config.MinDelay).Unix() { + return false, false, errHTTPBadRequest + } else if at.Unix() > time.Now().Add(s.config.MaxDelay).Unix() { return false, false, errHTTPBadRequest } m.Time = at.Unix() @@ -548,7 +545,7 @@ func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) { return topics, nil } -func (s *Server) updateStatsAndExpire() { +func (s *Server) updateStatsAndPrune() { s.mu.Lock() defer s.mu.Unlock() @@ -559,13 +556,13 @@ func (s *Server) updateStatsAndExpire() { } } - // Prune cache + // Prune message cache olderThan := time.Now().Add(-1 * s.config.CacheDuration) if err := s.cache.Prune(olderThan); err != nil { log.Printf("error pruning cache: %s", err.Error()) } - // Prune old messages, remove subscriptions without subscribers + // Prune old topics, remove subscriptions without subscribers var subscribers, messages int for _, t := range s.topics { subs := t.Subscribers() diff --git a/server/server_test.go b/server/server_test.go index 1513cb9..cbe24a5 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -4,7 +4,7 @@ import ( "bufio" "context" "encoding/json" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "heckel.io/ntfy/config" "net/http" "net/http/httptest" @@ -19,33 +19,33 @@ func TestServer_PublishAndPoll(t *testing.T) { response1 := request(t, s, "PUT", "/mytopic", "my first message", nil) msg1 := toMessage(t, response1.Body.String()) - assert.NotEmpty(t, msg1.ID) - assert.Equal(t, "my first message", msg1.Message) + require.NotEmpty(t, msg1.ID) + require.Equal(t, "my first message", msg1.Message) response2 := request(t, s, "PUT", "/mytopic", "my second\n\nmessage", nil) msg2 := toMessage(t, response2.Body.String()) - assert.NotEqual(t, msg1.ID, msg2.ID) - assert.NotEmpty(t, msg2.ID) - assert.Equal(t, "my second\n\nmessage", msg2.Message) + require.NotEqual(t, msg1.ID, msg2.ID) + require.NotEmpty(t, msg2.ID) + require.Equal(t, "my second\n\nmessage", msg2.Message) response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil) messages := toMessages(t, response.Body.String()) - assert.Equal(t, 2, len(messages)) - assert.Equal(t, "my first message", messages[0].Message) - assert.Equal(t, "my second\n\nmessage", messages[1].Message) + require.Equal(t, 2, len(messages)) + require.Equal(t, "my first message", messages[0].Message) + require.Equal(t, "my second\n\nmessage", messages[1].Message) response = request(t, s, "GET", "/mytopic/sse?poll=1", "", nil) lines := strings.Split(strings.TrimSpace(response.Body.String()), "\n") - assert.Equal(t, 3, len(lines)) - assert.Equal(t, "my first message", toMessage(t, strings.TrimPrefix(lines[0], "data: ")).Message) - assert.Equal(t, "", lines[1]) - assert.Equal(t, "my second\n\nmessage", toMessage(t, strings.TrimPrefix(lines[2], "data: ")).Message) + require.Equal(t, 3, len(lines)) + require.Equal(t, "my first message", toMessage(t, strings.TrimPrefix(lines[0], "data: ")).Message) + require.Equal(t, "", lines[1]) + require.Equal(t, "my second\n\nmessage", toMessage(t, strings.TrimPrefix(lines[2], "data: ")).Message) response = request(t, s, "GET", "/mytopic/raw?poll=1", "", nil) lines = strings.Split(strings.TrimSpace(response.Body.String()), "\n") - assert.Equal(t, 2, len(lines)) - assert.Equal(t, "my first message", lines[0]) - assert.Equal(t, "my second message", lines[1]) // \n -> " " + require.Equal(t, 2, len(lines)) + require.Equal(t, "my first message", lines[0]) + require.Equal(t, "my second message", lines[1]) // \n -> " " } func TestServer_SubscribeOpenAndKeepalive(t *testing.T) { @@ -69,21 +69,21 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) { <-doneChan messages := toMessages(t, rr.Body.String()) - assert.Equal(t, 2, len(messages)) + require.Equal(t, 2, len(messages)) - assert.Equal(t, openEvent, messages[0].Event) - assert.Equal(t, "mytopic", messages[0].Topic) - assert.Equal(t, "", messages[0].Message) - assert.Equal(t, "", messages[0].Title) - assert.Equal(t, 0, messages[0].Priority) - assert.Nil(t, messages[0].Tags) + require.Equal(t, openEvent, messages[0].Event) + require.Equal(t, "mytopic", messages[0].Topic) + require.Equal(t, "", messages[0].Message) + require.Equal(t, "", messages[0].Title) + require.Equal(t, 0, messages[0].Priority) + require.Nil(t, messages[0].Tags) - assert.Equal(t, keepaliveEvent, messages[1].Event) - assert.Equal(t, "mytopic", messages[1].Topic) - assert.Equal(t, "", messages[1].Message) - assert.Equal(t, "", messages[1].Title) - assert.Equal(t, 0, messages[1].Priority) - assert.Nil(t, messages[1].Tags) + require.Equal(t, keepaliveEvent, messages[1].Event) + require.Equal(t, "mytopic", messages[1].Topic) + require.Equal(t, "", messages[1].Message) + require.Equal(t, "", messages[1].Title) + require.Equal(t, 0, messages[1].Priority) + require.Nil(t, messages[1].Tags) } func TestServer_PublishAndSubscribe(t *testing.T) { @@ -93,63 +93,79 @@ func TestServer_PublishAndSubscribe(t *testing.T) { subscribeCancel := subscribe(t, s, "/mytopic/json", subscribeRR) publishFirstRR := request(t, s, "PUT", "/mytopic", "my first message", nil) - assert.Equal(t, 200, publishFirstRR.Code) + require.Equal(t, 200, publishFirstRR.Code) publishSecondRR := request(t, s, "PUT", "/mytopic", "my other message", map[string]string{ "Title": " This is a title ", "X-Tags": "tag1,tag 2, tag3", "p": "1", }) - assert.Equal(t, 200, publishSecondRR.Code) + require.Equal(t, 200, publishSecondRR.Code) subscribeCancel() messages := toMessages(t, subscribeRR.Body.String()) - assert.Equal(t, 3, len(messages)) - assert.Equal(t, openEvent, messages[0].Event) + require.Equal(t, 3, len(messages)) + require.Equal(t, openEvent, messages[0].Event) - assert.Equal(t, messageEvent, messages[1].Event) - assert.Equal(t, "mytopic", messages[1].Topic) - assert.Equal(t, "my first message", messages[1].Message) - assert.Equal(t, "", messages[1].Title) - assert.Equal(t, 0, messages[1].Priority) - assert.Nil(t, messages[1].Tags) + require.Equal(t, messageEvent, messages[1].Event) + require.Equal(t, "mytopic", messages[1].Topic) + require.Equal(t, "my first message", messages[1].Message) + require.Equal(t, "", messages[1].Title) + require.Equal(t, 0, messages[1].Priority) + require.Nil(t, messages[1].Tags) - assert.Equal(t, messageEvent, messages[2].Event) - assert.Equal(t, "mytopic", messages[2].Topic) - assert.Equal(t, "my other message", messages[2].Message) - assert.Equal(t, "This is a title", messages[2].Title) - assert.Equal(t, 1, messages[2].Priority) - assert.Equal(t, []string{"tag1", "tag 2", "tag3"}, messages[2].Tags) + require.Equal(t, messageEvent, messages[2].Event) + require.Equal(t, "mytopic", messages[2].Topic) + require.Equal(t, "my other message", messages[2].Message) + require.Equal(t, "This is a title", messages[2].Title) + require.Equal(t, 1, messages[2].Priority) + require.Equal(t, []string{"tag1", "tag 2", "tag3"}, messages[2].Tags) } func TestServer_StaticSites(t *testing.T) { s := newTestServer(t, newTestConfig(t)) rr := request(t, s, "GET", "/", "", nil) - assert.Equal(t, 200, rr.Code) - assert.Contains(t, rr.Body.String(), "") + require.Equal(t, 200, rr.Code) + require.Contains(t, rr.Body.String(), "") rr = request(t, s, "HEAD", "/", "", nil) - assert.Equal(t, 200, rr.Code) + require.Equal(t, 200, rr.Code) rr = request(t, s, "GET", "/does-not-exist.txt", "", nil) - assert.Equal(t, 404, rr.Code) + require.Equal(t, 404, rr.Code) rr = request(t, s, "GET", "/mytopic", "", nil) - assert.Equal(t, 200, rr.Code) - assert.Contains(t, rr.Body.String(), ``) + require.Equal(t, 200, rr.Code) + require.Contains(t, rr.Body.String(), ``) rr = request(t, s, "GET", "/static/css/app.css", "", nil) - assert.Equal(t, 200, rr.Code) - assert.Contains(t, rr.Body.String(), `html, body {`) + require.Equal(t, 200, rr.Code) + require.Contains(t, rr.Body.String(), `html, body {`) rr = request(t, s, "GET", "/docs", "", nil) - assert.Equal(t, 301, rr.Code) + require.Equal(t, 301, rr.Code) rr = request(t, s, "GET", "/docs/", "", nil) - assert.Equal(t, 200, rr.Code) - assert.Contains(t, rr.Body.String(), `Made with ❤️ by Philipp C. Heckel`) - assert.Contains(t, rr.Body.String(), ``) + require.Equal(t, 200, rr.Code) + require.Contains(t, rr.Body.String(), `Made with ❤️ by Philipp C. Heckel`) + require.Contains(t, rr.Body.String(), ``) +} + +func TestServer_PublishLargeMessage(t *testing.T) { + s := newTestServer(t, newTestConfig(t)) + + body := strings.Repeat("this is a large message", 1000) + truncated := body[0:512] + response := request(t, s, "PUT", "/mytopic", body, nil) + msg := toMessage(t, response.Body.String()) + require.NotEmpty(t, msg.ID) + require.Equal(t, truncated, msg.Message) + + response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) + require.Equal(t, truncated, messages[0].Message) } func TestServer_PublishNoCache(t *testing.T) { @@ -159,12 +175,78 @@ func TestServer_PublishNoCache(t *testing.T) { "Cache": "no", }) msg := toMessage(t, response.Body.String()) - assert.NotEmpty(t, msg.ID) - assert.Equal(t, "this message is not cached", msg.Message) + require.NotEmpty(t, msg.ID) + require.Equal(t, "this message is not cached", msg.Message) response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil) messages := toMessages(t, response.Body.String()) - assert.Empty(t, messages) + require.Empty(t, messages) +} +func TestServer_PublishAt(t *testing.T) { + c := newTestConfig(t) + c.MinDelay = time.Second + c.AtSenderInterval = 100 * time.Millisecond + s := newTestServer(t, c) + + response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{ + "In": "1s", + }) + require.Equal(t, 200, response.Code) + + response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 0, len(messages)) + + time.Sleep(time.Second) + require.Nil(t, s.sendDelayedMessages()) + + response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil) + messages = toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) + require.Equal(t, "a message", messages[0].Message) +} + +func TestServer_PublishAtWithCacheError(t *testing.T) { + s := newTestServer(t, newTestConfig(t)) + + response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{ + "Cache": "no", + "In": "30 min", + }) + require.Equal(t, 400, response.Code) +} + +func TestServer_PublishAtTooShortDelay(t *testing.T) { + s := newTestServer(t, newTestConfig(t)) + + response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{ + "In": "1s", + }) + require.Equal(t, 400, response.Code) +} + +func TestServer_PublishAtTooLongDelay(t *testing.T) { + s := newTestServer(t, newTestConfig(t)) + + response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{ + "In": "99999999h", + }) + require.Equal(t, 400, response.Code) +} + +func TestServer_PublishAtAndPrune(t *testing.T) { + s := newTestServer(t, newTestConfig(t)) + + response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{ + "In": "1h", + }) + require.Equal(t, 200, response.Code) + s.updateStatsAndPrune() // Fire pruning + + response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) // Not affected by pruning + require.Equal(t, "a message", messages[0].Message) } func TestServer_PublishAndMultiPoll(t *testing.T) { @@ -172,29 +254,29 @@ func TestServer_PublishAndMultiPoll(t *testing.T) { response := request(t, s, "PUT", "/mytopic1", "message 1", nil) msg := toMessage(t, response.Body.String()) - assert.NotEmpty(t, msg.ID) - assert.Equal(t, "mytopic1", msg.Topic) - assert.Equal(t, "message 1", msg.Message) + require.NotEmpty(t, msg.ID) + require.Equal(t, "mytopic1", msg.Topic) + require.Equal(t, "message 1", msg.Message) response = request(t, s, "PUT", "/mytopic2", "message 2", nil) msg = toMessage(t, response.Body.String()) - assert.NotEmpty(t, msg.ID) - assert.Equal(t, "mytopic2", msg.Topic) - assert.Equal(t, "message 2", msg.Message) + require.NotEmpty(t, msg.ID) + require.Equal(t, "mytopic2", msg.Topic) + require.Equal(t, "message 2", msg.Message) response = request(t, s, "GET", "/mytopic1/json?poll=1", "", nil) messages := toMessages(t, response.Body.String()) - assert.Equal(t, 1, len(messages)) - assert.Equal(t, "mytopic1", messages[0].Topic) - assert.Equal(t, "message 1", messages[0].Message) + require.Equal(t, 1, len(messages)) + require.Equal(t, "mytopic1", messages[0].Topic) + require.Equal(t, "message 1", messages[0].Message) response = request(t, s, "GET", "/mytopic1,mytopic2/json?poll=1", "", nil) messages = toMessages(t, response.Body.String()) - assert.Equal(t, 2, len(messages)) - assert.Equal(t, "mytopic1", messages[0].Topic) - assert.Equal(t, "message 1", messages[0].Message) - assert.Equal(t, "mytopic2", messages[1].Topic) - assert.Equal(t, "message 2", messages[1].Message) + require.Equal(t, 2, len(messages)) + require.Equal(t, "mytopic1", messages[0].Topic) + require.Equal(t, "message 1", messages[0].Message) + require.Equal(t, "mytopic2", messages[1].Topic) + require.Equal(t, "message 2", messages[1].Message) } func TestServer_PublishWithNopCache(t *testing.T) { @@ -206,18 +288,18 @@ func TestServer_PublishWithNopCache(t *testing.T) { subscribeCancel := subscribe(t, s, "/mytopic/json", subscribeRR) publishRR := request(t, s, "PUT", "/mytopic", "my first message", nil) - assert.Equal(t, 200, publishRR.Code) + require.Equal(t, 200, publishRR.Code) subscribeCancel() messages := toMessages(t, subscribeRR.Body.String()) - assert.Equal(t, 2, len(messages)) - assert.Equal(t, openEvent, messages[0].Event) - assert.Equal(t, messageEvent, messages[1].Event) - assert.Equal(t, "my first message", messages[1].Message) + require.Equal(t, 2, len(messages)) + require.Equal(t, openEvent, messages[0].Event) + require.Equal(t, messageEvent, messages[1].Event) + require.Equal(t, "my first message", messages[1].Message) response := request(t, s, "GET", "/mytopic/json?poll=1", "", nil) messages = toMessages(t, response.Body.String()) - assert.Empty(t, messages) + require.Empty(t, messages) } func newTestConfig(t *testing.T) *config.Config { @@ -278,6 +360,6 @@ func toMessages(t *testing.T, s string) []*message { func toMessage(t *testing.T, s string) *message { var m message - assert.Nil(t, json.NewDecoder(strings.NewReader(s)).Decode(&m)) + require.Nil(t, json.NewDecoder(strings.NewReader(s)).Decode(&m)) return &m } diff --git a/util/time.go b/util/time.go index cd8fa9f..ed68b76 100644 --- a/util/time.go +++ b/util/time.go @@ -73,7 +73,7 @@ func parseUnixTime(s string, now time.Time) (time.Time, error) { } else if int64(t) < now.Unix() { return time.Time{}, errUnparsableTime } - return time.Unix(int64(t), 0), nil + return time.Unix(int64(t), 0).UTC(), nil } func parseNaturalTime(s string, now time.Time) (time.Time, error) { diff --git a/util/time_test.go b/util/time_test.go index 5575a62..9cab504 100644 --- a/util/time_test.go +++ b/util/time_test.go @@ -56,5 +56,5 @@ func TestParseFutureTime_1day(t *testing.T) { func TestParseFutureTime_UnixTime(t *testing.T) { d, err := ParseFutureTime("1639183911", base) require.Nil(t, err) - require.Equal(t, time.Date(2021, 12, 10, 19, 51, 51, 0, time.Local), d) + require.Equal(t, time.Date(2021, 12, 11, 0, 51, 51, 0, time.UTC), d) } From 01d21165e9dad8935b4da639012727dd3df29dbf Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sat, 11 Dec 2021 00:06:25 -0500 Subject: [PATCH 5/5] Docs docs docs --- docs/config.md | 2 +- docs/publish.md | 83 +++++++++++++++++++++++++++++++++++++++++-- docs/subscribe/api.md | 12 ++++++- server/server.go | 12 +++---- util/time.go | 2 ++ 5 files changed, 101 insertions(+), 10 deletions(-) diff --git a/docs/config.md b/docs/config.md index 1f8a54d..5762562 100644 --- a/docs/config.md +++ b/docs/config.md @@ -32,7 +32,7 @@ You can also entirely disable the cache by setting `cache-duration` to `0`. When passed on to the connected subscribers, but never stored on disk or even kept in memory longer than is needed to forward the message to the subscribers. -Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#polling), as well as the +Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscribe/api.md#polling-for-messages), as well as the [`since=` parameter](subscribe/api.md#fetching-cached-messages). ## Behind a proxy (TLS, etc.) diff --git a/docs/publish.md b/docs/publish.md index 5ffaaf7..0e4a04d 100644 --- a/docs/publish.md +++ b/docs/publish.md @@ -332,6 +332,85 @@ them with a comma, e.g. `tag1,tag2,tag3`.
Detail view of notifications with tags
+## Scheduled delivery +You can delay the delivery of messages and let ntfy send them at a later date. This can be used to send yourself +reminders or even to execute commands at a later date (if your subscriber acts on messages). + +Usage is pretty straight forward. You can set the delivery time using the `X-Delay` header (or any of its aliases: `Delay`, +`X-At`, `At`, `X-In` or `In`), either by specifying a Unix timestamp (e.g. `1639194738`), a duration (e.g. `30m`, +`3h`, `2 days`), or a natural language time string (e.g. `10am`, `8:30pm`, `tomorrow, 3pm`, `Tuesday, 7am`, +[and more](https://github.com/olebedev/when)). + +As of today, the minimum delay you can set is **10 seconds** and the maximum delay is **3 days**. This can currently +not be configured otherwise ([let me know](https://github.com/binwiederhier/ntfy/issues) if you'd like to change +these limits). + +For the purposes of [message caching](config.md#message-cache), scheduled messages are kept in the cache until 12 hours +after they were delivered (or whatever the server-side cache duration is set to). For instance, if a message is scheduled +to be delivered in 3 days, it'll remain in the cache for 3 days and 12 hours. Also note that naturally, +[turning off server-side caching](#message-caching) is not possible in combination with this feature. + +=== "Command line (curl)" + ``` + curl -H "At: tomorrow, 10am" -d "Good morning" ntfy.sh/hello + curl -H "In: 30min" -d "It's 30 minutes later now" ntfy.sh/reminder + curl -H "Delay: 1639194738" -d "Unix timestamps are awesome" ntfy.sh/itsaunixsystem + ``` + +=== "HTTP" + ``` http + POST /hello HTTP/1.1 + Host: ntfy.sh + At: tomorrow, 10am + + Good morning + ``` + +=== "JavaScript" + ``` javascript + fetch('https://ntfy.sh/hello', { + method: 'POST', + body: 'Good morning', + headers: { 'At': 'tomorrow, 10am' } + }) + ``` + +=== "Go" + ``` go + req, _ := http.NewRequest("POST", "https://ntfy.sh/hello", strings.NewReader("Good morning")) + req.Header.Set("At", "tomorrow, 10am") + http.DefaultClient.Do(req) + ``` + +=== "PHP" + ``` php-inline + file_get_contents('https://ntfy.sh/backups', false, stream_context_create([ + 'http' => [ + 'method' => 'POST', + 'header' => + "Content-Type: text/plain\r\n" . + "At: tomorrow, 10am", + 'content' => 'Good morning' + ] + ])); + ``` + +Here are a few examples (assuming today's date is **12/10/2021, 9am, Eastern Time Zone**): + + + + +
+ + + + + + + +
Delay/At/In headerMessage will be delivered atExplanation
30m12/10/2021, 9:30am30 minutes from now
2 hours12/10/2021, 11:30am2 hours from now
1 day12/11/2021, 9am24 hours from now
10am12/10/2021, 10amToday at 10am (same day, because it's only 9am)
8am12/11/2021, 8amTomorrow at 8am (because it's 9am already)
163915200012/10/2021, 11am (EST) Today at 11am (EST)
+
+ ## Advanced features ### Message caching @@ -347,7 +426,7 @@ client-side network disruptions, but arguably this feature also may raise privac To avoid messages being cached server-side entirely, you can set `X-Cache` header (or its alias: `Cache`) to `no`. This will make sure that your message is not cached on the server, even if server-side caching is enabled. Messages are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fetching-cached-messages) and -[`poll=1`](subscribe/api.md#polling) won't return the message anymore. +[`poll=1`](subscribe/api.md#polling-for-messages) won't return the message anymore. === "Command line (curl)" ``` @@ -393,7 +472,7 @@ are still delivered to connected subscribers, but [`since=`](subscribe/api.md#fe ])); ``` -### Firebase +### Disable Firebase !!! info If `Firebase: no` is used and [instant delivery](subscribe/phone.md#instant-delivery) isn't enabled in the Android app (Google Play variant only), **message delivery will be significantly delayed (up to 15 minutes)**. To overcome diff --git a/docs/subscribe/api.md b/docs/subscribe/api.md index b4449cf..4a0bb85 100644 --- a/docs/subscribe/api.md +++ b/docs/subscribe/api.md @@ -239,7 +239,7 @@ or `all` (all cached messages). curl -s "ntfy.sh/mytopic/json?since=10m" ``` -### Polling +### Polling for messages You can also just poll for messages if you don't like the long-standing connection using the `poll=1` query parameter. The connection will end after all available messages have been read. This parameter can be combined with `since=` (defaults to `since=all`). @@ -248,6 +248,16 @@ combined with `since=` (defaults to `since=all`). curl -s "ntfy.sh/mytopic/json?poll=1" ``` +### Fetching scheduled messages +Messages that are [scheduled to be delivered](../publish.md#scheduled-delivery) at a later date are not typically +returned when subscribing via the API, which makes sense, because after all, the messages have technically not been +delivered yet. To also return scheduled messages from the API, you can use the `scheduled=1` (alias: `sched=1`) +parameter (makes most sense with the `poll=1` parameter): + +``` +curl -s "ntfy.sh/mytopic/json?poll=1&sched=1" +``` + ### Subscribing to multiple topics It's possible to subscribe to multiple topics in one HTTP call by providing a comma-separated list of topics in the URL. This allows you to reduce the number of connections you have to maintain: diff --git a/server/server.go b/server/server.go index b5bd31e..6fadd9d 100644 --- a/server/server.go +++ b/server/server.go @@ -344,20 +344,20 @@ func (s *Server) parseHeaders(header http.Header, m *message) (cache bool, fireb m.Tags = append(m.Tags, strings.TrimSpace(s)) } } - whenStr := readHeader(header, "x-at", "at", "x-in", "in", "x-delay", "delay") - if whenStr != "" { + delayStr := readHeader(header, "x-delay", "delay", "x-at", "at", "x-in", "in") + if delayStr != "" { if !cache { return false, false, errHTTPBadRequest } - at, err := util.ParseFutureTime(whenStr, time.Now()) + delay, err := util.ParseFutureTime(delayStr, time.Now()) if err != nil { return false, false, errHTTPBadRequest - } else if at.Unix() < time.Now().Add(s.config.MinDelay).Unix() { + } else if delay.Unix() < time.Now().Add(s.config.MinDelay).Unix() { return false, false, errHTTPBadRequest - } else if at.Unix() > time.Now().Add(s.config.MaxDelay).Unix() { + } else if delay.Unix() > time.Now().Add(s.config.MaxDelay).Unix() { return false, false, errHTTPBadRequest } - m.Time = at.Unix() + m.Time = delay.Unix() } return cache, firebase, nil } diff --git a/util/time.go b/util/time.go index ed68b76..7050121 100644 --- a/util/time.go +++ b/util/time.go @@ -14,6 +14,8 @@ var ( durationStrRegex = regexp.MustCompile(`(?i)^(\d+)\s*(d|days?|h|hours?|m|mins?|minutes?|s|secs?|seconds?)$`) ) +// ParseFutureTime parses a date/time string to a time.Time. It supports unix timestamps, durations +// and natural language dates func ParseFutureTime(s string, now time.Time) (time.Time, error) { s = strings.TrimSpace(s) t, err := parseUnixTime(s, now)