Fix another race, add test

This commit is contained in:
Philipp Heckel 2022-06-22 15:11:50 -04:00
parent b1089e21f9
commit 3334d84861
2 changed files with 40 additions and 1 deletions

View file

@ -1165,8 +1165,11 @@ func (s *Server) updateStatsAndPrune() {
} }
// Print stats // Print stats
s.mu.Lock()
messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
s.mu.Unlock()
log.Info("Stats: %d messages published, %d in cache, %d topic(s) active, %d subscriber(s), %d visitor(s), %d mails received (%d successful, %d failed), %d mails sent (%d successful, %d failed)", log.Info("Stats: %d messages published, %d in cache, %d topic(s) active, %d subscriber(s), %d visitor(s), %d mails received (%d successful, %d failed), %d mails sent (%d successful, %d failed)",
s.messages, messages, len(s.topics), subscribers, len(s.visitors), messagesCount, messages, topicsCount, subscribers, visitorsCount,
receivedMailTotal, receivedMailSuccess, receivedMailFailure, receivedMailTotal, receivedMailSuccess, receivedMailFailure,
sentMailTotal, sentMailSuccess, sentMailFailure) sentMailTotal, sentMailSuccess, sentMailFailure)
} }

View file

@ -7,6 +7,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log"
"math/rand" "math/rand"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -1409,6 +1410,41 @@ func TestServer_Visitor_XForwardedFor_Multiple(t *testing.T) {
require.Equal(t, "234.5.2.1", v.ip) require.Equal(t, "234.5.2.1", v.ip)
} }
func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) {
count := 1000
s := newTestServer(t, newTestConfig(t))
// Add lots of messages
log.Printf("Adding %d messages", count)
start := time.Now()
for i := 0; i < count; i++ {
require.Nil(t, s.messageCache.AddMessage(newDefaultMessage(fmt.Sprintf("topic%d", i), "some message")))
}
log.Printf("Done: Adding %d messages; took %s", count, time.Since(start).Round(time.Millisecond))
// Update stats
statsChan := make(chan bool)
go func() {
log.Printf("Updating stats")
start := time.Now()
s.updateStatsAndPrune()
log.Printf("Done: Updating stats; took %s", time.Since(start).Round(time.Millisecond))
statsChan <- true
}()
time.Sleep(50 * time.Millisecond) // Make sure it starts first
// Publish message (during stats update)
log.Printf("Publishing message")
start = time.Now()
response := request(t, s, "PUT", "/mytopic", "some body", nil)
m := toMessage(t, response.Body.String())
require.Equal(t, "some body", m.Message)
require.True(t, time.Since(start) < 500*time.Millisecond)
log.Printf("Done: Publishing message; took %s", time.Since(start).Round(time.Millisecond))
<-statsChan
}
func newTestConfig(t *testing.T) *Config { func newTestConfig(t *testing.T) *Config {
conf := NewConfig() conf := NewConfig()
conf.BaseURL = "http://127.0.0.1:12345" conf.BaseURL = "http://127.0.0.1:12345"