diff --git a/server/message_cache.go b/server/message_cache.go index 8e2eb06..fc80c4e 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -215,6 +215,22 @@ const ( CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires); CREATE INDEX IF NOT EXISTS idx_attachment_expires ON messages (attachment_expires); ` + migrate9To10UpdateMessageExpiryQuery = `UPDATE messages SET expires = time + ?` +) + +var ( + migrations = map[int]func(db *sql.DB, cacheDuration time.Duration) error{ + 0: migrateFrom0, + 1: migrateFrom1, + 2: migrateFrom2, + 3: migrateFrom3, + 4: migrateFrom4, + 5: migrateFrom5, + 6: migrateFrom6, + 7: migrateFrom7, + 8: migrateFrom8, + 9: migrateFrom9, + } ) type messageCache struct { @@ -224,12 +240,12 @@ type messageCache struct { } // newSqliteCache creates a SQLite file-backed cache -func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) { +func newSqliteCache(filename, startupQueries string, cacheDuration time.Duration, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) { db, err := sql.Open("sqlite3", filename) if err != nil { return nil, err } - if err := setupCacheDB(db, startupQueries); err != nil { + if err := setupDB(db, startupQueries, cacheDuration); err != nil { return nil, err } var queue *util.BatchingQueue[*message] @@ -247,13 +263,13 @@ func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout // newMemCache creates an in-memory cache func newMemCache() (*messageCache, error) { - return newSqliteCache(createMemoryFilename(), "", 0, 0, false) + return newSqliteCache(createMemoryFilename(), "", 0, 0, 0, false) } // newNopCache creates an in-memory cache that discards all messages; // it is always empty and can be used if caching is entirely disabled func newNopCache() (*messageCache, error) { - return newSqliteCache(createMemoryFilename(), "", 0, 0, true) + return newSqliteCache(createMemoryFilename(), "", 0, 0, 0, true) } // createMemoryFilename creates a unique memory filename to use for the SQLite backend. @@ -637,7 +653,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) { return messages, nil } -func setupCacheDB(db *sql.DB, startupQueries string) error { +func setupDB(db *sql.DB, startupQueries string, cacheDuration time.Duration) error { // Run startup queries if startupQueries != "" { if _, err := db.Exec(startupQueries); err != nil { @@ -669,28 +685,18 @@ func setupCacheDB(db *sql.DB, startupQueries string) error { // Do migrations if schemaVersion == currentSchemaVersion { return nil - } else if schemaVersion == 0 { - return migrateFrom0(db) - } else if schemaVersion == 1 { - return migrateFrom1(db) - } else if schemaVersion == 2 { - return migrateFrom2(db) - } else if schemaVersion == 3 { - return migrateFrom3(db) - } else if schemaVersion == 4 { - return migrateFrom4(db) - } else if schemaVersion == 5 { - return migrateFrom5(db) - } else if schemaVersion == 6 { - return migrateFrom6(db) - } else if schemaVersion == 7 { - return migrateFrom7(db) - } else if schemaVersion == 8 { - return migrateFrom8(db) - } else if schemaVersion == 9 { - return migrateFrom9(db) + } else if schemaVersion > currentSchemaVersion { + return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, currentSchemaVersion) } - return fmt.Errorf("unexpected schema version found: %d", schemaVersion) + for i := schemaVersion; i < currentSchemaVersion; i++ { + fn, ok := migrations[i] + if !ok { + return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1) + } else if err := fn(db, cacheDuration); err != nil { + return err + } + } + return nil } func setupNewCacheDB(db *sql.DB) error { @@ -706,7 +712,7 @@ func setupNewCacheDB(db *sql.DB) error { return nil } -func migrateFrom0(db *sql.DB) error { +func migrateFrom0(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 0 to 1") if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil { return err @@ -717,10 +723,10 @@ func migrateFrom0(db *sql.DB) error { if _, err := db.Exec(insertSchemaVersion, 1); err != nil { return err } - return migrateFrom1(db) + return nil } -func migrateFrom1(db *sql.DB) error { +func migrateFrom1(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 1 to 2") if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil { return err @@ -728,10 +734,10 @@ func migrateFrom1(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 2); err != nil { return err } - return migrateFrom2(db) + return nil } -func migrateFrom2(db *sql.DB) error { +func migrateFrom2(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 2 to 3") if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil { return err @@ -739,10 +745,10 @@ func migrateFrom2(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 3); err != nil { return err } - return migrateFrom3(db) + return nil } -func migrateFrom3(db *sql.DB) error { +func migrateFrom3(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 3 to 4") if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil { return err @@ -750,10 +756,10 @@ func migrateFrom3(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 4); err != nil { return err } - return migrateFrom4(db) + return nil } -func migrateFrom4(db *sql.DB) error { +func migrateFrom4(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 4 to 5") if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil { return err @@ -761,10 +767,10 @@ func migrateFrom4(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 5); err != nil { return err } - return migrateFrom5(db) + return nil } -func migrateFrom5(db *sql.DB) error { +func migrateFrom5(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 5 to 6") if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil { return err @@ -772,10 +778,10 @@ func migrateFrom5(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 6); err != nil { return err } - return migrateFrom6(db) + return nil } -func migrateFrom6(db *sql.DB) error { +func migrateFrom6(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 6 to 7") if _, err := db.Exec(migrate6To7AlterMessagesTableQuery); err != nil { return err @@ -783,10 +789,10 @@ func migrateFrom6(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 7); err != nil { return err } - return migrateFrom7(db) + return nil } -func migrateFrom7(db *sql.DB) error { +func migrateFrom7(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 7 to 8") if _, err := db.Exec(migrate7To8AlterMessagesTableQuery); err != nil { return err @@ -794,10 +800,10 @@ func migrateFrom7(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 8); err != nil { return err } - return migrateFrom8(db) + return nil } -func migrateFrom8(db *sql.DB) error { +func migrateFrom8(db *sql.DB, _ time.Duration) error { log.Info("Migrating cache database schema: from 8 to 9") if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil { return err @@ -805,10 +811,10 @@ func migrateFrom8(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 9); err != nil { return err } - return migrateFrom9(db) + return nil } -func migrateFrom9(db *sql.DB) error { +func migrateFrom9(db *sql.DB, cacheDuration time.Duration) error { log.Info("Migrating cache database schema: from 9 to 10") tx, err := db.Begin() if err != nil { @@ -818,7 +824,9 @@ func migrateFrom9(db *sql.DB) error { if _, err := tx.Exec(migrate9To10AlterMessagesTableQuery); err != nil { return err } - // FIXME add logic to set "expires" column + if _, err := tx.Exec(migrate9To10UpdateMessageExpiryQuery, int64(cacheDuration.Seconds())); err != nil { + return err + } if _, err := tx.Exec(updateSchemaVersion, 10); err != nil { return err } diff --git a/server/message_cache_test.go b/server/message_cache_test.go index 5d84592..a15b343 100644 --- a/server/message_cache_test.go +++ b/server/message_cache_test.go @@ -459,12 +459,108 @@ func TestSqliteCache_Migration_From1(t *testing.T) { require.Equal(t, 11, len(messages)) } +func TestSqliteCache_Migration_From9(t *testing.T) { + // This primarily tests the awkward migration that introduces the "expires" column. + // The migration logic has to update the column, using the existing "cache-duration" value. + + filename := newSqliteTestCacheFile(t) + db, err := sql.Open("sqlite3", filename) + require.Nil(t, err) + + // Create "version 8" schema + _, err = db.Exec(` + BEGIN; + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mid TEXT NOT NULL, + time INT NOT NULL, + topic TEXT NOT NULL, + message TEXT NOT NULL, + title TEXT NOT NULL, + priority INT NOT NULL, + tags TEXT NOT NULL, + click TEXT NOT NULL, + icon TEXT NOT NULL, + actions TEXT NOT NULL, + attachment_name TEXT NOT NULL, + attachment_type TEXT NOT NULL, + attachment_size INT NOT NULL, + attachment_expires INT NOT NULL, + attachment_url TEXT NOT NULL, + sender TEXT NOT NULL, + encoding TEXT NOT NULL, + published INT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid); + CREATE INDEX IF NOT EXISTS idx_time ON messages (time); + CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); + CREATE TABLE IF NOT EXISTS schemaVersion ( + id INT PRIMARY KEY, + version INT NOT NULL + ); + INSERT INTO schemaVersion (id, version) VALUES (1, 9); + COMMIT; + `) + require.Nil(t, err) + + // Insert a bunch of messages + insertQuery := ` + INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding, published) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ` + for i := 0; i < 10; i++ { + _, err = db.Exec( + insertQuery, + fmt.Sprintf("abcd%d", i), + time.Now().Unix(), + "mytopic", + fmt.Sprintf("some message %d", i), + "", // title + 0, // priority + "", // tags + "", // click + "", // icon + "", // actions + "", // attachment_name + "", // attachment_type + 0, // attachment_size + 0, // attachment_type + "", // attachment_url + "9.9.9.9", // sender + "", // encoding + 1, // published + ) + require.Nil(t, err) + } + + // Create cache to trigger migration + cacheDuration := 17 * time.Hour + c, err := newSqliteCache(filename, "", cacheDuration, 0, 0, false) + checkSchemaVersion(t, c.db) + + // Check version + rows, err := db.Query(`SELECT version FROM main.schemaVersion WHERE id = 1`) + require.Nil(t, err) + require.True(t, rows.Next()) + var version int + require.Nil(t, rows.Scan(&version)) + require.Equal(t, currentSchemaVersion, version) + + messages, err := c.Messages("mytopic", sinceAllMessages, false) + require.Nil(t, err) + require.Equal(t, 10, len(messages)) + for _, m := range messages { + require.True(t, m.Expires > time.Now().Add(cacheDuration-5*time.Second).Unix()) + require.True(t, m.Expires < time.Now().Add(cacheDuration+5*time.Second).Unix()) + } +} + func TestSqliteCache_StartupQueries_WAL(t *testing.T) { filename := newSqliteTestCacheFile(t) startupQueries := `pragma journal_mode = WAL; pragma synchronous = normal; pragma temp_store = memory;` - db, err := newSqliteCache(filename, startupQueries, 0, 0, false) + db, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false) require.Nil(t, err) require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message"))) require.FileExists(t, filename) @@ -475,7 +571,7 @@ pragma temp_store = memory;` func TestSqliteCache_StartupQueries_None(t *testing.T) { filename := newSqliteTestCacheFile(t) startupQueries := "" - db, err := newSqliteCache(filename, startupQueries, 0, 0, false) + db, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false) require.Nil(t, err) require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message"))) require.FileExists(t, filename) @@ -486,7 +582,7 @@ func TestSqliteCache_StartupQueries_None(t *testing.T) { func TestSqliteCache_StartupQueries_Fail(t *testing.T) { filename := newSqliteTestCacheFile(t) startupQueries := `xx error` - _, err := newSqliteCache(filename, startupQueries, 0, 0, false) + _, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false) require.Error(t, err) } @@ -538,7 +634,7 @@ func TestMemCache_NopCache(t *testing.T) { } func newSqliteTestCache(t *testing.T) *messageCache { - c, err := newSqliteCache(newSqliteTestCacheFile(t), "", 0, 0, false) + c, err := newSqliteCache(newSqliteTestCacheFile(t), "", time.Hour, 0, 0, false) if err != nil { t.Fatal(err) } @@ -550,7 +646,7 @@ func newSqliteTestCacheFile(t *testing.T) string { } func newSqliteTestCacheFromFile(t *testing.T, filename, startupQueries string) *messageCache { - c, err := newSqliteCache(filename, startupQueries, 0, 0, false) + c, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false) if err != nil { t.Fatal(err) } diff --git a/server/server.go b/server/server.go index 4e67787..4cc811c 100644 --- a/server/server.go +++ b/server/server.go @@ -41,7 +41,6 @@ import ( purge accounts that were not logged int o in X reset daily Limits for users Make sure account endpoints make sense for admins - add logic to set "expires" column (this is gonna be dirty) UI: - Align size of message bar and upgrade banner - flicker of upgrade banner @@ -199,7 +198,7 @@ func createMessageCache(conf *Config) (*messageCache, error) { if conf.CacheDuration == 0 { return newNopCache() } else if conf.CacheFile != "" { - return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheBatchSize, conf.CacheBatchTimeout, false) + return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false) } return newMemCache() }