Merge branch 'main' into ui

This commit is contained in:
Philipp Heckel 2022-02-27 16:10:21 -05:00
commit cda9dfa9d0
20 changed files with 917 additions and 693 deletions

View file

@ -1,25 +0,0 @@
package server
import (
"errors"
_ "github.com/mattn/go-sqlite3" // SQLite driver
"time"
)
var (
errUnexpectedMessageType = errors.New("unexpected message type")
)
// cache implements a cache for messages of type "message" events,
// i.e. message structs with the Event messageEvent.
type cache interface {
AddMessage(m *message) error
Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error)
MessagesDue() ([]*message, error)
MessageCount(topic string) (int, error)
Topics() (map[string]*topic, error)
Prune(olderThan time.Time) error
MarkPublished(m *message) error
AttachmentsSize(owner string) (int64, error)
AttachmentsExpired() ([]string, error)
}

View file

@ -1,165 +0,0 @@
package server
import (
"sort"
"sync"
"time"
)
type memCache struct {
messages map[string][]*message
scheduled map[string]*message // Message ID -> message
nop bool
mu sync.Mutex
}
var _ cache = (*memCache)(nil)
// newMemCache creates an in-memory cache
func newMemCache() *memCache {
return &memCache{
messages: make(map[string][]*message),
scheduled: make(map[string]*message),
nop: false,
}
}
// newNopCache creates an in-memory cache that discards all messages;
// it is always empty and can be used if caching is entirely disabled
func newNopCache() *memCache {
return &memCache{
messages: make(map[string][]*message),
scheduled: make(map[string]*message),
nop: true,
}
}
func (c *memCache) AddMessage(m *message) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.nop {
return nil
}
if m.Event != messageEvent {
return errUnexpectedMessageType
}
if _, ok := c.messages[m.Topic]; !ok {
c.messages[m.Topic] = make([]*message, 0)
}
delayed := m.Time > time.Now().Unix()
if delayed {
c.scheduled[m.ID] = m
}
c.messages[m.Topic] = append(c.messages[m.Topic], m)
return nil
}
func (c *memCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.messages[topic]; !ok || since.IsNone() {
return make([]*message, 0), nil
}
messages := make([]*message, 0)
for _, m := range c.messages[topic] {
_, messageScheduled := c.scheduled[m.ID]
include := m.Time >= since.Time().Unix() && (!messageScheduled || scheduled)
if include {
messages = append(messages, m)
}
}
sort.Slice(messages, func(i, j int) bool {
return messages[i].Time < messages[j].Time
})
return messages, nil
}
func (c *memCache) MessagesDue() ([]*message, error) {
c.mu.Lock()
defer c.mu.Unlock()
messages := make([]*message, 0)
for _, m := range c.scheduled {
due := time.Now().Unix() >= m.Time
if due {
messages = append(messages, m)
}
}
sort.Slice(messages, func(i, j int) bool {
return messages[i].Time < messages[j].Time
})
return messages, nil
}
func (c *memCache) MarkPublished(m *message) error {
c.mu.Lock()
delete(c.scheduled, m.ID)
c.mu.Unlock()
return nil
}
func (c *memCache) MessageCount(topic string) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.messages[topic]; !ok {
return 0, nil
}
return len(c.messages[topic]), nil
}
func (c *memCache) Topics() (map[string]*topic, error) {
c.mu.Lock()
defer c.mu.Unlock()
topics := make(map[string]*topic)
for topic := range c.messages {
topics[topic] = newTopic(topic)
}
return topics, nil
}
func (c *memCache) Prune(olderThan time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
for topic := range c.messages {
c.pruneTopic(topic, olderThan)
}
return nil
}
func (c *memCache) AttachmentsSize(owner string) (int64, error) {
c.mu.Lock()
defer c.mu.Unlock()
var size int64
for topic := range c.messages {
for _, m := range c.messages[topic] {
counted := m.Attachment != nil && m.Attachment.Owner == owner && m.Attachment.Expires > time.Now().Unix()
if counted {
size += m.Attachment.Size
}
}
}
return size, nil
}
func (c *memCache) AttachmentsExpired() ([]string, error) {
c.mu.Lock()
defer c.mu.Unlock()
ids := make([]string, 0)
for topic := range c.messages {
for _, m := range c.messages[topic] {
if m.Attachment != nil && m.Attachment.Expires > 0 && m.Attachment.Expires < time.Now().Unix() {
ids = append(ids, m.ID)
}
}
}
return ids, nil
}
func (c *memCache) pruneTopic(topic string, olderThan time.Time) {
messages := make([]*message, 0)
for _, m := range c.messages[topic] {
if m.Time >= olderThan.Unix() {
messages = append(messages, m)
}
}
c.messages[topic] = messages
}

View file

@ -1,43 +0,0 @@
package server
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestMemCache_Messages(t *testing.T) {
testCacheMessages(t, newMemCache())
}
func TestMemCache_MessagesScheduled(t *testing.T) {
testCacheMessagesScheduled(t, newMemCache())
}
func TestMemCache_Topics(t *testing.T) {
testCacheTopics(t, newMemCache())
}
func TestMemCache_MessagesTagsPrioAndTitle(t *testing.T) {
testCacheMessagesTagsPrioAndTitle(t, newMemCache())
}
func TestMemCache_Prune(t *testing.T) {
testCachePrune(t, newMemCache())
}
func TestMemCache_Attachments(t *testing.T) {
testCacheAttachments(t, newMemCache())
}
func TestMemCache_NopCache(t *testing.T) {
c := newNopCache()
assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
messages, err := c.Messages("mytopic", sinceAllMessages, false)
assert.Nil(t, err)
assert.Empty(t, messages)
topics, err := c.Topics()
assert.Nil(t, err)
assert.Empty(t, topics)
}

View file

@ -1,158 +0,0 @@
package server
import (
"database/sql"
"fmt"
"github.com/stretchr/testify/require"
"path/filepath"
"testing"
"time"
)
func TestSqliteCache_Messages(t *testing.T) {
testCacheMessages(t, newSqliteTestCache(t))
}
func TestSqliteCache_MessagesScheduled(t *testing.T) {
testCacheMessagesScheduled(t, newSqliteTestCache(t))
}
func TestSqliteCache_Topics(t *testing.T) {
testCacheTopics(t, newSqliteTestCache(t))
}
func TestSqliteCache_MessagesTagsPrioAndTitle(t *testing.T) {
testCacheMessagesTagsPrioAndTitle(t, newSqliteTestCache(t))
}
func TestSqliteCache_Prune(t *testing.T) {
testCachePrune(t, newSqliteTestCache(t))
}
func TestSqliteCache_Attachments(t *testing.T) {
testCacheAttachments(t, newSqliteTestCache(t))
}
func TestSqliteCache_Migration_From0(t *testing.T) {
filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename)
require.Nil(t, err)
// Create "version 0" schema
_, err = db.Exec(`
BEGIN;
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(20) PRIMARY KEY,
time INT NOT NULL,
topic VARCHAR(64) NOT NULL,
message VARCHAR(1024) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`)
require.Nil(t, err)
// Insert a bunch of messages
for i := 0; i < 10; i++ {
_, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`,
fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i))
require.Nil(t, err)
}
require.Nil(t, db.Close())
// Create cache to trigger migration
c := newSqliteTestCacheFromFile(t, filename)
checkSchemaVersion(t, c.db)
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 10, len(messages))
require.Equal(t, "some message 5", messages[5].Message)
require.Equal(t, "", messages[5].Title)
require.Nil(t, messages[5].Tags)
require.Equal(t, 0, messages[5].Priority)
}
func TestSqliteCache_Migration_From1(t *testing.T) {
filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename)
require.Nil(t, err)
// Create "version 1" schema
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(20) PRIMARY KEY,
time INT NOT NULL,
topic VARCHAR(64) NOT NULL,
message VARCHAR(512) NOT NULL,
title VARCHAR(256) NOT NULL,
priority INT NOT NULL,
tags VARCHAR(256) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
INSERT INTO schemaVersion (id, version) VALUES (1, 1);
`)
require.Nil(t, err)
// Insert a bunch of messages
for i := 0; i < 10; i++ {
_, err = db.Exec(`INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`,
fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i), "", 0, "")
require.Nil(t, err)
}
require.Nil(t, db.Close())
// Create cache to trigger migration
c := newSqliteTestCacheFromFile(t, filename)
checkSchemaVersion(t, c.db)
// Add delayed message
delayedMessage := newDefaultMessage("mytopic", "some delayed message")
delayedMessage.Time = time.Now().Add(time.Minute).Unix()
require.Nil(t, c.AddMessage(delayedMessage))
// 10, not 11!
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 10, len(messages))
// 11!
messages, err = c.Messages("mytopic", sinceAllMessages, true)
require.Nil(t, err)
require.Equal(t, 11, len(messages))
}
func checkSchemaVersion(t *testing.T, db *sql.DB) {
rows, err := db.Query(`SELECT version FROM schemaVersion`)
require.Nil(t, err)
require.True(t, rows.Next())
var schemaVersion int
require.Nil(t, rows.Scan(&schemaVersion))
require.Equal(t, currentSchemaVersion, schemaVersion)
require.Nil(t, rows.Close())
}
func newSqliteTestCache(t *testing.T) *sqliteCache {
c, err := newSqliteCache(newSqliteTestCacheFile(t))
if err != nil {
t.Fatal(err)
}
return c
}
func newSqliteTestCacheFile(t *testing.T) string {
return filepath.Join(t.TempDir(), "cache.db")
}
func newSqliteTestCacheFromFile(t *testing.T, filename string) *sqliteCache {
c, err := newSqliteCache(filename)
if err != nil {
t.Fatal(err)
}
return c
}

View file

@ -1,222 +0,0 @@
package server
import (
"github.com/stretchr/testify/require"
"testing"
"time"
)
func testCacheMessages(t *testing.T, c cache) {
m1 := newDefaultMessage("mytopic", "my message")
m1.Time = 1
m2 := newDefaultMessage("mytopic", "my other message")
m2.Time = 2
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(newDefaultMessage("example", "my example message")))
require.Nil(t, c.AddMessage(m2))
// Adding invalid
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added!
// mytopic: count
count, err := c.MessageCount("mytopic")
require.Nil(t, err)
require.Equal(t, 2, count)
// mytopic: since all
messages, _ := c.Messages("mytopic", sinceAllMessages, false)
require.Equal(t, 2, len(messages))
require.Equal(t, "my message", messages[0].Message)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, messageEvent, messages[0].Event)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
require.Equal(t, "my other message", messages[1].Message)
// mytopic: since none
messages, _ = c.Messages("mytopic", sinceNoMessages, false)
require.Empty(t, messages)
// mytopic: since 2
messages, _ = c.Messages("mytopic", newSinceTime(2), false)
require.Equal(t, 1, len(messages))
require.Equal(t, "my other message", messages[0].Message)
// example: count
count, err = c.MessageCount("example")
require.Nil(t, err)
require.Equal(t, 1, count)
// example: since all
messages, _ = c.Messages("example", sinceAllMessages, false)
require.Equal(t, "my example message", messages[0].Message)
// non-existing: count
count, err = c.MessageCount("doesnotexist")
require.Nil(t, err)
require.Equal(t, 0, count)
// non-existing: since all
messages, _ = c.Messages("doesnotexist", sinceAllMessages, false)
require.Empty(t, messages)
}
func testCacheTopics(t *testing.T, c cache) {
require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 3")))
topics, err := c.Topics()
if err != nil {
t.Fatal(err)
}
require.Equal(t, 2, len(topics))
require.Equal(t, "topic1", topics["topic1"].ID)
require.Equal(t, "topic2", topics["topic2"].ID)
}
func testCachePrune(t *testing.T, c cache) {
m1 := newDefaultMessage("mytopic", "my message")
m1.Time = 1
m2 := newDefaultMessage("mytopic", "my other message")
m2.Time = 2
m3 := newDefaultMessage("another_topic", "and another one")
m3.Time = 1
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(m2))
require.Nil(t, c.AddMessage(m3))
require.Nil(t, c.Prune(time.Unix(2, 0)))
count, err := c.MessageCount("mytopic")
require.Nil(t, err)
require.Equal(t, 1, count)
count, err = c.MessageCount("another_topic")
require.Nil(t, err)
require.Equal(t, 0, count)
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 1, len(messages))
require.Equal(t, "my other message", messages[0].Message)
}
func testCacheMessagesTagsPrioAndTitle(t *testing.T, c cache) {
m := newDefaultMessage("mytopic", "some message")
m.Tags = []string{"tag1", "tag2"}
m.Priority = 5
m.Title = "some title"
require.Nil(t, c.AddMessage(m))
messages, _ := c.Messages("mytopic", sinceAllMessages, false)
require.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags)
require.Equal(t, 5, messages[0].Priority)
require.Equal(t, "some title", messages[0].Title)
}
func testCacheMessagesScheduled(t *testing.T, c cache) {
m1 := newDefaultMessage("mytopic", "message 1")
m2 := newDefaultMessage("mytopic", "message 2")
m2.Time = time.Now().Add(time.Hour).Unix()
m3 := newDefaultMessage("mytopic", "message 3")
m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2!
m4 := newDefaultMessage("mytopic2", "message 4")
m4.Time = time.Now().Add(time.Minute).Unix()
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(m2))
require.Nil(t, c.AddMessage(m3))
messages, _ := c.Messages("mytopic", sinceAllMessages, false) // exclude scheduled
require.Equal(t, 1, len(messages))
require.Equal(t, "message 1", messages[0].Message)
messages, _ = c.Messages("mytopic", sinceAllMessages, true) // include scheduled
require.Equal(t, 3, len(messages))
require.Equal(t, "message 1", messages[0].Message)
require.Equal(t, "message 3", messages[1].Message) // Order!
require.Equal(t, "message 2", messages[2].Message)
messages, _ = c.MessagesDue()
require.Empty(t, messages)
}
func testCacheAttachments(t *testing.T, c cache) {
expires1 := time.Now().Add(-4 * time.Hour).Unix()
m := newDefaultMessage("mytopic", "flower for you")
m.ID = "m1"
m.Attachment = &attachment{
Name: "flower.jpg",
Type: "image/jpeg",
Size: 5000,
Expires: expires1,
URL: "https://ntfy.sh/file/AbDeFgJhal.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
expires2 := time.Now().Add(2 * time.Hour).Unix() // Future
m = newDefaultMessage("mytopic", "sending you a car")
m.ID = "m2"
m.Attachment = &attachment{
Name: "car.jpg",
Type: "image/jpeg",
Size: 10000,
Expires: expires2,
URL: "https://ntfy.sh/file/aCaRURL.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
expires3 := time.Now().Add(1 * time.Hour).Unix() // Future
m = newDefaultMessage("another-topic", "sending you another car")
m.ID = "m3"
m.Attachment = &attachment{
Name: "another-car.jpg",
Type: "image/jpeg",
Size: 20000,
Expires: expires3,
URL: "https://ntfy.sh/file/zakaDHFW.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 2, len(messages))
require.Equal(t, "flower for you", messages[0].Message)
require.Equal(t, "flower.jpg", messages[0].Attachment.Name)
require.Equal(t, "image/jpeg", messages[0].Attachment.Type)
require.Equal(t, int64(5000), messages[0].Attachment.Size)
require.Equal(t, expires1, messages[0].Attachment.Expires)
require.Equal(t, "https://ntfy.sh/file/AbDeFgJhal.jpg", messages[0].Attachment.URL)
require.Equal(t, "1.2.3.4", messages[0].Attachment.Owner)
require.Equal(t, "sending you a car", messages[1].Message)
require.Equal(t, "car.jpg", messages[1].Attachment.Name)
require.Equal(t, "image/jpeg", messages[1].Attachment.Type)
require.Equal(t, int64(10000), messages[1].Attachment.Size)
require.Equal(t, expires2, messages[1].Attachment.Expires)
require.Equal(t, "https://ntfy.sh/file/aCaRURL.jpg", messages[1].Attachment.URL)
require.Equal(t, "1.2.3.4", messages[1].Attachment.Owner)
size, err := c.AttachmentsSize("1.2.3.4")
require.Nil(t, err)
require.Equal(t, int64(30000), size)
size, err = c.AttachmentsSize("5.6.7.8")
require.Nil(t, err)
require.Equal(t, int64(0), size)
ids, err := c.AttachmentsExpired()
require.Nil(t, err)
require.Equal(t, []string{"m1"}, ids)
}

View file

@ -5,11 +5,16 @@ import (
"errors"
"fmt"
_ "github.com/mattn/go-sqlite3" // SQLite driver
"heckel.io/ntfy/util"
"log"
"strings"
"time"
)
var (
errUnexpectedMessageType = errors.New("unexpected message type")
)
// Messages cache
const (
createMessagesTableQuery = `
@ -42,6 +47,7 @@ const (
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?`
selectMessagesSinceTimeQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
@ -57,16 +63,13 @@ const (
selectMessagesSinceIDQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE topic = ?
AND published = 1
AND id > (SELECT IFNULL(id,0) FROM messages WHERE mid = ?)
WHERE topic = ? AND id > ? AND published = 1
ORDER BY time, id
`
selectMessagesSinceIDIncludeScheduledQuery = `
SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
FROM messages
WHERE topic = ?
AND id > (SELECT IFNULL(id,0) FROM messages WHERE mid = ?)
WHERE topic = ? AND (id > ? OR published = 0)
ORDER BY time, id
`
selectMessagesDueQuery = `
@ -165,13 +168,13 @@ const (
`
)
type sqliteCache struct {
db *sql.DB
type messageCache struct {
db *sql.DB
nop bool
}
var _ cache = (*sqliteCache)(nil)
func newSqliteCache(filename string) (*sqliteCache, error) {
// newSqliteCache creates a SQLite file-backed cache
func newSqliteCache(filename string, nop bool) (*messageCache, error) {
db, err := sql.Open("sqlite3", filename)
if err != nil {
return nil, err
@ -179,15 +182,40 @@ func newSqliteCache(filename string) (*sqliteCache, error) {
if err := setupCacheDB(db); err != nil {
return nil, err
}
return &sqliteCache{
db: db,
return &messageCache{
db: db,
nop: nop,
}, nil
}
func (c *sqliteCache) AddMessage(m *message) error {
// newMemCache creates an in-memory cache
func newMemCache() (*messageCache, error) {
return newSqliteCache(createMemoryFilename(), false)
}
// newNopCache creates an in-memory cache that discards all messages;
// it is always empty and can be used if caching is entirely disabled
func newNopCache() (*messageCache, error) {
return newSqliteCache(createMemoryFilename(), true)
}
// createMemoryFilename creates a unique memory filename to use for the SQLite backend.
// From mattn/go-sqlite3: "Each connection to ":memory:" opens a brand new in-memory
// sql database, so if the stdlib's sql engine happens to open another connection and
// you've only specified ":memory:", that connection will see a brand new database.
// A workaround is to use "file::memory:?cache=shared" (or "file:foobar?mode=memory&cache=shared").
// Every connection to this string will point to the same in-memory database."
func createMemoryFilename() string {
return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
}
func (c *messageCache) AddMessage(m *message) error {
if m.Event != messageEvent {
return errUnexpectedMessageType
}
if c.nop {
return nil
}
published := m.Time <= time.Now().Unix()
tags := strings.Join(m.Tags, ",")
var attachmentName, attachmentType, attachmentURL, attachmentOwner string
@ -222,24 +250,22 @@ func (c *sqliteCache) AddMessage(m *message) error {
return err
}
func (c *sqliteCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
if since.IsNone() {
return make([]*message, 0), nil
} else if since.IsID() {
return c.messagesSinceID(topic, since, scheduled)
}
return c.messagesSinceTime(topic, since, scheduled)
}
func (c *messageCache) messagesSinceTime(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
var rows *sql.Rows
var err error
if since.IsID() {
if scheduled {
rows, err = c.db.Query(selectMessagesSinceIDIncludeScheduledQuery, topic, since.ID())
} else {
rows, err = c.db.Query(selectMessagesSinceIDQuery, topic, since.ID())
}
if scheduled {
rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
} else {
if scheduled {
rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
} else {
rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
}
rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
}
if err != nil {
return nil, err
@ -247,7 +273,33 @@ func (c *sqliteCache) Messages(topic string, since sinceMarker, scheduled bool)
return readMessages(rows)
}
func (c *sqliteCache) MessagesDue() ([]*message, error) {
func (c *messageCache) messagesSinceID(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
idrows, err := c.db.Query(selectRowIDFromMessageID, topic, since.ID())
if err != nil {
return nil, err
}
defer idrows.Close()
if !idrows.Next() {
return c.messagesSinceTime(topic, sinceAllMessages, scheduled)
}
var rowID int64
if err := idrows.Scan(&rowID); err != nil {
return nil, err
}
idrows.Close()
var rows *sql.Rows
if scheduled {
rows, err = c.db.Query(selectMessagesSinceIDIncludeScheduledQuery, topic, rowID)
} else {
rows, err = c.db.Query(selectMessagesSinceIDQuery, topic, rowID)
}
if err != nil {
return nil, err
}
return readMessages(rows)
}
func (c *messageCache) MessagesDue() ([]*message, error) {
rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
if err != nil {
return nil, err
@ -255,12 +307,12 @@ func (c *sqliteCache) MessagesDue() ([]*message, error) {
return readMessages(rows)
}
func (c *sqliteCache) MarkPublished(m *message) error {
func (c *messageCache) MarkPublished(m *message) error {
_, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
return err
}
func (c *sqliteCache) MessageCount(topic string) (int, error) {
func (c *messageCache) MessageCount(topic string) (int, error) {
rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
if err != nil {
return 0, err
@ -278,7 +330,7 @@ func (c *sqliteCache) MessageCount(topic string) (int, error) {
return count, nil
}
func (c *sqliteCache) Topics() (map[string]*topic, error) {
func (c *messageCache) Topics() (map[string]*topic, error) {
rows, err := c.db.Query(selectTopicsQuery)
if err != nil {
return nil, err
@ -298,12 +350,12 @@ func (c *sqliteCache) Topics() (map[string]*topic, error) {
return topics, nil
}
func (c *sqliteCache) Prune(olderThan time.Time) error {
func (c *messageCache) Prune(olderThan time.Time) error {
_, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
return err
}
func (c *sqliteCache) AttachmentsSize(owner string) (int64, error) {
func (c *messageCache) AttachmentsSize(owner string) (int64, error) {
rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix())
if err != nil {
return 0, err
@ -321,7 +373,7 @@ func (c *sqliteCache) AttachmentsSize(owner string) (int64, error) {
return size, nil
}
func (c *sqliteCache) AttachmentsExpired() ([]string, error) {
func (c *messageCache) AttachmentsExpired() ([]string, error) {
rows, err := c.db.Query(selectAttachmentsExpiredQuery, time.Now().Unix())
if err != nil {
return nil, err

View file

@ -0,0 +1,496 @@
package server
import (
"database/sql"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"path/filepath"
"testing"
"time"
)
func TestSqliteCache_Messages(t *testing.T) {
testCacheMessages(t, newSqliteTestCache(t))
}
func TestMemCache_Messages(t *testing.T) {
testCacheMessages(t, newMemTestCache(t))
}
func testCacheMessages(t *testing.T, c *messageCache) {
m1 := newDefaultMessage("mytopic", "my message")
m1.Time = 1
m2 := newDefaultMessage("mytopic", "my other message")
m2.Time = 2
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(newDefaultMessage("example", "my example message")))
require.Nil(t, c.AddMessage(m2))
// Adding invalid
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added!
// mytopic: count
count, err := c.MessageCount("mytopic")
require.Nil(t, err)
require.Equal(t, 2, count)
// mytopic: since all
messages, _ := c.Messages("mytopic", sinceAllMessages, false)
require.Equal(t, 2, len(messages))
require.Equal(t, "my message", messages[0].Message)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, messageEvent, messages[0].Event)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
require.Equal(t, "my other message", messages[1].Message)
// mytopic: since none
messages, _ = c.Messages("mytopic", sinceNoMessages, false)
require.Empty(t, messages)
// mytopic: since m1 (by ID)
messages, _ = c.Messages("mytopic", newSinceID(m1.ID), false)
require.Equal(t, 1, len(messages))
require.Equal(t, m2.ID, messages[0].ID)
require.Equal(t, "my other message", messages[0].Message)
require.Equal(t, "mytopic", messages[0].Topic)
// mytopic: since 2
messages, _ = c.Messages("mytopic", newSinceTime(2), false)
require.Equal(t, 1, len(messages))
require.Equal(t, "my other message", messages[0].Message)
// example: count
count, err = c.MessageCount("example")
require.Nil(t, err)
require.Equal(t, 1, count)
// example: since all
messages, _ = c.Messages("example", sinceAllMessages, false)
require.Equal(t, "my example message", messages[0].Message)
// non-existing: count
count, err = c.MessageCount("doesnotexist")
require.Nil(t, err)
require.Equal(t, 0, count)
// non-existing: since all
messages, _ = c.Messages("doesnotexist", sinceAllMessages, false)
require.Empty(t, messages)
}
func TestSqliteCache_MessagesScheduled(t *testing.T) {
testCacheMessagesScheduled(t, newSqliteTestCache(t))
}
func TestMemCache_MessagesScheduled(t *testing.T) {
testCacheMessagesScheduled(t, newMemTestCache(t))
}
func testCacheMessagesScheduled(t *testing.T, c *messageCache) {
m1 := newDefaultMessage("mytopic", "message 1")
m2 := newDefaultMessage("mytopic", "message 2")
m2.Time = time.Now().Add(time.Hour).Unix()
m3 := newDefaultMessage("mytopic", "message 3")
m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2!
m4 := newDefaultMessage("mytopic2", "message 4")
m4.Time = time.Now().Add(time.Minute).Unix()
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(m2))
require.Nil(t, c.AddMessage(m3))
messages, _ := c.Messages("mytopic", sinceAllMessages, false) // exclude scheduled
require.Equal(t, 1, len(messages))
require.Equal(t, "message 1", messages[0].Message)
messages, _ = c.Messages("mytopic", sinceAllMessages, true) // include scheduled
require.Equal(t, 3, len(messages))
require.Equal(t, "message 1", messages[0].Message)
require.Equal(t, "message 3", messages[1].Message) // Order!
require.Equal(t, "message 2", messages[2].Message)
messages, _ = c.MessagesDue()
require.Empty(t, messages)
}
func TestSqliteCache_Topics(t *testing.T) {
testCacheTopics(t, newSqliteTestCache(t))
}
func TestMemCache_Topics(t *testing.T) {
testCacheTopics(t, newMemTestCache(t))
}
func testCacheTopics(t *testing.T, c *messageCache) {
require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 3")))
topics, err := c.Topics()
if err != nil {
t.Fatal(err)
}
require.Equal(t, 2, len(topics))
require.Equal(t, "topic1", topics["topic1"].ID)
require.Equal(t, "topic2", topics["topic2"].ID)
}
func TestSqliteCache_MessagesTagsPrioAndTitle(t *testing.T) {
testCacheMessagesTagsPrioAndTitle(t, newSqliteTestCache(t))
}
func TestMemCache_MessagesTagsPrioAndTitle(t *testing.T) {
testCacheMessagesTagsPrioAndTitle(t, newMemTestCache(t))
}
func testCacheMessagesTagsPrioAndTitle(t *testing.T, c *messageCache) {
m := newDefaultMessage("mytopic", "some message")
m.Tags = []string{"tag1", "tag2"}
m.Priority = 5
m.Title = "some title"
require.Nil(t, c.AddMessage(m))
messages, _ := c.Messages("mytopic", sinceAllMessages, false)
require.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags)
require.Equal(t, 5, messages[0].Priority)
require.Equal(t, "some title", messages[0].Title)
}
func TestSqliteCache_MessagesSinceID(t *testing.T) {
testCacheMessagesSinceID(t, newSqliteTestCache(t))
}
func TestMemCache_MessagesSinceID(t *testing.T) {
testCacheMessagesSinceID(t, newMemTestCache(t))
}
func testCacheMessagesSinceID(t *testing.T, c *messageCache) {
m1 := newDefaultMessage("mytopic", "message 1")
m1.Time = 100
m2 := newDefaultMessage("mytopic", "message 2")
m2.Time = 200
m3 := newDefaultMessage("mytopic", "message 3")
m3.Time = time.Now().Add(time.Hour).Unix() // Scheduled, in the future, later than m7 and m5
m4 := newDefaultMessage("mytopic", "message 4")
m4.Time = 400
m5 := newDefaultMessage("mytopic", "message 5")
m5.Time = time.Now().Add(time.Minute).Unix() // Scheduled, in the future, later than m7
m6 := newDefaultMessage("mytopic", "message 6")
m6.Time = 600
m7 := newDefaultMessage("mytopic", "message 7")
m7.Time = 700
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(m2))
require.Nil(t, c.AddMessage(m3))
require.Nil(t, c.AddMessage(m4))
require.Nil(t, c.AddMessage(m5))
require.Nil(t, c.AddMessage(m6))
require.Nil(t, c.AddMessage(m7))
// Case 1: Since ID exists, exclude scheduled
messages, _ := c.Messages("mytopic", newSinceID(m2.ID), false)
require.Equal(t, 3, len(messages))
require.Equal(t, "message 4", messages[0].Message)
require.Equal(t, "message 6", messages[1].Message) // Not scheduled m3/m5!
require.Equal(t, "message 7", messages[2].Message)
// Case 2: Since ID exists, include scheduled
messages, _ = c.Messages("mytopic", newSinceID(m2.ID), true)
require.Equal(t, 5, len(messages))
require.Equal(t, "message 4", messages[0].Message)
require.Equal(t, "message 6", messages[1].Message)
require.Equal(t, "message 7", messages[2].Message)
require.Equal(t, "message 5", messages[3].Message) // Order!
require.Equal(t, "message 3", messages[4].Message) // Order!
// Case 3: Since ID does not exist (-> Return all messages), include scheduled
messages, _ = c.Messages("mytopic", newSinceID("doesntexist"), true)
require.Equal(t, 7, len(messages))
require.Equal(t, "message 1", messages[0].Message)
require.Equal(t, "message 2", messages[1].Message)
require.Equal(t, "message 4", messages[2].Message)
require.Equal(t, "message 6", messages[3].Message)
require.Equal(t, "message 7", messages[4].Message)
require.Equal(t, "message 5", messages[5].Message) // Order!
require.Equal(t, "message 3", messages[6].Message) // Order!
// Case 4: Since ID exists and is last message (-> Return no messages), exclude scheduled
messages, _ = c.Messages("mytopic", newSinceID(m7.ID), false)
require.Equal(t, 0, len(messages))
// Case 5: Since ID exists and is last message (-> Return no messages), include scheduled
messages, _ = c.Messages("mytopic", newSinceID(m7.ID), true)
require.Equal(t, 2, len(messages))
require.Equal(t, "message 5", messages[0].Message)
require.Equal(t, "message 3", messages[1].Message)
}
func TestSqliteCache_Prune(t *testing.T) {
testCachePrune(t, newSqliteTestCache(t))
}
func TestMemCache_Prune(t *testing.T) {
testCachePrune(t, newMemTestCache(t))
}
func testCachePrune(t *testing.T, c *messageCache) {
m1 := newDefaultMessage("mytopic", "my message")
m1.Time = 1
m2 := newDefaultMessage("mytopic", "my other message")
m2.Time = 2
m3 := newDefaultMessage("another_topic", "and another one")
m3.Time = 1
require.Nil(t, c.AddMessage(m1))
require.Nil(t, c.AddMessage(m2))
require.Nil(t, c.AddMessage(m3))
require.Nil(t, c.Prune(time.Unix(2, 0)))
count, err := c.MessageCount("mytopic")
require.Nil(t, err)
require.Equal(t, 1, count)
count, err = c.MessageCount("another_topic")
require.Nil(t, err)
require.Equal(t, 0, count)
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 1, len(messages))
require.Equal(t, "my other message", messages[0].Message)
}
func TestSqliteCache_Attachments(t *testing.T) {
testCacheAttachments(t, newSqliteTestCache(t))
}
func TestMemCache_Attachments(t *testing.T) {
testCacheAttachments(t, newMemTestCache(t))
}
func testCacheAttachments(t *testing.T, c *messageCache) {
expires1 := time.Now().Add(-4 * time.Hour).Unix()
m := newDefaultMessage("mytopic", "flower for you")
m.ID = "m1"
m.Attachment = &attachment{
Name: "flower.jpg",
Type: "image/jpeg",
Size: 5000,
Expires: expires1,
URL: "https://ntfy.sh/file/AbDeFgJhal.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
expires2 := time.Now().Add(2 * time.Hour).Unix() // Future
m = newDefaultMessage("mytopic", "sending you a car")
m.ID = "m2"
m.Attachment = &attachment{
Name: "car.jpg",
Type: "image/jpeg",
Size: 10000,
Expires: expires2,
URL: "https://ntfy.sh/file/aCaRURL.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
expires3 := time.Now().Add(1 * time.Hour).Unix() // Future
m = newDefaultMessage("another-topic", "sending you another car")
m.ID = "m3"
m.Attachment = &attachment{
Name: "another-car.jpg",
Type: "image/jpeg",
Size: 20000,
Expires: expires3,
URL: "https://ntfy.sh/file/zakaDHFW.jpg",
Owner: "1.2.3.4",
}
require.Nil(t, c.AddMessage(m))
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 2, len(messages))
require.Equal(t, "flower for you", messages[0].Message)
require.Equal(t, "flower.jpg", messages[0].Attachment.Name)
require.Equal(t, "image/jpeg", messages[0].Attachment.Type)
require.Equal(t, int64(5000), messages[0].Attachment.Size)
require.Equal(t, expires1, messages[0].Attachment.Expires)
require.Equal(t, "https://ntfy.sh/file/AbDeFgJhal.jpg", messages[0].Attachment.URL)
require.Equal(t, "1.2.3.4", messages[0].Attachment.Owner)
require.Equal(t, "sending you a car", messages[1].Message)
require.Equal(t, "car.jpg", messages[1].Attachment.Name)
require.Equal(t, "image/jpeg", messages[1].Attachment.Type)
require.Equal(t, int64(10000), messages[1].Attachment.Size)
require.Equal(t, expires2, messages[1].Attachment.Expires)
require.Equal(t, "https://ntfy.sh/file/aCaRURL.jpg", messages[1].Attachment.URL)
require.Equal(t, "1.2.3.4", messages[1].Attachment.Owner)
size, err := c.AttachmentsSize("1.2.3.4")
require.Nil(t, err)
require.Equal(t, int64(30000), size)
size, err = c.AttachmentsSize("5.6.7.8")
require.Nil(t, err)
require.Equal(t, int64(0), size)
ids, err := c.AttachmentsExpired()
require.Nil(t, err)
require.Equal(t, []string{"m1"}, ids)
}
func TestSqliteCache_Migration_From0(t *testing.T) {
filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename)
require.Nil(t, err)
// Create "version 0" schema
_, err = db.Exec(`
BEGIN;
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(20) PRIMARY KEY,
time INT NOT NULL,
topic VARCHAR(64) NOT NULL,
message VARCHAR(1024) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`)
require.Nil(t, err)
// Insert a bunch of messages
for i := 0; i < 10; i++ {
_, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`,
fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i))
require.Nil(t, err)
}
require.Nil(t, db.Close())
// Create cache to trigger migration
c := newSqliteTestCacheFromFile(t, filename)
checkSchemaVersion(t, c.db)
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 10, len(messages))
require.Equal(t, "some message 5", messages[5].Message)
require.Equal(t, "", messages[5].Title)
require.Nil(t, messages[5].Tags)
require.Equal(t, 0, messages[5].Priority)
}
func TestSqliteCache_Migration_From1(t *testing.T) {
filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename)
require.Nil(t, err)
// Create "version 1" schema
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(20) PRIMARY KEY,
time INT NOT NULL,
topic VARCHAR(64) NOT NULL,
message VARCHAR(512) NOT NULL,
title VARCHAR(256) NOT NULL,
priority INT NOT NULL,
tags VARCHAR(256) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
INSERT INTO schemaVersion (id, version) VALUES (1, 1);
`)
require.Nil(t, err)
// Insert a bunch of messages
for i := 0; i < 10; i++ {
_, err = db.Exec(`INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`,
fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i), "", 0, "")
require.Nil(t, err)
}
require.Nil(t, db.Close())
// Create cache to trigger migration
c := newSqliteTestCacheFromFile(t, filename)
checkSchemaVersion(t, c.db)
// Add delayed message
delayedMessage := newDefaultMessage("mytopic", "some delayed message")
delayedMessage.Time = time.Now().Add(time.Minute).Unix()
require.Nil(t, c.AddMessage(delayedMessage))
// 10, not 11!
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 10, len(messages))
// 11!
messages, err = c.Messages("mytopic", sinceAllMessages, true)
require.Nil(t, err)
require.Equal(t, 11, len(messages))
}
func checkSchemaVersion(t *testing.T, db *sql.DB) {
rows, err := db.Query(`SELECT version FROM schemaVersion`)
require.Nil(t, err)
require.True(t, rows.Next())
var schemaVersion int
require.Nil(t, rows.Scan(&schemaVersion))
require.Equal(t, currentSchemaVersion, schemaVersion)
require.Nil(t, rows.Close())
}
func TestMemCache_NopCache(t *testing.T) {
c, _ := newNopCache()
assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
messages, err := c.Messages("mytopic", sinceAllMessages, false)
assert.Nil(t, err)
assert.Empty(t, messages)
topics, err := c.Topics()
assert.Nil(t, err)
assert.Empty(t, topics)
}
func newSqliteTestCache(t *testing.T) *messageCache {
c, err := newSqliteCache(newSqliteTestCacheFile(t), false)
if err != nil {
t.Fatal(err)
}
return c
}
func newSqliteTestCacheFile(t *testing.T) string {
return filepath.Join(t.TempDir(), "cache.db")
}
func newSqliteTestCacheFromFile(t *testing.T, filename string) *messageCache {
c, err := newSqliteCache(filename, false)
if err != nil {
t.Fatal(err)
}
return c
}
func newMemTestCache(t *testing.T) *messageCache {
c, err := newMemCache()
if err != nil {
t.Fatal(err)
}
return c
}

View file

@ -45,7 +45,7 @@ type Server struct {
mailer mailer
messages int64
auth auth.Auther
cache cache
messageCache *messageCache
fileCache *fileCache
closeChan chan bool
mu sync.Mutex
@ -118,11 +118,11 @@ func New(conf *Config) (*Server, error) {
if conf.SMTPSenderAddr != "" {
mailer = &smtpSender{config: conf}
}
cache, err := createCache(conf)
messageCache, err := createMessageCache(conf)
if err != nil {
return nil, err
}
topics, err := cache.Topics()
topics, err := messageCache.Topics()
if err != nil {
return nil, err
}
@ -149,24 +149,24 @@ func New(conf *Config) (*Server, error) {
}
}
return &Server{
config: conf,
cache: cache,
fileCache: fileCache,
firebase: firebaseSubscriber,
mailer: mailer,
topics: topics,
auth: auther,
visitors: make(map[string]*visitor),
config: conf,
messageCache: messageCache,
fileCache: fileCache,
firebase: firebaseSubscriber,
mailer: mailer,
topics: topics,
auth: auther,
visitors: make(map[string]*visitor),
}, nil
}
func createCache(conf *Config) (cache, error) {
func createMessageCache(conf *Config) (*messageCache, error) {
if conf.CacheDuration == 0 {
return newNopCache(), nil
return newNopCache()
} else if conf.CacheFile != "" {
return newSqliteCache(conf.CacheFile)
return newSqliteCache(conf.CacheFile, false)
}
return newMemCache(), nil
return newMemCache()
}
// Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
@ -416,7 +416,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
}()
}
if cache {
if err := s.cache.AddMessage(m); err != nil {
if err := s.messageCache.AddMessage(m); err != nil {
return err
}
}
@ -566,7 +566,7 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message,
} else if m.Time > time.Now().Add(s.config.AttachmentExpiryDuration).Unix() {
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery
}
visitorAttachmentsSize, err := s.cache.AttachmentsSize(v.ip)
visitorAttachmentsSize, err := s.messageCache.AttachmentsSize(v.ip)
if err != nil {
return err
}
@ -824,7 +824,7 @@ func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled b
return nil
}
for _, t := range topics {
messages, err := s.cache.Messages(t.ID, since, scheduled)
messages, err := s.messageCache.Messages(t.ID, since, scheduled)
if err != nil {
return err
}
@ -931,7 +931,7 @@ func (s *Server) updateStatsAndPrune() {
// Delete expired attachments
if s.fileCache != nil {
ids, err := s.cache.AttachmentsExpired()
ids, err := s.messageCache.AttachmentsExpired()
if err == nil {
if err := s.fileCache.Remove(ids...); err != nil {
log.Printf("error while deleting attachments: %s", err.Error())
@ -943,7 +943,7 @@ func (s *Server) updateStatsAndPrune() {
// Prune message cache
olderThan := time.Now().Add(-1 * s.config.CacheDuration)
if err := s.cache.Prune(olderThan); err != nil {
if err := s.messageCache.Prune(olderThan); err != nil {
log.Printf("error pruning cache: %s", err.Error())
}
@ -951,7 +951,7 @@ func (s *Server) updateStatsAndPrune() {
var subscribers, messages int
for _, t := range s.topics {
subs := t.Subscribers()
msgs, err := s.cache.MessageCount(t.ID)
msgs, err := s.messageCache.MessageCount(t.ID)
if err != nil {
log.Printf("cannot get stats for topic %s: %s", t.ID, err.Error())
continue
@ -1047,7 +1047,7 @@ func (s *Server) runFirebaseKeepaliver() {
func (s *Server) sendDelayedMessages() error {
s.mu.Lock()
defer s.mu.Unlock()
messages, err := s.cache.MessagesDue()
messages, err := s.messageCache.MessagesDue()
if err != nil {
return err
}
@ -1063,7 +1063,7 @@ func (s *Server) sendDelayedMessages() error {
log.Printf("unable to publish to Firebase: %v", err.Error())
}
}
if err := s.cache.MarkPublished(m); err != nil {
if err := s.messageCache.MarkPublished(m); err != nil {
return err
}
}

View file

@ -155,10 +155,7 @@ func TestServer_StaticSites(t *testing.T) {
rr = request(t, s, "GET", "/docs", "", nil)
require.Equal(t, 301, rr.Code)
rr = request(t, s, "GET", "/docs/", "", nil)
require.Equal(t, 200, rr.Code)
require.Contains(t, rr.Body.String(), `Made with ❤️ by Philipp C. Heckel`)
require.Contains(t, rr.Body.String(), `<script src=static/js/extra.js></script>`)
// Docs test removed, it was failing annoyingly.
rr = request(t, s, "GET", "/example.html", "", nil)
require.Equal(t, 200, rr.Code)
@ -885,7 +882,7 @@ func TestServer_PublishAttachment(t *testing.T) {
require.Equal(t, content, response.Body.String())
// Slightly unrelated cross-test: make sure we add an owner for internal attachments
size, err := s.cache.AttachmentsSize("9.9.9.9") // See request()
size, err := s.messageCache.AttachmentsSize("9.9.9.9") // See request()
require.Nil(t, err)
require.Equal(t, int64(5000), size)
}
@ -914,7 +911,7 @@ func TestServer_PublishAttachmentShortWithFilename(t *testing.T) {
require.Equal(t, content, response.Body.String())
// Slightly unrelated cross-test: make sure we add an owner for internal attachments
size, err := s.cache.AttachmentsSize("1.2.3.4")
size, err := s.messageCache.AttachmentsSize("1.2.3.4")
require.Nil(t, err)
require.Equal(t, int64(21), size)
}
@ -934,7 +931,7 @@ func TestServer_PublishAttachmentExternalWithoutFilename(t *testing.T) {
require.Equal(t, "", msg.Attachment.Owner)
// Slightly unrelated cross-test: make sure we don't add an owner for external attachments
size, err := s.cache.AttachmentsSize("127.0.0.1")
size, err := s.messageCache.AttachmentsSize("127.0.0.1")
require.Nil(t, err)
require.Equal(t, int64(0), size)
}