Self-review, round 2

This commit is contained in:
binwiederhier 2023-02-09 15:24:12 -05:00
parent bcb22d8d4c
commit e6bb5f484c
24 changed files with 288 additions and 183 deletions

View file

@ -164,6 +164,7 @@ func NewConfig() *Config {
AttachmentExpiryDuration: DefaultAttachmentExpiryDuration,
KeepaliveInterval: DefaultKeepaliveInterval,
ManagerInterval: DefaultManagerInterval,
DisallowedTopics: DefaultDisallowedTopics,
WebRootIsApp: false,
DelayedSenderInterval: DefaultDelayedSenderInterval,
FirebaseKeepaliveInterval: DefaultFirebaseKeepaliveInterval,

View file

@ -51,6 +51,8 @@ const (
CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires);
CREATE INDEX IF NOT EXISTS idx_sender ON messages (sender);
CREATE INDEX IF NOT EXISTS idx_user ON messages (user);
CREATE INDEX IF NOT EXISTS idx_attachment_expires ON messages (attachment_expires);
COMMIT;
`
@ -215,6 +217,8 @@ const (
ALTER TABLE messages ADD COLUMN attachment_deleted INT NOT NULL DEFAULT('0');
ALTER TABLE messages ADD COLUMN expires INT NOT NULL DEFAULT('0');
CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires);
CREATE INDEX IF NOT EXISTS idx_sender ON messages (sender);
CREATE INDEX IF NOT EXISTS idx_user ON messages (user);
CREATE INDEX IF NOT EXISTS idx_attachment_expires ON messages (attachment_expires);
`
migrate9To10UpdateMessageExpiryQuery = `UPDATE messages SET expires = time + ?`
@ -883,8 +887,5 @@ func migrateFrom9(db *sql.DB, cacheDuration time.Duration) error {
if _, err := tx.Exec(updateSchemaVersion, 10); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil // Update this when a new version is added
return tx.Commit()
}

View file

@ -37,12 +37,13 @@ import (
- HIGH Docs
- tiers
- api
- tokens
- HIGH Self-review
- MEDIUM: Test for expiring messages after reservation removal
- MEDIUM: uploading attachments leads to 404 -- race
- MEDIUM: Do not call tiers endoint when payments is not enabled
- MEDIUM: Test new token endpoints & never-expiring token
- LOW: UI: Flickering upgrade banner when logging in
- LOW: Menu item -> popup click should not open page
*/
@ -140,6 +141,7 @@ const (
const (
tagStartup = "startup"
tagPublish = "publish"
tagSubscribe = "subscribe"
tagFirebase = "firebase"
tagEmail = "email" // Send email
tagSMTP = "smtp" // Receive email
@ -649,7 +651,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
}
u := v.User()
if s.userManager != nil && u != nil && u.Tier != nil {
go s.userManager.EnqueueStats(u.ID, v.Stats())
go s.userManager.EnqueueUserStats(u.ID, v.Stats())
}
s.mu.Lock()
s.messages++
@ -956,8 +958,8 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v
}
func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
logvr(v, r).Debug("HTTP stream connection opened")
defer logvr(v, r).Debug("HTTP stream connection closed")
logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
if !v.SubscriptionAllowed() {
return errHTTPTooManyRequestsLimitSubscriptions
}
@ -1025,7 +1027,7 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
case <-r.Context().Done():
return nil
case <-time.After(s.config.KeepaliveInterval):
logvr(v, r).Trace("Sending keepalive message")
logvr(v, r).Tag(tagSubscribe).Trace("Sending keepalive message")
v.Keepalive()
if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
return err
@ -1283,70 +1285,86 @@ func (s *Server) topicFromID(id string) (*topic, error) {
}
func (s *Server) execManager() {
log.Tag(tagManager).Debug("Starting manager")
defer log.Tag(tagManager).Debug("Finished manager")
// WARNING: Make sure to only selectively lock with the mutex, and be aware that this
// there is no mutex for the entire function.
// Expire visitors from rate visitors map
s.mu.Lock()
staleVisitors := 0
for ip, v := range s.visitors {
if v.Stale() {
log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
delete(s.visitors, ip)
staleVisitors++
}
}
s.mu.Unlock()
log.Tag(tagManager).Field("stale_visitors", staleVisitors).Debug("Deleted %d stale visitor(s)", staleVisitors)
log.
Tag(tagManager).
Timing(func() {
s.mu.Lock()
defer s.mu.Unlock()
for ip, v := range s.visitors {
if v.Stale() {
log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
delete(s.visitors, ip)
staleVisitors++
}
}
}).
Field("stale_visitors", staleVisitors).
Debug("Deleted %d stale visitor(s)", staleVisitors)
// Delete expired user tokens and users
if s.userManager != nil {
if err := s.userManager.RemoveExpiredTokens(); err != nil {
log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
}
if err := s.userManager.RemoveDeletedUsers(); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
}
log.
Tag(tagManager).
Timing(func() {
if err := s.userManager.RemoveExpiredTokens(); err != nil {
log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
}
if err := s.userManager.RemoveDeletedUsers(); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
}
}).
Debug("Removed expired tokens and users")
}
// Delete expired attachments
if s.fileCache != nil {
ids, err := s.messageCache.AttachmentsExpired()
if err != nil {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
} else if len(ids) > 0 {
if log.Tag(tagManager).IsDebug() {
log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
}
if err := s.fileCache.Remove(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
}
if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
}
} else {
log.Tag(tagManager).Debug("No expired attachments to delete")
}
log.
Tag(tagManager).
Timing(func() {
ids, err := s.messageCache.AttachmentsExpired()
if err != nil {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
} else if len(ids) > 0 {
if log.Tag(tagManager).IsDebug() {
log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
}
if err := s.fileCache.Remove(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
}
if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
}
} else {
log.Tag(tagManager).Debug("No expired attachments to delete")
}
}).
Debug("Deleted expired attachments")
}
// Prune messages
log.Tag(tagManager).Debug("Manager: Pruning messages")
expiredMessageIDs, err := s.messageCache.MessagesExpired()
if err != nil {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
} else if len(expiredMessageIDs) > 0 {
if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
}
if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
}
} else {
log.Tag(tagManager).Debug("No expired messages to delete")
}
log.
Tag(tagManager).
Timing(func() {
expiredMessageIDs, err := s.messageCache.MessagesExpired()
if err != nil {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
} else if len(expiredMessageIDs) > 0 {
if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
}
if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
}
} else {
log.Tag(tagManager).Debug("No expired messages to delete")
}
}).
Debug("Pruned messages")
// Message count per topic
var messagesCached int
@ -1360,20 +1378,26 @@ func (s *Server) execManager() {
}
// Remove subscriptions without subscribers
s.mu.Lock()
var subscribers int
for _, t := range s.topics {
subs := t.SubscribersCount()
log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs)
msgs, exists := messageCounts[t.ID]
if subs == 0 && (!exists || msgs == 0) {
log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID)
delete(s.topics, t.ID)
continue
}
subscribers += subs
}
s.mu.Unlock()
var emptyTopics, subscribers int
log.
Tag(tagManager).
Timing(func() {
s.mu.Lock()
defer s.mu.Unlock()
for _, t := range s.topics {
subs := t.SubscribersCount()
log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs)
msgs, exists := messageCounts[t.ID]
if subs == 0 && (!exists || msgs == 0) {
log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID)
emptyTopics++
delete(s.topics, t.ID)
continue
}
subscribers += subs
}
}).
Debug("Removed %d empty topic(s)", emptyTopics)
// Mail stats
var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64
@ -1407,6 +1431,10 @@ func (s *Server) execManager() {
Info("Server stats")
}
func (s *Server) expireVisitors() {
}
func (s *Server) runSMTPServer() error {
s.smtpServerBackend = newMailBackend(s.config, s.handle)
s.smtpServer = smtp.NewServer(s.smtpServerBackend)
@ -1424,7 +1452,10 @@ func (s *Server) runManager() {
for {
select {
case <-time.After(s.config.ManagerInterval):
s.execManager()
log.
Tag(tagManager).
Timing(s.execManager).
Debug("Manager finished")
case <-s.closeChan:
return
}

View file

@ -314,7 +314,7 @@ func (s *Server) handleAccountSettingsChange(w http.ResponseWriter, r *http.Requ
}
}
logvr(v, r).Tag(tagAccount).Debug("Changing account settings for user %s", u.Name)
if err := s.userManager.ChangeSettings(u); err != nil {
if err := s.userManager.ChangeSettings(u.ID, prefs); err != nil {
return err
}
return s.writeJSON(w, newSuccessResponse())
@ -338,7 +338,8 @@ func (s *Server) handleAccountSubscriptionAdd(w http.ResponseWriter, r *http.Req
}
if newSubscription.ID == "" {
newSubscription.ID = util.RandomStringPrefix(subscriptionIDPrefix, subscriptionIDLength)
u.Prefs.Subscriptions = append(u.Prefs.Subscriptions, newSubscription)
prefs := u.Prefs
prefs.Subscriptions = append(prefs.Subscriptions, newSubscription)
logvr(v, r).
Tag(tagAccount).
Fields(log.Context{
@ -346,7 +347,7 @@ func (s *Server) handleAccountSubscriptionAdd(w http.ResponseWriter, r *http.Req
"topic": newSubscription.Topic,
}).
Debug("Adding subscription for user %s", u.Name)
if err := s.userManager.ChangeSettings(u); err != nil {
if err := s.userManager.ChangeSettings(u.ID, prefs); err != nil {
return err
}
}
@ -367,8 +368,9 @@ func (s *Server) handleAccountSubscriptionChange(w http.ResponseWriter, r *http.
if u.Prefs == nil || u.Prefs.Subscriptions == nil {
return errHTTPNotFound
}
prefs := u.Prefs
var subscription *user.Subscription
for _, sub := range u.Prefs.Subscriptions {
for _, sub := range prefs.Subscriptions {
if sub.ID == subscriptionID {
sub.DisplayName = updatedSubscription.DisplayName
subscription = sub
@ -386,7 +388,7 @@ func (s *Server) handleAccountSubscriptionChange(w http.ResponseWriter, r *http.
"display_name": subscription.DisplayName,
}).
Debug("Changing subscription for user %s", u.Name)
if err := s.userManager.ChangeSettings(u); err != nil {
if err := s.userManager.ChangeSettings(u.ID, prefs); err != nil {
return err
}
return s.writeJSON(w, subscription)
@ -417,8 +419,9 @@ func (s *Server) handleAccountSubscriptionDelete(w http.ResponseWriter, r *http.
}
}
if len(newSubscriptions) < len(u.Prefs.Subscriptions) {
u.Prefs.Subscriptions = newSubscriptions
if err := s.userManager.ChangeSettings(u); err != nil {
prefs := u.Prefs
prefs.Subscriptions = newSubscriptions
if err := s.userManager.ChangeSettings(u.ID, prefs); err != nil {
return err
}
}

View file

@ -724,5 +724,5 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) {
time.Sleep(300 * time.Millisecond)
u, err = s.userManager.User("phil")
require.Nil(t, err)
require.Equal(t, int64(0), u.Stats.Messages) // v.EnqueueStats had run!
require.Equal(t, int64(0), u.Stats.Messages) // v.EnqueueUserStats had run!
}

View file

@ -938,7 +938,7 @@ func TestServer_DailyMessageQuotaFromDatabase(t *testing.T) {
u, err := s.userManager.User("phil")
require.Nil(t, err)
s.userManager.EnqueueStats(u.ID, &user.Stats{
s.userManager.EnqueueUserStats(u.ID, &user.Stats{
Messages: 123456,
Emails: 999,
})

View file

@ -88,7 +88,7 @@ func (t *topic) CancelSubscribers(exceptUserID string) {
defer t.mu.Unlock()
for _, s := range t.subscribers {
if s.userID != exceptUserID {
log.Field("topic", t.ID).Trace("Canceling subscriber %s", s.userID)
log.Tag(tagSubscribe).Field("topic", t.ID).Debug("Canceling subscriber %s", s.userID)
s.cancel()
}
}

View file

@ -27,7 +27,7 @@ const (
)
// Constants used to convert a tier-user's MessageLimit (see user.Tier) into adequate request limiter
// values (token bucket).
// values (token bucket). This is only used to increase the values in server.yml, never decrease them.
//
// Example: Assuming a user.Tier's MessageLimit is 10,000:
// - the allowed burst is 500 (= 10,000 * 5%), which is < 1000 (the max)
@ -59,7 +59,7 @@ type visitor struct {
subscriptionLimiter *util.FixedLimiter // Fixed limiter for active subscriptions (ongoing connections)
bandwidthLimiter *util.RateLimiter // Limiter for attachment bandwidth downloads
accountLimiter *rate.Limiter // Rate limiter for account creation, may be nil
authLimiter *rate.Limiter // Limiter for incorrect login attempts
authLimiter *rate.Limiter // Limiter for incorrect login attempts, may be nil
firebase time.Time // Next allowed Firebase message
seen time.Time // Last seen time of this visitor (needed for removal of stale visitors)
mu sync.Mutex
@ -360,7 +360,7 @@ func (v *visitor) resetLimitersNoLock(messages, emails int64, enqueueUpdate bool
v.authLimiter = nil // Users are already logged in, no need to limit requests
}
if enqueueUpdate && v.user != nil {
go v.userManager.EnqueueStats(v.user.ID, &user.Stats{
go v.userManager.EnqueueUserStats(v.user.ID, &user.Stats{
Messages: messages,
Emails: emails,
})