diff --git a/server/server_manager.go b/server/server_manager.go index 6d30635..f232ee0 100644 --- a/server/server_manager.go +++ b/server/server_manager.go @@ -40,7 +40,7 @@ func (s *Server) execManager() { if ev.IsTrace() { expiryMessage := "" if subs == 0 { - expiryTime := time.Until(t.rateVisitorExpires) + expiryTime := time.Until(t.expires) expiryMessage = ", expires in " + expiryTime.String() } ev.Trace("- topic %s: %d subscribers%s", t.ID, subs, expiryMessage) diff --git a/server/topic.go b/server/topic.go index 9b4f757..2bd5a47 100644 --- a/server/topic.go +++ b/server/topic.go @@ -8,17 +8,17 @@ import ( ) const ( - rateVisitorExpiryDuration = 12 * time.Hour + topicExpiryDuration = 6 * time.Hour ) // topic represents a channel to which subscribers can subscribe, and publishers // can publish a message type topic struct { - ID string - subscribers map[int]*topicSubscriber - rateVisitor *visitor - rateVisitorExpires time.Time - mu sync.RWMutex + ID string + subscribers map[int]*topicSubscriber + rateVisitor *visitor + expires time.Time + mu sync.RWMutex } type topicSubscriber struct { @@ -54,25 +54,18 @@ func (t *topic) Subscribe(s subscriber, visitor *visitor, cancel func()) int { func (t *topic) Stale() bool { t.mu.Lock() defer t.mu.Unlock() - if t.rateVisitorExpires.Before(time.Now()) { - t.rateVisitor = nil - } - return len(t.subscribers) == 0 && t.rateVisitor == nil + return len(t.subscribers) == 0 && t.expires.Before(time.Now()) } func (t *topic) SetRateVisitor(v *visitor) { t.mu.Lock() defer t.mu.Unlock() t.rateVisitor = v - t.rateVisitorExpires = time.Now().Add(rateVisitorExpiryDuration) } func (t *topic) RateVisitor() *visitor { t.mu.Lock() defer t.mu.Unlock() - if t.rateVisitorExpires.Before(time.Now()) { - t.rateVisitor = nil - } return t.rateVisitor } @@ -81,6 +74,9 @@ func (t *topic) Unsubscribe(id int) { t.mu.Lock() defer t.mu.Unlock() delete(t.subscribers, id) + if len(t.subscribers) == 0 { + t.expires = time.Now().Add(topicExpiryDuration) + } } // Publish asynchronously publishes to all subscribers