Fix panic in manager when attachment-cache-dir
is not set, fixes #617
This commit is contained in:
parent
7fb6f794e5
commit
38e7801b41
5 changed files with 68 additions and 30 deletions
|
@ -536,7 +536,7 @@ func (c *messageCache) ExpireMessages(topics ...string) error {
|
|||
}
|
||||
defer tx.Rollback()
|
||||
for _, t := range topics {
|
||||
if _, err := tx.Exec(updateMessagesForTopicExpiryQuery, time.Now().Unix(), t); err != nil {
|
||||
if _, err := tx.Exec(updateMessagesForTopicExpiryQuery, time.Now().Unix()-1, t); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -506,6 +506,7 @@ func (s *Server) maybeRemoveMessagesAndExcessReservations(r *http.Request, v *vi
|
|||
if err := s.messageCache.ExpireMessages(topics...); err != nil {
|
||||
return err
|
||||
}
|
||||
go s.pruneMessages()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -116,29 +116,30 @@ func (s *Server) pruneTokens() {
|
|||
}
|
||||
|
||||
func (s *Server) pruneAttachments() {
|
||||
if s.fileCache != nil {
|
||||
log.
|
||||
Tag(tagManager).
|
||||
Timing(func() {
|
||||
ids, err := s.messageCache.AttachmentsExpired()
|
||||
if err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
|
||||
} else if len(ids) > 0 {
|
||||
if log.Tag(tagManager).IsDebug() {
|
||||
log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
|
||||
}
|
||||
if err := s.fileCache.Remove(ids...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
|
||||
}
|
||||
if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
|
||||
}
|
||||
} else {
|
||||
log.Tag(tagManager).Debug("No expired attachments to delete")
|
||||
}
|
||||
}).
|
||||
Debug("Deleted expired attachments")
|
||||
if s.fileCache == nil {
|
||||
return
|
||||
}
|
||||
log.
|
||||
Tag(tagManager).
|
||||
Timing(func() {
|
||||
ids, err := s.messageCache.AttachmentsExpired()
|
||||
if err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
|
||||
} else if len(ids) > 0 {
|
||||
if log.Tag(tagManager).IsDebug() {
|
||||
log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
|
||||
}
|
||||
if err := s.fileCache.Remove(ids...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
|
||||
}
|
||||
if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
|
||||
}
|
||||
} else {
|
||||
log.Tag(tagManager).Debug("No expired attachments to delete")
|
||||
}
|
||||
}).
|
||||
Debug("Deleted expired attachments")
|
||||
}
|
||||
|
||||
func (s *Server) pruneMessages() {
|
||||
|
@ -149,8 +150,10 @@ func (s *Server) pruneMessages() {
|
|||
if err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
|
||||
} else if len(expiredMessageIDs) > 0 {
|
||||
if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
|
||||
if s.fileCache != nil {
|
||||
if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
|
||||
}
|
||||
}
|
||||
if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
|
||||
|
|
28
server/server_manager_test.go
Normal file
28
server/server_manager_test.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestServer_Manager_Prune_Messages_Without_Attachments_DoesNotPanic(t *testing.T) {
|
||||
// Tests that the manager runs without attachment-cache-dir set, see #617
|
||||
c := newTestConfig(t)
|
||||
c.AttachmentCacheDir = ""
|
||||
s := newTestServer(t, c)
|
||||
|
||||
// Publish a message
|
||||
rr := request(t, s, "POST", "/mytopic", "hi", nil)
|
||||
require.Equal(t, 200, rr.Code)
|
||||
m := toMessage(t, rr.Body.String())
|
||||
|
||||
// Expire message
|
||||
require.Nil(t, s.messageCache.ExpireMessages("mytopic"))
|
||||
|
||||
// Does not panic
|
||||
s.pruneMessages()
|
||||
|
||||
// Actually deleted
|
||||
_, err := s.messageCache.Message(m.ID)
|
||||
require.Equal(t, errMessageNotFound, err)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue