From b74defef14099d68b835f58844ee7fe97e6c4932 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Wed, 22 Jun 2022 20:17:47 -0400 Subject: [PATCH] Enable WAL mode, add changelog --- docs/releases.md | 6 +++ server/message_cache.go | 107 ++++++++++++++++++++++++++-------------- server/server_test.go | 20 ++++++-- 3 files changed, 90 insertions(+), 43 deletions(-) diff --git a/docs/releases.md b/docs/releases.md index 1d6246a..9998bc7 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -6,8 +6,14 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release ## ntfy server v1.27.0 (UNRELEASED) +!!! info + The message cache database (typically `cache.db`) now enables the write-ahead log (WAL) in SQLite. + WAL mode will create two additional files (`cache.db-wal` and `cache.db-shm`). This is perfectly normal. + Do not delete or modify these files, as that can lead to database corruption. + **Features:** +* Greatly improve SQLite performance for the message cache by enabling WAL mode (no ticket) * ntfy CLI can now [wait for a command or PID](https://ntfy.sh/docs/subscribe/cli/#wait-for-pidcommand) before publishing ([#263](https://github.com/binwiederhier/ntfy/issues/263), thanks to the [original ntfy](https://github.com/dschep/ntfy) for the idea) * Trace: Log entire HTTP request to simplify debugging (no ticket) * Allow setting user password via `NTFY_PASSWORD` env variable ([#327](https://github.com/binwiederhier/ntfy/pull/327), thanks to [@Kenix3](https://github.com/Kenix3)) diff --git a/server/message_cache.go b/server/message_cache.go index d6024c8..b1d427f 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -88,6 +88,18 @@ const ( selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?` ) +// Performance & setup queries (see https://phiresky.github.io/blog/2020/sqlite-performance-tuning/) +// - Write-ahead log (speeds up reads) +// - Only sync on WAL checkpoint +// - Temporary indices in memory +const ( + setupQueries = ` + pragma journal_mode = WAL; + pragma synchronous = normal; + pragma temp_store = memory; + ` +) + // Schema management queries const ( currentSchemaVersion = 7 @@ -222,52 +234,66 @@ func createMemoryFilename() string { } func (c *messageCache) AddMessage(m *message) error { - if m.Event != messageEvent { - return errUnexpectedMessageType - } + return c.addMessages([]*message{m}) +} + +func (c *messageCache) addMessages(ms []*message) error { if c.nop { return nil } - published := m.Time <= time.Now().Unix() - tags := strings.Join(m.Tags, ",") - var attachmentName, attachmentType, attachmentURL string - var attachmentSize, attachmentExpires int64 - if m.Attachment != nil { - attachmentName = m.Attachment.Name - attachmentType = m.Attachment.Type - attachmentSize = m.Attachment.Size - attachmentExpires = m.Attachment.Expires - attachmentURL = m.Attachment.URL + tx, err := c.db.Begin() + if err != nil { + return err } - var actionsStr string - if len(m.Actions) > 0 { - actionsBytes, err := json.Marshal(m.Actions) + defer tx.Rollback() + for _, m := range ms { + if m.Event != messageEvent { + return errUnexpectedMessageType + } + published := m.Time <= time.Now().Unix() + tags := strings.Join(m.Tags, ",") + var attachmentName, attachmentType, attachmentURL string + var attachmentSize, attachmentExpires int64 + if m.Attachment != nil { + attachmentName = m.Attachment.Name + attachmentType = m.Attachment.Type + attachmentSize = m.Attachment.Size + attachmentExpires = m.Attachment.Expires + attachmentURL = m.Attachment.URL + } + var actionsStr string + if len(m.Actions) > 0 { + actionsBytes, err := json.Marshal(m.Actions) + if err != nil { + return err + } + actionsStr = string(actionsBytes) + } + _, err := tx.Exec( + insertMessageQuery, + m.ID, + m.Time, + m.Topic, + m.Message, + m.Title, + m.Priority, + tags, + m.Click, + actionsStr, + attachmentName, + attachmentType, + attachmentSize, + attachmentExpires, + attachmentURL, + m.Sender, + m.Encoding, + published, + ) if err != nil { return err } - actionsStr = string(actionsBytes) } - _, err := c.db.Exec( - insertMessageQuery, - m.ID, - m.Time, - m.Topic, - m.Message, - m.Title, - m.Priority, - tags, - m.Click, - actionsStr, - attachmentName, - attachmentType, - attachmentSize, - attachmentExpires, - attachmentURL, - m.Sender, - m.Encoding, - published, - ) - return err + return tx.Commit() } func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) { @@ -486,6 +512,11 @@ func readMessages(rows *sql.Rows) ([]*message, error) { } func setupCacheDB(db *sql.DB) error { + // Performance: WAL mode, only sync on WAL checkpoints + if _, err := db.Exec(setupQueries); err != nil { + return err + } + // If 'messages' table does not exist, this must be a new database rowsMC, err := db.Query(selectMessagesCountQuery) if err != nil { diff --git a/server/server_test.go b/server/server_test.go index 54fef13..d3f25bd 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "github.com/stretchr/testify/assert" "io" "log" "math/rand" @@ -1411,15 +1412,22 @@ func TestServer_Visitor_XForwardedFor_Multiple(t *testing.T) { } func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) { - count := 1000 - s := newTestServer(t, newTestConfig(t)) + count := 50000 + c := newTestConfig(t) + c.TotalTopicLimit = 50001 + s := newTestServer(t, c) // Add lots of messages log.Printf("Adding %d messages", count) start := time.Now() + messages := make([]*message, 0) for i := 0; i < count; i++ { - require.Nil(t, s.messageCache.AddMessage(newDefaultMessage(fmt.Sprintf("topic%d", i), "some message"))) + topicID := fmt.Sprintf("topic%d", i) + _, err := s.topicsFromIDs(topicID) // Add topic to internal s.topics array + require.Nil(t, err) + messages = append(messages, newDefaultMessage(topicID, "some message")) } + require.Nil(t, s.messageCache.addMessages(messages)) log.Printf("Done: Adding %d messages; took %s", count, time.Since(start).Round(time.Millisecond)) // Update stats @@ -1438,11 +1446,13 @@ func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) { start = time.Now() response := request(t, s, "PUT", "/mytopic", "some body", nil) m := toMessage(t, response.Body.String()) - require.Equal(t, "some body", m.Message) - require.True(t, time.Since(start) < 500*time.Millisecond) + assert.Equal(t, "some body", m.Message) + assert.True(t, time.Since(start) < 100*time.Millisecond) log.Printf("Done: Publishing message; took %s", time.Since(start).Round(time.Millisecond)) + // Wait for all goroutines <-statsChan + log.Printf("Done: Waiting for all locks") } func newTestConfig(t *testing.T) *Config {