a72c316bf4
A TopicFunc is an interface to let the publisher decide whether it needs to send a message to a subscriber or not. It returns true if the publisher must send the message and false otherwise. Users of the pubsub package can create a subscriber with a topic function by calling `pubsub.SubscribeTopic`. Message delivery has also been modified to use concurrent channels per subscriber. That way, topic verification and message delivery are not O(N+M) anymore, based on the number of subscribers and topic verification complexity. Using pubsub topics, the API stops controlling the message delivery, delegating that function to a topic generated with the filtering provided by the user. The publisher sends every message to the subscriber if there is no filter, but the API doesn't have to select messages to return anymore. Signed-off-by: David Calavera <david.calavera@gmail.com>
104 lines
2.5 KiB
Go
104 lines
2.5 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// NewPublisher creates a new pub/sub publisher to broadcast messages.
|
|
// The duration is used as the send timeout as to not block the publisher publishing
|
|
// messages to other clients if one client is slow or unresponsive.
|
|
// The buffer is used when creating new channels for subscribers.
|
|
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
|
|
return &Publisher{
|
|
buffer: buffer,
|
|
timeout: publishTimeout,
|
|
subscribers: make(map[subscriber]topicFunc),
|
|
}
|
|
}
|
|
|
|
type (
	// subscriber is the channel on which one subscriber receives messages.
	subscriber chan interface{}

	// topicFunc reports whether the message v should be delivered to the
	// subscriber it was registered with.
	topicFunc func(v interface{}) bool
)
|
|
|
|
// Publisher is basic pub/sub structure. Allows to send events and subscribe
|
|
// to them. Can be safely used from multiple goroutines.
|
|
type Publisher struct {
|
|
m sync.RWMutex
|
|
buffer int
|
|
timeout time.Duration
|
|
subscribers map[subscriber]topicFunc
|
|
}
|
|
|
|
// Len returns the number of subscribers for the publisher
|
|
func (p *Publisher) Len() int {
|
|
p.m.RLock()
|
|
i := len(p.subscribers)
|
|
p.m.RUnlock()
|
|
return i
|
|
}
|
|
|
|
// Subscribe adds a new subscriber to the publisher returning the channel.
|
|
func (p *Publisher) Subscribe() chan interface{} {
|
|
return p.SubscribeTopic(nil)
|
|
}
|
|
|
|
// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
|
|
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
|
|
ch := make(chan interface{}, p.buffer)
|
|
p.m.Lock()
|
|
p.subscribers[ch] = topic
|
|
p.m.Unlock()
|
|
return ch
|
|
}
|
|
|
|
// Evict removes the specified subscriber from receiving any more messages.
|
|
func (p *Publisher) Evict(sub chan interface{}) {
|
|
p.m.Lock()
|
|
delete(p.subscribers, sub)
|
|
close(sub)
|
|
p.m.Unlock()
|
|
}
|
|
|
|
// Publish sends the data in v to all subscribers currently registered with the publisher.
|
|
func (p *Publisher) Publish(v interface{}) {
|
|
p.m.RLock()
|
|
wg := new(sync.WaitGroup)
|
|
for sub, topic := range p.subscribers {
|
|
wg.Add(1)
|
|
|
|
go p.sendTopic(sub, topic, v, wg)
|
|
}
|
|
wg.Wait()
|
|
p.m.RUnlock()
|
|
}
|
|
|
|
// Close closes the channels to all subscribers registered with the publisher.
|
|
func (p *Publisher) Close() {
|
|
p.m.Lock()
|
|
for sub := range p.subscribers {
|
|
delete(p.subscribers, sub)
|
|
close(sub)
|
|
}
|
|
p.m.Unlock()
|
|
}
|
|
|
|
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
if topic != nil && !topic(v) {
|
|
return
|
|
}
|
|
|
|
// send under a select as to not block if the receiver is unavailable
|
|
if p.timeout > 0 {
|
|
select {
|
|
case sub <- v:
|
|
case <-time.After(p.timeout):
|
|
}
|
|
return
|
|
}
|
|
|
|
select {
|
|
case sub <- v:
|
|
default:
|
|
}
|
|
}
|