From 092f2d6aed68b5ce7e46efb44bc4b7adc6bb03ce Mon Sep 17 00:00:00 2001 From: Shijiang Wei Date: Thu, 21 Jan 2016 00:28:10 +0800 Subject: [PATCH 1/2] use pubsub instead of filenotify to follow json logs inotify event is trigged immediately there's data written to disk. But at the time that the inotify event is received, the json line might not fully saved to disk. If the json decoder tries to decode in such case, an io.UnexpectedEOF will be trigged. We used to retry for several times to mitigate the io.UnexpectedEOF error. But there are still flaky tests caused by the partial log entries. The daemon knows exactly when there are new log entries emitted. We can use the pubsub package to notify all the log readers instead of inotify. Signed-off-by: Shijiang Wei try to fix broken test. will squash once tests pass Signed-off-by: Shijiang Wei --- pubsub/publisher.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pubsub/publisher.go b/pubsub/publisher.go index 8529ffa..d48d432 100644 --- a/pubsub/publisher.go +++ b/pubsub/publisher.go @@ -54,8 +54,10 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { // Evict removes the specified subscriber from receiving any more messages. func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() - delete(p.subscribers, sub) - close(sub) + if _, ok := p.subscribers[sub]; ok { + delete(p.subscribers, sub) + close(sub) + } p.m.Unlock() } From b8296053e283984005b56502fc7fab9fc31aff14 Mon Sep 17 00:00:00 2001 From: Shijiang Wei Date: Fri, 29 Jan 2016 13:08:20 +0800 Subject: [PATCH 2/2] optimize pubsub.Publish function Signed-off-by: Shijiang Wei --- pubsub/publisher.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pubsub/publisher.go b/pubsub/publisher.go index d48d432..22be5b7 100644 --- a/pubsub/publisher.go +++ b/pubsub/publisher.go @@ -64,10 +64,14 @@ func (p *Publisher) Evict(sub chan interface{}) { // Publish sends the data in v to all subscribers currently registered with the publisher. func (p *Publisher) Publish(v interface{}) { p.m.RLock() + if len(p.subscribers) == 0 { + p.m.RUnlock() + return + } + wg := new(sync.WaitGroup) for sub, topic := range p.subscribers { wg.Add(1) - go p.sendTopic(sub, topic, v, wg) } wg.Wait()