diff --git a/server/cache.go b/server/cache.go index 89db72c..131c06b 100644 --- a/server/cache.go +++ b/server/cache.go @@ -14,7 +14,7 @@ var ( // i.e. message structs with the Event messageEvent. type cache interface { AddMessage(m *message) error - Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) + Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) MessagesDue() ([]*message, error) MessageCount(topic string) (int, error) Topics() (map[string]*topic, error) diff --git a/server/cache_mem.go b/server/cache_mem.go index 96c9831..db090a4 100644 --- a/server/cache_mem.go +++ b/server/cache_mem.go @@ -54,7 +54,7 @@ func (c *memCache) AddMessage(m *message) error { return nil } -func (c *memCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) { +func (c *memCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) { c.mu.Lock() defer c.mu.Unlock() if _, ok := c.messages[topic]; !ok || since.IsNone() { diff --git a/server/cache_sqlite.go b/server/cache_sqlite.go index e53918d..a2d9636 100644 --- a/server/cache_sqlite.go +++ b/server/cache_sqlite.go @@ -15,7 +15,8 @@ const ( createMessagesTableQuery = ` BEGIN; CREATE TABLE IF NOT EXISTS messages ( - id TEXT PRIMARY KEY, + id INTEGER PRIMARY KEY AUTOINCREMENT, + mid TEXT NOT NULL, time INT NOT NULL, topic TEXT NOT NULL, message TEXT NOT NULL, @@ -32,42 +33,59 @@ const ( encoding TEXT NOT NULL, published INT NOT NULL ); + CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid); CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); COMMIT; ` insertMessageQuery = ` - INSERT INTO messages (id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published) + INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1` selectMessagesSinceTimeQuery = ` - SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding FROM messages WHERE topic = ? AND time >= ? AND published = 1 - ORDER BY time ASC + ORDER BY time, id ` selectMessagesSinceTimeIncludeScheduledQuery = ` - SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding FROM messages WHERE topic = ? AND time >= ? - ORDER BY time ASC + ORDER BY time, id + ` + selectMessagesSinceIDQuery = ` + SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + FROM messages + WHERE topic = ? + AND published = 1 + AND id > (SELECT IFNULL(id,0) FROM messages WHERE mid = ?) + ORDER BY time, id + ` + selectMessagesSinceIDIncludeScheduledQuery = ` + SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + FROM messages + WHERE topic = ? + AND id > (SELECT IFNULL(id,0) FROM messages WHERE mid = ?) + ORDER BY time, id ` selectMessagesDueQuery = ` - SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding FROM messages WHERE time <= ? AND published = 0 + ORDER BY time, id ` - updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?` + updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` selectMessagesCountQuery = `SELECT COUNT(*) FROM messages` selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?` selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` selectAttachmentsSizeQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE attachment_owner = ? AND attachment_expires >= ?` - selectAttachmentsExpiredQuery = `SELECT id FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?` + selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?` ) // Schema management queries const ( - currentSchemaVersion = 4 + currentSchemaVersion = 5 createSchemaVersionTableQuery = ` CREATE TABLE IF NOT EXISTS schemaVersion ( id INT PRIMARY KEY, @@ -108,6 +126,43 @@ const ( migrate3To4AlterMessagesTableQuery = ` ALTER TABLE messages ADD COLUMN encoding TEXT NOT NULL DEFAULT(''); ` + + // 4 -> 5 + migrate4To5AlterMessagesTableQuery = ` + BEGIN; + CREATE TABLE IF NOT EXISTS messages_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mid TEXT NOT NULL, + time INT NOT NULL, + topic TEXT NOT NULL, + message TEXT NOT NULL, + title TEXT NOT NULL, + priority INT NOT NULL, + tags TEXT NOT NULL, + click TEXT NOT NULL, + attachment_name TEXT NOT NULL, + attachment_type TEXT NOT NULL, + attachment_size INT NOT NULL, + attachment_expires INT NOT NULL, + attachment_url TEXT NOT NULL, + attachment_owner TEXT NOT NULL, + encoding TEXT NOT NULL, + published INT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_mid ON messages_new (mid); + CREATE INDEX IF NOT EXISTS idx_topic ON messages_new (topic); + INSERT + INTO messages_new ( + mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, + attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published) + SELECT + id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, + attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published + FROM messages; + DROP TABLE messages; + ALTER TABLE messages_new RENAME TO messages; + COMMIT; + ` ) type sqliteCache struct { @@ -167,16 +222,24 @@ func (c *sqliteCache) AddMessage(m *message) error { return err } -func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) { +func (c *sqliteCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) { if since.IsNone() { return make([]*message, 0), nil } var rows *sql.Rows var err error - if scheduled { - rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix()) + if since.IsID() { + if scheduled { + rows, err = c.db.Query(selectMessagesSinceIDIncludeScheduledQuery, topic, since.ID()) + } else { + rows, err = c.db.Query(selectMessagesSinceIDQuery, topic, since.ID()) + } } else { - rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix()) + if scheduled { + rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix()) + } else { + rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix()) + } } if err != nil { return nil, err @@ -373,6 +436,8 @@ func setupCacheDB(db *sql.DB) error { return migrateFrom2(db) } else if schemaVersion == 3 { return migrateFrom3(db) + } else if schemaVersion == 4 { + return migrateFrom4(db) } return fmt.Errorf("unexpected schema version found: %d", schemaVersion) } @@ -434,5 +499,16 @@ func migrateFrom3(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 4); err != nil { return err } + return migrateFrom4(db) +} + +func migrateFrom4(db *sql.DB) error { + log.Print("Migrating cache database schema: from 4 to 5") + if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil { + return err + } + if _, err := db.Exec(updateSchemaVersion, 5); err != nil { + return err + } return nil // Update this when a new version is added } diff --git a/server/cache_test.go b/server/cache_test.go index 71ba549..6e5eddb 100644 --- a/server/cache_test.go +++ b/server/cache_test.go @@ -42,7 +42,7 @@ func testCacheMessages(t *testing.T, c cache) { require.Empty(t, messages) // mytopic: since 2 - messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0)), false) + messages, _ = c.Messages("mytopic", newSinceTime(2), false) require.Equal(t, 1, len(messages)) require.Equal(t, "my other message", messages[0].Message) diff --git a/server/server.go b/server/server.go index 88be069..edf6bed 100644 --- a/server/server.go +++ b/server/server.go @@ -805,7 +805,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi return err } -func parseSubscribeParams(r *http.Request) (poll bool, since sinceTime, scheduled bool, filters *queryFilter, err error) { +func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, err error) { poll = readBoolParam(r, false, "x-poll", "poll", "po") scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched") since, err = parseSince(r, poll) @@ -819,7 +819,7 @@ func parseSubscribeParams(r *http.Request) (poll bool, since sinceTime, schedule return } -func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error { +func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, sub subscriber) error { if since.IsNone() { return nil } @@ -841,20 +841,28 @@ func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled boo // // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or // "all" for all messages. -func parseSince(r *http.Request, poll bool) (sinceTime, error) { +func parseSince(r *http.Request, poll bool) (sinceMarker, error) { since := readParam(r, "x-since", "since", "si") + + // Easy cases (empty, all, none) if since == "" { if poll { return sinceAllMessages, nil } return sinceNoMessages, nil - } - if since == "all" { + } else if since == "all" { return sinceAllMessages, nil + } else if since == "none" { + return sinceNoMessages, nil + } + + // ID, timestamp, duration + if validMessageID(since) { + return newSinceID(since), nil } else if s, err := strconv.ParseInt(since, 10, 64); err == nil { - return sinceTime(time.Unix(s, 0)), nil + return newSinceTime(s), nil } else if d, err := time.ParseDuration(since); err == nil { - return sinceTime(time.Now().Add(-1 * d)), nil + return newSinceTime(time.Now().Add(-1 * d).Unix()), nil } return sinceNoMessages, errHTTPBadRequestSinceInvalid } diff --git a/server/types.go b/server/types.go index 0f0c194..9b132e2 100644 --- a/server/types.go +++ b/server/types.go @@ -15,7 +15,7 @@ const ( ) const ( - messageIDLength = 10 + messageIDLength = 12 ) // message represents a message published to a topic @@ -74,23 +74,46 @@ func newDefaultMessage(topic, msg string) *message { return newMessage(messageEvent, topic, msg) } -type sinceTime time.Time +func validMessageID(s string) bool { + return util.ValidRandomString(s, messageIDLength) +} -func (t sinceTime) IsAll() bool { +type sinceMarker struct { + time time.Time + id string +} + +func newSinceTime(timestamp int64) sinceMarker { + return sinceMarker{time.Unix(timestamp, 0), ""} +} + +func newSinceID(id string) sinceMarker { + return sinceMarker{time.Unix(0, 0), id} +} + +func (t sinceMarker) IsAll() bool { return t == sinceAllMessages } -func (t sinceTime) IsNone() bool { +func (t sinceMarker) IsNone() bool { return t == sinceNoMessages } -func (t sinceTime) Time() time.Time { - return time.Time(t) +func (t sinceMarker) IsID() bool { + return t.id != "" +} + +func (t sinceMarker) Time() time.Time { + return t.time +} + +func (t sinceMarker) ID() string { + return t.id } var ( - sinceAllMessages = sinceTime(time.Unix(0, 0)) - sinceNoMessages = sinceTime(time.Unix(1, 0)) + sinceAllMessages = sinceMarker{time.Unix(0, 0), ""} + sinceNoMessages = sinceMarker{time.Unix(1, 0), ""} ) type queryFilter struct { diff --git a/util/util.go b/util/util.go index 47a5877..e05736f 100644 --- a/util/util.go +++ b/util/util.go @@ -88,7 +88,20 @@ func RandomString(length int) string { return string(b) } -// DurationToHuman converts a duration to a human readable format +// ValidRandomString returns true if the given string matches the format created by RandomString +func ValidRandomString(s string, length int) bool { + if len(s) != length { + return false + } + for _, c := range strings.Split(s, "") { + if !strings.Contains(randomStringCharset, c) { + return false + } + } + return true +} + +// DurationToHuman converts a duration to a human-readable format func DurationToHuman(d time.Duration) (str string) { if d == 0 { return "0"