Topic expiry attempt

This commit is contained in:
binwiederhier 2023-02-23 16:03:40 -05:00
parent 57e1104afb
commit 8eae44ea61
2 changed files with 11 additions and 15 deletions

View file

@ -40,7 +40,7 @@ func (s *Server) execManager() {
if ev.IsTrace() { if ev.IsTrace() {
expiryMessage := "" expiryMessage := ""
if subs == 0 { if subs == 0 {
expiryTime := time.Until(t.rateVisitorExpires) expiryTime := time.Until(t.expires)
expiryMessage = ", expires in " + expiryTime.String() expiryMessage = ", expires in " + expiryTime.String()
} }
ev.Trace("- topic %s: %d subscribers%s", t.ID, subs, expiryMessage) ev.Trace("- topic %s: %d subscribers%s", t.ID, subs, expiryMessage)

View file

@ -8,17 +8,17 @@ import (
) )
const ( const (
rateVisitorExpiryDuration = 12 * time.Hour topicExpiryDuration = 6 * time.Hour
) )
// topic represents a channel to which subscribers can subscribe, and publishers // topic represents a channel to which subscribers can subscribe, and publishers
// can publish a message // can publish a message
type topic struct { type topic struct {
ID string ID string
subscribers map[int]*topicSubscriber subscribers map[int]*topicSubscriber
rateVisitor *visitor rateVisitor *visitor
rateVisitorExpires time.Time expires time.Time
mu sync.RWMutex mu sync.RWMutex
} }
type topicSubscriber struct { type topicSubscriber struct {
@ -54,25 +54,18 @@ func (t *topic) Subscribe(s subscriber, visitor *visitor, cancel func()) int {
func (t *topic) Stale() bool { func (t *topic) Stale() bool {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if t.rateVisitorExpires.Before(time.Now()) { return len(t.subscribers) == 0 && t.expires.Before(time.Now())
t.rateVisitor = nil
}
return len(t.subscribers) == 0 && t.rateVisitor == nil
} }
func (t *topic) SetRateVisitor(v *visitor) { func (t *topic) SetRateVisitor(v *visitor) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
t.rateVisitor = v t.rateVisitor = v
t.rateVisitorExpires = time.Now().Add(rateVisitorExpiryDuration)
} }
func (t *topic) RateVisitor() *visitor { func (t *topic) RateVisitor() *visitor {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if t.rateVisitorExpires.Before(time.Now()) {
t.rateVisitor = nil
}
return t.rateVisitor return t.rateVisitor
} }
@ -81,6 +74,9 @@ func (t *topic) Unsubscribe(id int) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
delete(t.subscribers, id) delete(t.subscribers, id)
if len(t.subscribers) == 0 {
t.expires = time.Now().Add(topicExpiryDuration)
}
} }
// Publish asynchronously publishes to all subscribers // Publish asynchronously publishes to all subscribers