Enable WAL mode, add changelog
This commit is contained in:
parent
3334d84861
commit
b74defef14
3 changed files with 90 additions and 43 deletions
|
@ -6,8 +6,14 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
|
||||||
|
|
||||||
## ntfy server v1.27.0 (UNRELEASED)
|
## ntfy server v1.27.0 (UNRELEASED)
|
||||||
|
|
||||||
|
!!! info
|
||||||
|
The message cache database (typically `cache.db`) now enables the write-ahead log (WAL) in SQLite.
|
||||||
|
WAL mode will create two additional files (`cache.db-wal` and `cache.db-shm`). This is perfectly normal.
|
||||||
|
Do not delete or modify these files, as that can lead to database corruption.
|
||||||
|
|
||||||
**Features:**
|
**Features:**
|
||||||
|
|
||||||
|
* Greatly improve SQLite performance for the message cache by enabling WAL mode (no ticket)
|
||||||
* ntfy CLI can now [wait for a command or PID](https://ntfy.sh/docs/subscribe/cli/#wait-for-pidcommand) before publishing ([#263](https://github.com/binwiederhier/ntfy/issues/263), thanks to the [original ntfy](https://github.com/dschep/ntfy) for the idea)
|
* ntfy CLI can now [wait for a command or PID](https://ntfy.sh/docs/subscribe/cli/#wait-for-pidcommand) before publishing ([#263](https://github.com/binwiederhier/ntfy/issues/263), thanks to the [original ntfy](https://github.com/dschep/ntfy) for the idea)
|
||||||
* Trace: Log entire HTTP request to simplify debugging (no ticket)
|
* Trace: Log entire HTTP request to simplify debugging (no ticket)
|
||||||
* Allow setting user password via `NTFY_PASSWORD` env variable ([#327](https://github.com/binwiederhier/ntfy/pull/327), thanks to [@Kenix3](https://github.com/Kenix3))
|
* Allow setting user password via `NTFY_PASSWORD` env variable ([#327](https://github.com/binwiederhier/ntfy/pull/327), thanks to [@Kenix3](https://github.com/Kenix3))
|
||||||
|
|
|
@ -88,6 +88,18 @@ const (
|
||||||
selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
|
selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Performance & setup queries (see https://phiresky.github.io/blog/2020/sqlite-performance-tuning/)
|
||||||
|
// - Write-ahead log (speeds up reads)
|
||||||
|
// - Only sync on WAL checkpoint
|
||||||
|
// - Temporary indices in memory
|
||||||
|
const (
|
||||||
|
setupQueries = `
|
||||||
|
pragma journal_mode = WAL;
|
||||||
|
pragma synchronous = normal;
|
||||||
|
pragma temp_store = memory;
|
||||||
|
`
|
||||||
|
)
|
||||||
|
|
||||||
// Schema management queries
|
// Schema management queries
|
||||||
const (
|
const (
|
||||||
currentSchemaVersion = 7
|
currentSchemaVersion = 7
|
||||||
|
@ -222,52 +234,66 @@ func createMemoryFilename() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *messageCache) AddMessage(m *message) error {
|
func (c *messageCache) AddMessage(m *message) error {
|
||||||
if m.Event != messageEvent {
|
return c.addMessages([]*message{m})
|
||||||
return errUnexpectedMessageType
|
}
|
||||||
}
|
|
||||||
|
func (c *messageCache) addMessages(ms []*message) error {
|
||||||
if c.nop {
|
if c.nop {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
published := m.Time <= time.Now().Unix()
|
tx, err := c.db.Begin()
|
||||||
tags := strings.Join(m.Tags, ",")
|
if err != nil {
|
||||||
var attachmentName, attachmentType, attachmentURL string
|
return err
|
||||||
var attachmentSize, attachmentExpires int64
|
|
||||||
if m.Attachment != nil {
|
|
||||||
attachmentName = m.Attachment.Name
|
|
||||||
attachmentType = m.Attachment.Type
|
|
||||||
attachmentSize = m.Attachment.Size
|
|
||||||
attachmentExpires = m.Attachment.Expires
|
|
||||||
attachmentURL = m.Attachment.URL
|
|
||||||
}
|
}
|
||||||
var actionsStr string
|
defer tx.Rollback()
|
||||||
if len(m.Actions) > 0 {
|
for _, m := range ms {
|
||||||
actionsBytes, err := json.Marshal(m.Actions)
|
if m.Event != messageEvent {
|
||||||
|
return errUnexpectedMessageType
|
||||||
|
}
|
||||||
|
published := m.Time <= time.Now().Unix()
|
||||||
|
tags := strings.Join(m.Tags, ",")
|
||||||
|
var attachmentName, attachmentType, attachmentURL string
|
||||||
|
var attachmentSize, attachmentExpires int64
|
||||||
|
if m.Attachment != nil {
|
||||||
|
attachmentName = m.Attachment.Name
|
||||||
|
attachmentType = m.Attachment.Type
|
||||||
|
attachmentSize = m.Attachment.Size
|
||||||
|
attachmentExpires = m.Attachment.Expires
|
||||||
|
attachmentURL = m.Attachment.URL
|
||||||
|
}
|
||||||
|
var actionsStr string
|
||||||
|
if len(m.Actions) > 0 {
|
||||||
|
actionsBytes, err := json.Marshal(m.Actions)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
actionsStr = string(actionsBytes)
|
||||||
|
}
|
||||||
|
_, err := tx.Exec(
|
||||||
|
insertMessageQuery,
|
||||||
|
m.ID,
|
||||||
|
m.Time,
|
||||||
|
m.Topic,
|
||||||
|
m.Message,
|
||||||
|
m.Title,
|
||||||
|
m.Priority,
|
||||||
|
tags,
|
||||||
|
m.Click,
|
||||||
|
actionsStr,
|
||||||
|
attachmentName,
|
||||||
|
attachmentType,
|
||||||
|
attachmentSize,
|
||||||
|
attachmentExpires,
|
||||||
|
attachmentURL,
|
||||||
|
m.Sender,
|
||||||
|
m.Encoding,
|
||||||
|
published,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
actionsStr = string(actionsBytes)
|
|
||||||
}
|
}
|
||||||
_, err := c.db.Exec(
|
return tx.Commit()
|
||||||
insertMessageQuery,
|
|
||||||
m.ID,
|
|
||||||
m.Time,
|
|
||||||
m.Topic,
|
|
||||||
m.Message,
|
|
||||||
m.Title,
|
|
||||||
m.Priority,
|
|
||||||
tags,
|
|
||||||
m.Click,
|
|
||||||
actionsStr,
|
|
||||||
attachmentName,
|
|
||||||
attachmentType,
|
|
||||||
attachmentSize,
|
|
||||||
attachmentExpires,
|
|
||||||
attachmentURL,
|
|
||||||
m.Sender,
|
|
||||||
m.Encoding,
|
|
||||||
published,
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
|
||||||
|
@ -486,6 +512,11 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupCacheDB(db *sql.DB) error {
|
func setupCacheDB(db *sql.DB) error {
|
||||||
|
// Performance: WAL mode, only sync on WAL checkpoints
|
||||||
|
if _, err := db.Exec(setupQueries); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// If 'messages' table does not exist, this must be a new database
|
// If 'messages' table does not exist, this must be a new database
|
||||||
rowsMC, err := db.Query(selectMessagesCountQuery)
|
rowsMC, err := db.Query(selectMessagesCountQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
@ -1411,15 +1412,22 @@ func TestServer_Visitor_XForwardedFor_Multiple(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) {
|
func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) {
|
||||||
count := 1000
|
count := 50000
|
||||||
s := newTestServer(t, newTestConfig(t))
|
c := newTestConfig(t)
|
||||||
|
c.TotalTopicLimit = 50001
|
||||||
|
s := newTestServer(t, c)
|
||||||
|
|
||||||
// Add lots of messages
|
// Add lots of messages
|
||||||
log.Printf("Adding %d messages", count)
|
log.Printf("Adding %d messages", count)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
messages := make([]*message, 0)
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
require.Nil(t, s.messageCache.AddMessage(newDefaultMessage(fmt.Sprintf("topic%d", i), "some message")))
|
topicID := fmt.Sprintf("topic%d", i)
|
||||||
|
_, err := s.topicsFromIDs(topicID) // Add topic to internal s.topics array
|
||||||
|
require.Nil(t, err)
|
||||||
|
messages = append(messages, newDefaultMessage(topicID, "some message"))
|
||||||
}
|
}
|
||||||
|
require.Nil(t, s.messageCache.addMessages(messages))
|
||||||
log.Printf("Done: Adding %d messages; took %s", count, time.Since(start).Round(time.Millisecond))
|
log.Printf("Done: Adding %d messages; took %s", count, time.Since(start).Round(time.Millisecond))
|
||||||
|
|
||||||
// Update stats
|
// Update stats
|
||||||
|
@ -1438,11 +1446,13 @@ func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) {
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
response := request(t, s, "PUT", "/mytopic", "some body", nil)
|
response := request(t, s, "PUT", "/mytopic", "some body", nil)
|
||||||
m := toMessage(t, response.Body.String())
|
m := toMessage(t, response.Body.String())
|
||||||
require.Equal(t, "some body", m.Message)
|
assert.Equal(t, "some body", m.Message)
|
||||||
require.True(t, time.Since(start) < 500*time.Millisecond)
|
assert.True(t, time.Since(start) < 100*time.Millisecond)
|
||||||
log.Printf("Done: Publishing message; took %s", time.Since(start).Round(time.Millisecond))
|
log.Printf("Done: Publishing message; took %s", time.Since(start).Round(time.Millisecond))
|
||||||
|
|
||||||
|
// Wait for all goroutines
|
||||||
<-statsChan
|
<-statsChan
|
||||||
|
log.Printf("Done: Waiting for all locks")
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestConfig(t *testing.T) *Config {
|
func newTestConfig(t *testing.T) *Config {
|
||||||
|
|
Loading…
Reference in a new issue