pkg/pubsub/publisher.go
Shijiang Wei 092f2d6aed use pubsub instead of filenotify to follow json logs
An inotify event is triggered as soon as data is written to disk.
But at the time that the inotify event is received, the json line might
not be fully saved to disk. If the json decoder tries to decode in such
a case, an io.UnexpectedEOF will be triggered.
We used to retry several times to mitigate the io.UnexpectedEOF error.
But there are still flaky tests caused by the partial log entries.

The daemon knows exactly when there are new log entries emitted. We can
use the pubsub package to notify all the log readers instead of inotify.

Signed-off-by: Shijiang Wei <mountkin@gmail.com>

try to fix broken test. will squash once tests pass

Signed-off-by: Shijiang Wei <mountkin@gmail.com>
2016-02-15 19:25:16 +08:00

106 lines
2.5 KiB
Go

package pubsub
import (
"sync"
"time"
)
// NewPublisher returns a Publisher ready to broadcast messages to subscribers.
// publishTimeout bounds how long a single send may wait, so that one slow or
// unresponsive client does not hold up publishing to the other clients; when
// it is not positive, each send is attempted once without waiting.
// buffer is the capacity given to every subscriber channel the publisher
// creates.
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	p := new(Publisher)
	p.buffer = buffer
	p.timeout = publishTimeout
	p.subscribers = map[subscriber]topicFunc{}
	return p
}
type (
	// subscriber is the channel on which a single subscriber receives
	// published values.
	subscriber chan interface{}

	// topicFunc reports whether the value v should be delivered to the
	// subscriber it is registered for.
	topicFunc func(v interface{}) bool

	// Publisher is a basic pub/sub structure that lets callers send events
	// and subscribe to them. It can be used safely from multiple goroutines.
	Publisher struct {
		// m guards subscribers.
		m sync.RWMutex
		// buffer is the capacity of each newly created subscriber channel.
		buffer int
		// timeout is the longest a single send may wait for a subscriber;
		// when it is not positive, sends never wait.
		timeout time.Duration
		// subscribers maps every registered channel to its optional filter.
		subscribers map[subscriber]topicFunc
	}
)
// Len reports how many subscribers are currently registered with the
// publisher.
func (p *Publisher) Len() int {
	p.m.RLock()
	defer p.m.RUnlock()
	return len(p.subscribers)
}
// Subscribe registers a new subscriber with no topic filter and returns the
// channel on which published messages are delivered to it.
func (p *Publisher) Subscribe() chan interface{} {
	var unfiltered topicFunc // nil filter: no message is filtered out
	return p.SubscribeTopic(unfiltered)
}
// SubscribeTopic registers a new subscriber whose messages are filtered by
// topic: only values for which topic returns true are delivered, and a nil
// topic lets every value through. The returned channel is created with the
// publisher's configured buffer size.
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	sub := make(chan interface{}, p.buffer)

	p.m.Lock()
	defer p.m.Unlock()
	p.subscribers[sub] = topic

	return sub
}
// Evict unregisters sub and closes its channel so that it receives no more
// messages. Evicting a channel that is not registered does nothing.
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()

	if _, registered := p.subscribers[sub]; !registered {
		return
	}
	delete(p.subscribers, sub)
	close(sub)
}
// Publish sends v to every subscriber registered with the publisher at the
// time of the call. Each subscriber is served by its own goroutine, and
// Publish returns only after all of them have finished. The read lock is held
// throughout, so Evict and Close cannot close a channel while a send to it is
// still in progress.
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	defer p.m.RUnlock()

	var pending sync.WaitGroup
	// The subscriber set cannot change while the read lock is held, so the
	// whole count can be registered up front.
	pending.Add(len(p.subscribers))
	for sub, topic := range p.subscribers {
		go p.sendTopic(sub, topic, v, &pending)
	}
	pending.Wait()
}
// Close unregisters every subscriber and closes its channel.
func (p *Publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	for sub := range p.subscribers {
		close(sub)
		delete(p.subscribers, sub)
	}
}
// sendTopic delivers v to a single subscriber and signals wg when it is done.
// The value is skipped when topic is non-nil and rejects it. With a positive
// publisher timeout the send waits at most that long for the subscriber to
// accept the value; otherwise the send is attempted once without blocking.
// In both cases a value that cannot be delivered is silently dropped.
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	if topic != nil && !topic(v) {
		return
	}

	// send under a select as to not block if the receiver is unavailable
	if p.timeout > 0 {
		// An explicit timer (rather than time.After) can be stopped as soon as
		// the send completes, so its resources are released immediately
		// instead of lingering until the full timeout has elapsed.
		timer := time.NewTimer(p.timeout)
		defer timer.Stop()

		select {
		case sub <- v:
		case <-timer.C:
		}
		return
	}

	select {
	case sub <- v:
	default:
	}
}