Rename cache to messageCache

This commit is contained in:
Philipp Heckel 2022-02-27 14:47:28 -05:00
parent e79dbf4d00
commit 4cd30c35ce
4 changed files with 50 additions and 49 deletions

View file

@ -168,12 +168,13 @@ const (
` `
) )
type sqliteCache struct { type messageCache struct {
db *sql.DB db *sql.DB
nop bool nop bool
} }
func newSqliteCache(filename string, nop bool) (*sqliteCache, error) { // newSqliteCache creates a SQLite file-backed cache
func newSqliteCache(filename string, nop bool) (*messageCache, error) {
db, err := sql.Open("sqlite3", filename) db, err := sql.Open("sqlite3", filename)
if err != nil { if err != nil {
return nil, err return nil, err
@ -181,20 +182,20 @@ func newSqliteCache(filename string, nop bool) (*sqliteCache, error) {
if err := setupCacheDB(db); err != nil { if err := setupCacheDB(db); err != nil {
return nil, err return nil, err
} }
return &sqliteCache{ return &messageCache{
db: db, db: db,
nop: nop, nop: nop,
}, nil }, nil
} }
// newMemCache creates an in-memory cache // newMemCache creates an in-memory cache
func newMemCache() (*sqliteCache, error) { func newMemCache() (*messageCache, error) {
return newSqliteCache(createMemoryFilename(), false) return newSqliteCache(createMemoryFilename(), false)
} }
// newNopCache creates an in-memory cache that discards all messages; // newNopCache creates an in-memory cache that discards all messages;
// it is always empty and can be used if caching is entirely disabled // it is always empty and can be used if caching is entirely disabled
func newNopCache() (*sqliteCache, error) { func newNopCache() (*messageCache, error) {
return newSqliteCache(createMemoryFilename(), true) return newSqliteCache(createMemoryFilename(), true)
} }
@ -208,7 +209,7 @@ func createMemoryFilename() string {
return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10)) return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
} }
func (c *sqliteCache) AddMessage(m *message) error { func (c *messageCache) AddMessage(m *message) error {
if m.Event != messageEvent { if m.Event != messageEvent {
return errUnexpectedMessageType return errUnexpectedMessageType
} }
@ -249,7 +250,7 @@ func (c *sqliteCache) AddMessage(m *message) error {
return err return err
} }
func (c *sqliteCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) { func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
if since.IsNone() { if since.IsNone() {
return make([]*message, 0), nil return make([]*message, 0), nil
} else if since.IsID() { } else if since.IsID() {
@ -258,7 +259,7 @@ func (c *sqliteCache) Messages(topic string, since sinceMarker, scheduled bool)
return c.messagesSinceTime(topic, since, scheduled) return c.messagesSinceTime(topic, since, scheduled)
} }
func (c *sqliteCache) messagesSinceTime(topic string, since sinceMarker, scheduled bool) ([]*message, error) { func (c *messageCache) messagesSinceTime(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
var rows *sql.Rows var rows *sql.Rows
var err error var err error
if scheduled { if scheduled {
@ -272,7 +273,7 @@ func (c *sqliteCache) messagesSinceTime(topic string, since sinceMarker, schedul
return readMessages(rows) return readMessages(rows)
} }
func (c *sqliteCache) messagesSinceID(topic string, since sinceMarker, scheduled bool) ([]*message, error) { func (c *messageCache) messagesSinceID(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
idrows, err := c.db.Query(selectRowIDFromMessageID, topic, since.ID()) idrows, err := c.db.Query(selectRowIDFromMessageID, topic, since.ID())
if err != nil { if err != nil {
return nil, err return nil, err
@ -298,7 +299,7 @@ func (c *sqliteCache) messagesSinceID(topic string, since sinceMarker, scheduled
return readMessages(rows) return readMessages(rows)
} }
func (c *sqliteCache) MessagesDue() ([]*message, error) { func (c *messageCache) MessagesDue() ([]*message, error) {
rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix()) rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
if err != nil { if err != nil {
return nil, err return nil, err
@ -306,12 +307,12 @@ func (c *sqliteCache) MessagesDue() ([]*message, error) {
return readMessages(rows) return readMessages(rows)
} }
func (c *sqliteCache) MarkPublished(m *message) error { func (c *messageCache) MarkPublished(m *message) error {
_, err := c.db.Exec(updateMessagePublishedQuery, m.ID) _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
return err return err
} }
func (c *sqliteCache) MessageCount(topic string) (int, error) { func (c *messageCache) MessageCount(topic string) (int, error) {
rows, err := c.db.Query(selectMessageCountForTopicQuery, topic) rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
if err != nil { if err != nil {
return 0, err return 0, err
@ -329,7 +330,7 @@ func (c *sqliteCache) MessageCount(topic string) (int, error) {
return count, nil return count, nil
} }
func (c *sqliteCache) Topics() (map[string]*topic, error) { func (c *messageCache) Topics() (map[string]*topic, error) {
rows, err := c.db.Query(selectTopicsQuery) rows, err := c.db.Query(selectTopicsQuery)
if err != nil { if err != nil {
return nil, err return nil, err
@ -349,12 +350,12 @@ func (c *sqliteCache) Topics() (map[string]*topic, error) {
return topics, nil return topics, nil
} }
func (c *sqliteCache) Prune(olderThan time.Time) error { func (c *messageCache) Prune(olderThan time.Time) error {
_, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix()) _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
return err return err
} }
func (c *sqliteCache) AttachmentsSize(owner string) (int64, error) { func (c *messageCache) AttachmentsSize(owner string) (int64, error) {
rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix()) rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix())
if err != nil { if err != nil {
return 0, err return 0, err
@ -372,7 +373,7 @@ func (c *sqliteCache) AttachmentsSize(owner string) (int64, error) {
return size, nil return size, nil
} }
func (c *sqliteCache) AttachmentsExpired() ([]string, error) { func (c *messageCache) AttachmentsExpired() ([]string, error) {
rows, err := c.db.Query(selectAttachmentsExpiredQuery, time.Now().Unix()) rows, err := c.db.Query(selectAttachmentsExpiredQuery, time.Now().Unix())
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -18,7 +18,7 @@ func TestMemCache_Messages(t *testing.T) {
testCacheMessages(t, newMemTestCache(t)) testCacheMessages(t, newMemTestCache(t))
} }
func testCacheMessages(t *testing.T, c *sqliteCache) { func testCacheMessages(t *testing.T, c *messageCache) {
m1 := newDefaultMessage("mytopic", "my message") m1 := newDefaultMessage("mytopic", "my message")
m1.Time = 1 m1.Time = 1
@ -92,7 +92,7 @@ func TestMemCache_MessagesScheduled(t *testing.T) {
testCacheMessagesScheduled(t, newMemTestCache(t)) testCacheMessagesScheduled(t, newMemTestCache(t))
} }
func testCacheMessagesScheduled(t *testing.T, c *sqliteCache) { func testCacheMessagesScheduled(t *testing.T, c *messageCache) {
m1 := newDefaultMessage("mytopic", "message 1") m1 := newDefaultMessage("mytopic", "message 1")
m2 := newDefaultMessage("mytopic", "message 2") m2 := newDefaultMessage("mytopic", "message 2")
m2.Time = time.Now().Add(time.Hour).Unix() m2.Time = time.Now().Add(time.Hour).Unix()
@ -126,7 +126,7 @@ func TestMemCache_Topics(t *testing.T) {
testCacheTopics(t, newMemTestCache(t)) testCacheTopics(t, newMemTestCache(t))
} }
func testCacheTopics(t *testing.T, c *sqliteCache) { func testCacheTopics(t *testing.T, c *messageCache) {
require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message"))) require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1"))) require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2"))) require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
@ -149,7 +149,7 @@ func TestMemCache_MessagesTagsPrioAndTitle(t *testing.T) {
testCacheMessagesTagsPrioAndTitle(t, newMemTestCache(t)) testCacheMessagesTagsPrioAndTitle(t, newMemTestCache(t))
} }
func testCacheMessagesTagsPrioAndTitle(t *testing.T, c *sqliteCache) { func testCacheMessagesTagsPrioAndTitle(t *testing.T, c *messageCache) {
m := newDefaultMessage("mytopic", "some message") m := newDefaultMessage("mytopic", "some message")
m.Tags = []string{"tag1", "tag2"} m.Tags = []string{"tag1", "tag2"}
m.Priority = 5 m.Priority = 5
@ -170,7 +170,7 @@ func TestMemCache_MessagesSinceID(t *testing.T) {
testCacheMessagesSinceID(t, newMemTestCache(t)) testCacheMessagesSinceID(t, newMemTestCache(t))
} }
func testCacheMessagesSinceID(t *testing.T, c *sqliteCache) { func testCacheMessagesSinceID(t *testing.T, c *messageCache) {
m1 := newDefaultMessage("mytopic", "message 1") m1 := newDefaultMessage("mytopic", "message 1")
m1.Time = 100 m1.Time = 100
m2 := newDefaultMessage("mytopic", "message 2") m2 := newDefaultMessage("mytopic", "message 2")
@ -240,7 +240,7 @@ func TestMemCache_Prune(t *testing.T) {
testCachePrune(t, newMemTestCache(t)) testCachePrune(t, newMemTestCache(t))
} }
func testCachePrune(t *testing.T, c *sqliteCache) { func testCachePrune(t *testing.T, c *messageCache) {
m1 := newDefaultMessage("mytopic", "my message") m1 := newDefaultMessage("mytopic", "my message")
m1.Time = 1 m1.Time = 1
@ -277,7 +277,7 @@ func TestMemCache_Attachments(t *testing.T) {
testCacheAttachments(t, newMemTestCache(t)) testCacheAttachments(t, newMemTestCache(t))
} }
func testCacheAttachments(t *testing.T, c *sqliteCache) { func testCacheAttachments(t *testing.T, c *messageCache) {
expires1 := time.Now().Add(-4 * time.Hour).Unix() expires1 := time.Now().Add(-4 * time.Hour).Unix()
m := newDefaultMessage("mytopic", "flower for you") m := newDefaultMessage("mytopic", "flower for you")
m.ID = "m1" m.ID = "m1"
@ -467,7 +467,7 @@ func TestMemCache_NopCache(t *testing.T) {
assert.Empty(t, topics) assert.Empty(t, topics)
} }
func newSqliteTestCache(t *testing.T) *sqliteCache { func newSqliteTestCache(t *testing.T) *messageCache {
c, err := newSqliteCache(newSqliteTestCacheFile(t), false) c, err := newSqliteCache(newSqliteTestCacheFile(t), false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -479,7 +479,7 @@ func newSqliteTestCacheFile(t *testing.T) string {
return filepath.Join(t.TempDir(), "cache.db") return filepath.Join(t.TempDir(), "cache.db")
} }
func newSqliteTestCacheFromFile(t *testing.T, filename string) *sqliteCache { func newSqliteTestCacheFromFile(t *testing.T, filename string) *messageCache {
c, err := newSqliteCache(filename, false) c, err := newSqliteCache(filename, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -487,7 +487,7 @@ func newSqliteTestCacheFromFile(t *testing.T, filename string) *sqliteCache {
return c return c
} }
func newMemTestCache(t *testing.T) *sqliteCache { func newMemTestCache(t *testing.T) *messageCache {
c, err := newMemCache() c, err := newMemCache()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View file

@ -45,7 +45,7 @@ type Server struct {
mailer mailer mailer mailer
messages int64 messages int64
auth auth.Auther auth auth.Auther
cache *sqliteCache messageCache *messageCache
fileCache *fileCache fileCache *fileCache
closeChan chan bool closeChan chan bool
mu sync.Mutex mu sync.Mutex
@ -118,11 +118,11 @@ func New(conf *Config) (*Server, error) {
if conf.SMTPSenderAddr != "" { if conf.SMTPSenderAddr != "" {
mailer = &smtpSender{config: conf} mailer = &smtpSender{config: conf}
} }
cache, err := createCache(conf) messageCache, err := createMessageCache(conf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
topics, err := cache.Topics() topics, err := messageCache.Topics()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -149,18 +149,18 @@ func New(conf *Config) (*Server, error) {
} }
} }
return &Server{ return &Server{
config: conf, config: conf,
cache: cache, messageCache: messageCache,
fileCache: fileCache, fileCache: fileCache,
firebase: firebaseSubscriber, firebase: firebaseSubscriber,
mailer: mailer, mailer: mailer,
topics: topics, topics: topics,
auth: auther, auth: auther,
visitors: make(map[string]*visitor), visitors: make(map[string]*visitor),
}, nil }, nil
} }
func createCache(conf *Config) (*sqliteCache, error) { func createMessageCache(conf *Config) (*messageCache, error) {
if conf.CacheDuration == 0 { if conf.CacheDuration == 0 {
return newNopCache() return newNopCache()
} else if conf.CacheFile != "" { } else if conf.CacheFile != "" {
@ -416,7 +416,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
}() }()
} }
if cache { if cache {
if err := s.cache.AddMessage(m); err != nil { if err := s.messageCache.AddMessage(m); err != nil {
return err return err
} }
} }
@ -566,7 +566,7 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message,
} else if m.Time > time.Now().Add(s.config.AttachmentExpiryDuration).Unix() { } else if m.Time > time.Now().Add(s.config.AttachmentExpiryDuration).Unix() {
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery return errHTTPBadRequestAttachmentsExpiryBeforeDelivery
} }
visitorAttachmentsSize, err := s.cache.AttachmentsSize(v.ip) visitorAttachmentsSize, err := s.messageCache.AttachmentsSize(v.ip)
if err != nil { if err != nil {
return err return err
} }
@ -824,7 +824,7 @@ func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled b
return nil return nil
} }
for _, t := range topics { for _, t := range topics {
messages, err := s.cache.Messages(t.ID, since, scheduled) messages, err := s.messageCache.Messages(t.ID, since, scheduled)
if err != nil { if err != nil {
return err return err
} }
@ -930,7 +930,7 @@ func (s *Server) updateStatsAndPrune() {
// Delete expired attachments // Delete expired attachments
if s.fileCache != nil { if s.fileCache != nil {
ids, err := s.cache.AttachmentsExpired() ids, err := s.messageCache.AttachmentsExpired()
if err == nil { if err == nil {
if err := s.fileCache.Remove(ids...); err != nil { if err := s.fileCache.Remove(ids...); err != nil {
log.Printf("error while deleting attachments: %s", err.Error()) log.Printf("error while deleting attachments: %s", err.Error())
@ -942,7 +942,7 @@ func (s *Server) updateStatsAndPrune() {
// Prune message cache // Prune message cache
olderThan := time.Now().Add(-1 * s.config.CacheDuration) olderThan := time.Now().Add(-1 * s.config.CacheDuration)
if err := s.cache.Prune(olderThan); err != nil { if err := s.messageCache.Prune(olderThan); err != nil {
log.Printf("error pruning cache: %s", err.Error()) log.Printf("error pruning cache: %s", err.Error())
} }
@ -950,7 +950,7 @@ func (s *Server) updateStatsAndPrune() {
var subscribers, messages int var subscribers, messages int
for _, t := range s.topics { for _, t := range s.topics {
subs := t.Subscribers() subs := t.Subscribers()
msgs, err := s.cache.MessageCount(t.ID) msgs, err := s.messageCache.MessageCount(t.ID)
if err != nil { if err != nil {
log.Printf("cannot get stats for topic %s: %s", t.ID, err.Error()) log.Printf("cannot get stats for topic %s: %s", t.ID, err.Error())
continue continue
@ -1046,7 +1046,7 @@ func (s *Server) runFirebaseKeepaliver() {
func (s *Server) sendDelayedMessages() error { func (s *Server) sendDelayedMessages() error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
messages, err := s.cache.MessagesDue() messages, err := s.messageCache.MessagesDue()
if err != nil { if err != nil {
return err return err
} }
@ -1062,7 +1062,7 @@ func (s *Server) sendDelayedMessages() error {
log.Printf("unable to publish to Firebase: %v", err.Error()) log.Printf("unable to publish to Firebase: %v", err.Error())
} }
} }
if err := s.cache.MarkPublished(m); err != nil { if err := s.messageCache.MarkPublished(m); err != nil {
return err return err
} }
} }

View file

@ -863,7 +863,7 @@ func TestServer_PublishAttachment(t *testing.T) {
require.Equal(t, content, response.Body.String()) require.Equal(t, content, response.Body.String())
// Slightly unrelated cross-test: make sure we add an owner for internal attachments // Slightly unrelated cross-test: make sure we add an owner for internal attachments
size, err := s.cache.AttachmentsSize("9.9.9.9") // See request() size, err := s.messageCache.AttachmentsSize("9.9.9.9") // See request()
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, int64(5000), size) require.Equal(t, int64(5000), size)
} }
@ -892,7 +892,7 @@ func TestServer_PublishAttachmentShortWithFilename(t *testing.T) {
require.Equal(t, content, response.Body.String()) require.Equal(t, content, response.Body.String())
// Slightly unrelated cross-test: make sure we add an owner for internal attachments // Slightly unrelated cross-test: make sure we add an owner for internal attachments
size, err := s.cache.AttachmentsSize("1.2.3.4") size, err := s.messageCache.AttachmentsSize("1.2.3.4")
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, int64(21), size) require.Equal(t, int64(21), size)
} }
@ -912,7 +912,7 @@ func TestServer_PublishAttachmentExternalWithoutFilename(t *testing.T) {
require.Equal(t, "", msg.Attachment.Owner) require.Equal(t, "", msg.Attachment.Owner)
// Slightly unrelated cross-test: make sure we don't add an owner for external attachments // Slightly unrelated cross-test: make sure we don't add an owner for external attachments
size, err := s.cache.AttachmentsSize("127.0.0.1") size, err := s.messageCache.AttachmentsSize("127.0.0.1")
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, int64(0), size) require.Equal(t, int64(0), size)
} }