From cfe971591172716c2383a6ac17bdde07657b79b2 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 18 Sep 2018 22:08:30 -0400 Subject: [PATCH] Use atomic pub/sub for events --- agent.go | 52 ++++++++++++++++++++++++++++++++++++++++++---------- events.go | 14 +++++++++++++- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/agent.go b/agent.go index d7037e1..489df4e 100644 --- a/agent.go +++ b/agent.go @@ -2,6 +2,7 @@ package element import ( "errors" + "sync" "time" "github.com/hashicorp/memberlist" @@ -19,6 +20,8 @@ var ( // Agent represents the node agent type Agent struct { + *subscribers + config *Config members *memberlist.Memberlist peerUpdateChan chan bool @@ -35,6 +38,7 @@ func NewAgent(info *Peer, cfg *Config) (*Agent, error) { nodeEventCh = make(chan *NodeEvent, 64) ) a := &Agent{ + subscribers: newSubscribers(), config: cfg, peerUpdateChan: updateCh, nodeEventChan: nodeEventCh, @@ -62,14 +66,42 @@ func (a *Agent) SyncInterval() time.Duration { return a.memberConfig.PushPullInterval } -// Subscribe subscribes to the node event channel -func (a *Agent) Subscribe(ch chan *NodeEvent) { - go func() { - for { - select { - case evt := <-a.nodeEventChan: - ch <- evt - } - } - }() +func newSubscribers() *subscribers { + return &subscribers{ + subs: make(map[chan *NodeEvent]struct{}), + } +} + +type subscribers struct { + mu sync.Mutex + + subs map[chan *NodeEvent]struct{} +} + +// Subscribe subscribes to the node event channel +func (s *subscribers) Subscribe() chan *NodeEvent { + ch := make(chan *NodeEvent, 64) + s.mu.Lock() + s.subs[ch] = struct{}{} + s.mu.Unlock() + return ch +} + +// Unsubscribe removes the channel from node events +func (s *subscribers) Unsubscribe(ch chan *NodeEvent) { + s.mu.Lock() + delete(s.subs, ch) + s.mu.Unlock() +} + +func (s *subscribers) send(e *NodeEvent) { + s.mu.Lock() + for ch := range s.subs { + // non-blocking send + select { + case ch <- e: + default: + } + } + s.mu.Unlock() } diff --git a/events.go b/events.go index f6dcf7d..3183991 100644 --- a/events.go +++ b/events.go @@ -19,21 +19,33 @@ const ( // NodeEvent stores the event type and node information type NodeEvent struct { // EventType is the type of event fired - EventType NodeEventType + Type NodeEventType // Node is the internal cluster node Node *memberlist.Node } // NotifyJoin notifies when a node joins the cluster func (a *Agent) NotifyJoin(n *memberlist.Node) { + a.send(&NodeEvent{ + Type: NodeJoin, + Node: n, + }) } // NotifyLeave notifies when a node leaves the cluster func (a *Agent) NotifyLeave(n *memberlist.Node) { delete(a.state.Peers, n.Name) a.peerUpdateChan <- true + a.send(&NodeEvent{ + Type: NodeLeave, + Node: n, + }) } // NotifyUpdate notifies when a node is updated in the cluster func (a *Agent) NotifyUpdate(n *memberlist.Node) { + a.send(&NodeEvent{ + Type: NodeUpdate, + Node: n, + }) }