diff --git a/cmd/serve.go b/cmd/serve.go index aff7c7c..ecc4d4a 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -44,6 +44,8 @@ var flagsServe = append( altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"firebase_key_file", "F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"cache_file", "C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}), altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"cache_duration", "b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: server.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}), + altsrc.NewIntFlag(&cli.IntFlag{Name: "cache-batch-size", Aliases: []string{"cache_batch_size"}, EnvVars: []string{"NTFY_CACHE_BATCH_SIZE"}, Usage: "max size of messages to batch together when writing to message cache (if zero, writes are synchronous)"}), + altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-batch-timeout", Aliases: []string{"cache_batch_timeout"}, EnvVars: []string{"NTFY_CACHE_BATCH_TIMEOUT"}, Usage: "timeout for batched async writes to the message cache (if zero, writes are synchronous)"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-startup-queries", Aliases: []string{"cache_startup_queries"}, EnvVars: []string{"NTFY_CACHE_STARTUP_QUERIES"}, Usage: "queries run when the cache database is initialized"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-file", Aliases: []string{"auth_file", "H"}, EnvVars: []string{"NTFY_AUTH_FILE"}, Usage: "auth database file used for access control"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-default-access", Aliases: []string{"auth_default_access", "p"}, EnvVars: []string{"NTFY_AUTH_DEFAULT_ACCESS"}, Value: "read-write", Usage: "default permissions if no matching entries in the auth database are 
found"}), @@ -110,6 +112,8 @@ func execServe(c *cli.Context) error { cacheFile := c.String("cache-file") cacheDuration := c.Duration("cache-duration") cacheStartupQueries := c.String("cache-startup-queries") + cacheBatchSize := c.Int("cache-batch-size") + cacheBatchTimeout := c.Duration("cache-batch-timeout") authFile := c.String("auth-file") authDefaultAccess := c.String("auth-default-access") attachmentCacheDir := c.String("attachment-cache-dir") @@ -233,6 +237,8 @@ func execServe(c *cli.Context) error { conf.CacheFile = cacheFile conf.CacheDuration = cacheDuration conf.CacheStartupQueries = cacheStartupQueries + conf.CacheBatchSize = cacheBatchSize + conf.CacheBatchTimeout = cacheBatchTimeout conf.AuthFile = authFile conf.AuthDefaultRead = authDefaultRead conf.AuthDefaultWrite = authDefaultWrite diff --git a/docs/config.md b/docs/config.md index 655b56c..a127a5a 100644 --- a/docs/config.md +++ b/docs/config.md @@ -825,19 +825,27 @@ out [this discussion on Reddit](https://www.reddit.com/r/golang/comments/r9u4ee/ Depending on *how you run it*, here are a few limits that are relevant: -### WAL for message cache +### Message cache By default, the [message cache](#message-cache) (defined by `cache-file`) uses the SQLite default settings, which means it syncs to disk on every write. For personal servers, this is perfectly adequate. For larger installations, such as ntfy.sh, the [write-ahead log (WAL)](https://sqlite.org/wal.html) should be enabled, and the sync mode should be adjusted. See [this article](https://phiresky.github.io/blog/2020/sqlite-performance-tuning/) for details. +In addition to that, for very high load servers (such as ntfy.sh), it may be beneficial to write messages to the cache +in batches, and asynchronously. This can be enabled with the `cache-batch-size` and `cache-batch-timeout` options. If you start +seeing `database is locked` messages in the logs, you should probably enable that. 
+ Here's how ntfy.sh has been tuned in the `server.yml` file: ``` yaml +cache-batch-size: 25 +cache-batch-timeout: "1s" cache-startup-queries: | pragma journal_mode = WAL; pragma synchronous = normal; pragma temp_store = memory; + pragma busy_timeout = 15000; + vacuum; ``` ### For systemd services @@ -990,6 +998,8 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`). | `cache-file` | `NTFY_CACHE_FILE` | *filename* | - | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache). | | `cache-duration` | `NTFY_CACHE_DURATION` | *duration* | 12h | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely. | | `cache-startup-queries` | `NTFY_CACHE_STARTUP_QUERIES` | *string (SQL queries)* | - | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#wal-for-message-cache) | +| `cache-batch-size` | `NTFY_CACHE_BATCH_SIZE` | *int* | 0 | Max size of messages to batch together when writing to message cache (if zero, writes are synchronous) | +| `cache-batch-timeout` | `NTFY_CACHE_BATCH_TIMEOUT` | *duration* | 0s | Timeout for batched async writes to the message cache (if zero, writes are synchronous) | | `auth-file` | `NTFY_AUTH_FILE` | *filename* | - | Auth database file used for access control. If set, enables authentication and access control. See [access control](#access-control). | | `auth-default-access` | `NTFY_AUTH_DEFAULT_ACCESS` | `read-write`, `read-only`, `write-only`, `deny-all` | `read-write` | Default permissions if no matching entries in the auth database are found. Default is `read-write`. 
| | `behind-proxy` | `NTFY_BEHIND_PROXY` | *bool* | false | If set, the X-Forwarded-For header is used to determine the visitor IP address instead of the remote address of the connection. | @@ -1054,6 +1064,8 @@ OPTIONS: --behind-proxy, --behind_proxy, -P if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting) (default: false) [$NTFY_BEHIND_PROXY] --cache-duration since, --cache_duration since, -b since buffer messages for this time to allow since requests (default: 12h0m0s) [$NTFY_CACHE_DURATION] --cache-file value, --cache_file value, -C value cache file used for message caching [$NTFY_CACHE_FILE] + --cache-batch-size value, --cache_batch_size value max size of messages to batch together when writing to message cache (if zero, writes are synchronous) (default: 0) [$NTFY_CACHE_BATCH_SIZE] + --cache-batch-timeout value, --cache_batch_timeout value timeout for batched async writes to the message cache (if zero, writes are synchronous) (default: 0s) [$NTFY_CACHE_BATCH_TIMEOUT] --cache-startup-queries value, --cache_startup_queries value queries run when the cache database is initialized [$NTFY_CACHE_STARTUP_QUERIES] --cert-file value, --cert_file value, -E value certificate file, if listen-https is set [$NTFY_CERT_FILE] --config value, -c value config file (default: /etc/ntfy/server.yml) [$NTFY_CONFIG_FILE] diff --git a/docs/releases.md b/docs/releases.md index f5fc9a4..e662fae 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -14,6 +14,10 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release ## ntfy server v1.30.0 (UNRELREASED) +**Features:** + +* High-load servers: Allow asynchronous batch-writing of messages to cache via `cache-batch-*` options ([#498](https://github.com/binwiederhier/ntfy/issues/498)/[#502](https://github.com/binwiederhier/ntfy/pull/502)) + **Documentation:** * GitHub Actions example ([#492](https://github.com/binwiederhier/ntfy/pull/492), thanks to [@ksurl](https://github.com/ksurl)) 
@@ -22,6 +26,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release **Other things:** * Put ntfy.sh docs on GitHub pages to reduce AWS outbound traffic cost ([#491](https://github.com/binwiederhier/ntfy/issues/491)) +* The ntfy.sh server hardware was upgraded to a bigger box. If you'd like to help out carrying the server cost, **[sponsorships and donations](https://github.com/sponsors/binwiederhier)** 💸 would be very much appreciated ## ntfy server v1.29.0 Released November 12, 2022 diff --git a/server/config.go b/server/config.go index d8fd429..1e2b517 100644 --- a/server/config.go +++ b/server/config.go @@ -61,6 +61,8 @@ type Config struct { CacheFile string CacheDuration time.Duration CacheStartupQueries string + CacheBatchSize int + CacheBatchTimeout time.Duration AuthFile string AuthDefaultRead bool AuthDefaultWrite bool @@ -114,6 +116,8 @@ func NewConfig() *Config { FirebaseKeyFile: "", CacheFile: "", CacheDuration: DefaultCacheDuration, + CacheBatchSize: 0, + CacheBatchTimeout: 0, AuthFile: "", AuthDefaultRead: true, AuthDefaultWrite: true, diff --git a/server/message_cache.go b/server/message_cache.go index f443339..bce9422 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -44,6 +44,7 @@ const ( published INT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid); + CREATE INDEX IF NOT EXISTS idx_time ON messages (time); CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); COMMIT; ` @@ -92,7 +93,7 @@ const ( // Schema management queries const ( - currentSchemaVersion = 8 + currentSchemaVersion = 9 createSchemaVersionTableQuery = ` CREATE TABLE IF NOT EXISTS schemaVersion ( id INT PRIMARY KEY, @@ -185,15 +186,21 @@ const ( migrate7To8AlterMessagesTableQuery = ` ALTER TABLE messages ADD COLUMN icon TEXT NOT NULL DEFAULT(''); ` + + // 8 -> 9 + migrate8To9AlterMessagesTableQuery = ` + CREATE INDEX IF NOT EXISTS idx_time ON messages (time); + ` ) type messageCache struct { - db *sql.DB - nop bool 
+ db *sql.DB + queue *util.BatchingQueue[*message] + nop bool } // newSqliteCache creates a SQLite file-backed cache -func newSqliteCache(filename, startupQueries string, nop bool) (*messageCache, error) { +func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) { db, err := sql.Open("sqlite3", filename) if err != nil { return nil, err @@ -201,21 +208,28 @@ func newSqliteCache(filename, startupQueries string, nop bool) (*messageCache, e if err := setupCacheDB(db, startupQueries); err != nil { return nil, err } - return &messageCache{ - db: db, - nop: nop, - }, nil + var queue *util.BatchingQueue[*message] + if batchSize > 0 || batchTimeout > 0 { + queue = util.NewBatchingQueue[*message](batchSize, batchTimeout) + } + cache := &messageCache{ + db: db, + queue: queue, + nop: nop, + } + go cache.processMessageBatches() + return cache, nil } // newMemCache creates an in-memory cache func newMemCache() (*messageCache, error) { - return newSqliteCache(createMemoryFilename(), "", false) + return newSqliteCache(createMemoryFilename(), "", 0, 0, false) } // newNopCache creates an in-memory cache that discards all messages; // it is always empty and can be used if caching is entirely disabled func newNopCache() (*messageCache, error) { - return newSqliteCache(createMemoryFilename(), "", true) + return newSqliteCache(createMemoryFilename(), "", 0, 0, true) } // createMemoryFilename creates a unique memory filename to use for the SQLite backend. @@ -228,14 +242,23 @@ func createMemoryFilename() string { return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10)) } +// AddMessage stores a message to the message cache synchronously, or queues it to be stored at a later date asynchronously. +// The message is queued only if "batchSize" or "batchTimeout" are passed to the constructor. 
func (c *messageCache) AddMessage(m *message) error { + if c.queue != nil { + c.queue.Enqueue(m) + return nil + } return c.addMessages([]*message{m}) } +// addMessages synchronously stores a batch of messages. If the database is locked, the transaction waits until +// SQLite's busy_timeout is exceeded before erroring out. func (c *messageCache) addMessages(ms []*message) error { if c.nop { return nil } + start := time.Now() tx, err := c.db.Begin() if err != nil { return err @@ -289,7 +312,12 @@ func (c *messageCache) addMessages(ms []*message) error { return err } } - return tx.Commit() + if err := tx.Commit(); err != nil { + log.Error("Cache: Writing %d message(s) failed (took %v): %s", len(ms), time.Since(start), err.Error()) + return err + } + log.Debug("Cache: Wrote %d message(s) in %v", len(ms), time.Since(start)) + return nil } func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) { @@ -395,8 +423,12 @@ func (c *messageCache) Topics() (map[string]*topic, error) { } func (c *messageCache) Prune(olderThan time.Time) error { - _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix()) - return err + start := time.Now() + _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix()) + if err != nil { + log.Warn("Cache: Pruning failed (after %v): %s", time.Since(start), err.Error()) + } + return err } func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) { @@ -417,6 +449,17 @@ func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) { return size, nil } +func (c *messageCache) processMessageBatches() { + if c.queue == nil { + return + } + for messages := range c.queue.Dequeue() { + if err := c.addMessages(messages); err != nil { + log.Error("Cache: %s", err.Error()) + } + } +} + func readMessages(rows *sql.Rows) ([]*message, error) { defer rows.Close() messages := make([]*message, 0) @@ -542,6 +585,8 @@ func setupCacheDB(db *sql.DB, 
startupQueries string) error { return migrateFrom6(db) } else if schemaVersion == 7 { return migrateFrom7(db) + } else if schemaVersion == 8 { + return migrateFrom8(db) } return fmt.Errorf("unexpected schema version found: %d", schemaVersion) } @@ -647,5 +692,16 @@ func migrateFrom7(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 8); err != nil { return err } + return migrateFrom8(db) +} + +func migrateFrom8(db *sql.DB) error { + log.Info("Migrating cache database schema: from 8 to 9") + if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil { + return err + } + if _, err := db.Exec(updateSchemaVersion, 9); err != nil { + return err + } return nil // Update this when a new version is added } diff --git a/server/message_cache_test.go b/server/message_cache_test.go index c72debc..c3b7305 100644 --- a/server/message_cache_test.go +++ b/server/message_cache_test.go @@ -450,7 +450,7 @@ func TestSqliteCache_StartupQueries_WAL(t *testing.T) { startupQueries := `pragma journal_mode = WAL; pragma synchronous = normal; pragma temp_store = memory;` - db, err := newSqliteCache(filename, startupQueries, false) + db, err := newSqliteCache(filename, startupQueries, 0, 0, false) require.Nil(t, err) require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message"))) require.FileExists(t, filename) @@ -461,7 +461,7 @@ pragma temp_store = memory;` func TestSqliteCache_StartupQueries_None(t *testing.T) { filename := newSqliteTestCacheFile(t) startupQueries := "" - db, err := newSqliteCache(filename, startupQueries, false) + db, err := newSqliteCache(filename, startupQueries, 0, 0, false) require.Nil(t, err) require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message"))) require.FileExists(t, filename) @@ -472,7 +472,7 @@ func TestSqliteCache_StartupQueries_None(t *testing.T) { func TestSqliteCache_StartupQueries_Fail(t *testing.T) { filename := newSqliteTestCacheFile(t) startupQueries := `xx error` - _, err := newSqliteCache(filename, 
startupQueries, false) + _, err := newSqliteCache(filename, startupQueries, 0, 0, false) require.Error(t, err) } @@ -501,7 +501,7 @@ func TestMemCache_NopCache(t *testing.T) { } func newSqliteTestCache(t *testing.T) *messageCache { - c, err := newSqliteCache(newSqliteTestCacheFile(t), "", false) + c, err := newSqliteCache(newSqliteTestCacheFile(t), "", 0, 0, false) if err != nil { t.Fatal(err) } @@ -513,7 +513,7 @@ func newSqliteTestCacheFile(t *testing.T) string { } func newSqliteTestCacheFromFile(t *testing.T, filename, startupQueries string) *messageCache { - c, err := newSqliteCache(filename, startupQueries, false) + c, err := newSqliteCache(filename, startupQueries, 0, 0, false) if err != nil { t.Fatal(err) } diff --git a/server/server.go b/server/server.go index ef09100..fe729b1 100644 --- a/server/server.go +++ b/server/server.go @@ -159,7 +159,7 @@ func createMessageCache(conf *Config) (*messageCache, error) { if conf.CacheDuration == 0 { return newNopCache() } else if conf.CacheFile != "" { - return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, false) + return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheBatchSize, conf.CacheBatchTimeout, false) } return newMemCache() } @@ -491,6 +491,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes log.Debug("%s Message delayed, will process later", logMessagePrefix(v, m)) } if cache { + log.Debug("%s Adding message to cache", logMessagePrefix(v, m)) if err := s.messageCache.AddMessage(m); err != nil { return nil, err } diff --git a/server/server.yml b/server/server.yml index 9476478..1b26899 100644 --- a/server/server.yml +++ b/server/server.yml @@ -53,6 +53,12 @@ # pragma journal_mode = WAL; # pragma synchronous = normal; # pragma temp_store = memory; +# pragma busy_timeout = 15000; +# vacuum; +# +# The "cache-batch-size" and "cache-batch-timeout" parameter allow enabling async batch writing +# of messages. 
If set, messages will be queued and written to the database in batches of the given +# size, or after the given timeout. This is only required for high volume servers. # # Debian/RPM package users: # Use /var/cache/ntfy/cache.db as cache file to avoid permission issues. The package @@ -65,6 +71,8 @@ # cache-file: # cache-duration: "12h" # cache-startup-queries: +# cache-batch-size: 0 +# cache-batch-timeout: "0ms" # If set, access to the ntfy server and API can be controlled on a granular level using # the 'ntfy user' and 'ntfy access' commands. See the --help pages for details, or check the docs. diff --git a/util/batching_queue.go b/util/batching_queue.go new file mode 100644 index 0000000..85ba9be --- /dev/null +++ b/util/batching_queue.go @@ -0,0 +1,86 @@ +package util + +import ( + "sync" + "time" +) + +// BatchingQueue is a queue that creates batches of the enqueued elements based on a +// max batch size and a batch timeout. +// +// Example: +// +// q := NewBatchingQueue[int](2, 500 * time.Millisecond) +// go func() { +// for batch := range q.Dequeue() { +// fmt.Println(batch) +// } +// }() +// q.Enqueue(1) +// q.Enqueue(2) +// q.Enqueue(3) +// time.Sleep(time.Second) +// +// This example will emit batch [1, 2] immediately (because the batch size is 2), and +// a batch [3] after 500ms. +type BatchingQueue[T any] struct { + batchSize int + timeout time.Duration + in []T + out chan []T + mu sync.Mutex +} + +// NewBatchingQueue creates a new BatchingQueue +func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueue[T] { + q := &BatchingQueue[T]{ + batchSize: batchSize, + timeout: timeout, + in: make([]T, 0), + out: make(chan []T), + } + go q.timeoutTicker() + return q +} + +// Enqueue enqueues an element to the queue. If the configured batch size is reached, +// the batch will be emitted immediately. 
+func (q *BatchingQueue[T]) Enqueue(element T) { + q.mu.Lock() + q.in = append(q.in, element) + var elements []T + if len(q.in) == q.batchSize { + elements = q.dequeueAll() + } + q.mu.Unlock() + if len(elements) > 0 { + q.out <- elements + } +} + +// Dequeue returns a channel emitting batches of elements +func (q *BatchingQueue[T]) Dequeue() <-chan []T { + return q.out +} + +func (q *BatchingQueue[T]) dequeueAll() []T { + elements := make([]T, len(q.in)) + copy(elements, q.in) + q.in = q.in[:0] + return elements +} + +func (q *BatchingQueue[T]) timeoutTicker() { + if q.timeout == 0 { + return + } + ticker := time.NewTicker(q.timeout) + for range ticker.C { + q.mu.Lock() + elements := q.dequeueAll() + q.mu.Unlock() + if len(elements) > 0 { + q.out <- elements + } + } +} diff --git a/util/batching_queue_test.go b/util/batching_queue_test.go new file mode 100644 index 0000000..b3c41a4 --- /dev/null +++ b/util/batching_queue_test.go @@ -0,0 +1,58 @@ +package util_test + +import ( + "github.com/stretchr/testify/require" + "heckel.io/ntfy/util" + "math/rand" + "sync" + "testing" + "time" +) + +func TestBatchingQueue_InfTimeout(t *testing.T) { + q := util.NewBatchingQueue[int](25, 1*time.Hour) + batches, total := make([][]int, 0), 0 + var mu sync.Mutex + go func() { + for batch := range q.Dequeue() { + mu.Lock() + batches = append(batches, batch) + total += len(batch) + mu.Unlock() + } + }() + for i := 0; i < 101; i++ { + go q.Enqueue(i) + } + time.Sleep(time.Second) + mu.Lock() + require.Equal(t, 100, total) // One is missing, stuck in the last batch! 
+ require.Equal(t, 4, len(batches)) + mu.Unlock() +} + +func TestBatchingQueue_WithTimeout(t *testing.T) { + q := util.NewBatchingQueue[int](25, 100*time.Millisecond) + batches, total := make([][]int, 0), 0 + var mu sync.Mutex + go func() { + for batch := range q.Dequeue() { + mu.Lock() + batches = append(batches, batch) + total += len(batch) + mu.Unlock() + } + }() + for i := 0; i < 101; i++ { + go func(i int) { + time.Sleep(time.Duration(rand.Intn(700)) * time.Millisecond) + q.Enqueue(i) + }(i) + } + time.Sleep(time.Second) + mu.Lock() + require.Equal(t, 101, total) + require.True(t, len(batches) > 4) // 101/25 + require.True(t, len(batches) < 21) + mu.Unlock() +}