From 752f9b0d09dcc7b47b0cf3507bf76d8aa9798f81 Mon Sep 17 00:00:00 2001 From: Evan Hazlett Date: Mon, 4 Jun 2018 12:40:04 -0400 Subject: [PATCH] initial commit Signed-off-by: Evan Hazlett --- README.md | 6 ++++ agent.go | 52 ++++++++++++++++++++++++++++ config.go | 81 +++++++++++++++++++++++++++++++++++++++++++ delegate.go | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++ events.go | 60 ++++++++++++++++++++++++++++++++ peers.go | 41 ++++++++++++++++++++++ register.go | 13 +++++++ service.go | 8 +++++ shutdown.go | 14 ++++++++ start.go | 62 +++++++++++++++++++++++++++++++++ 10 files changed, 435 insertions(+) create mode 100644 README.md create mode 100644 agent.go create mode 100644 config.go create mode 100644 delegate.go create mode 100644 events.go create mode 100644 peers.go create mode 100644 register.go create mode 100644 service.go create mode 100644 shutdown.go create mode 100644 start.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..820878c --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# Element +Element handles simplified clustering among nodes. It handles +peer management (joins, expiry, etc.) as well as exposing GRPC services and publishing +the GRPC endpoint address for each node throughout the group. This enables services +to be built using Element and allows simple publishing of the GRPC endpoints +to other nodes for accessing the services. 
diff --git a/agent.go b/agent.go new file mode 100644 index 0000000..0581d65 --- /dev/null +++ b/agent.go @@ -0,0 +1,52 @@ +package element + +import ( + "errors" + "time" + + "github.com/hashicorp/memberlist" + "google.golang.org/grpc" +) + +const ( + defaultInterval = time.Second * 10 + nodeReconcileTimeout = defaultInterval * 3 + nodeUpdateTimeout = defaultInterval / 2 +) + +var ( + ErrUnknownConnectionType = errors.New("unknown connection type") +) + +// Agent represents the node agent +type Agent struct { + config *Config + members *memberlist.Memberlist + peerUpdateChan chan bool + nodeEventChan chan *NodeEvent + grpcServer *grpc.Server + registeredServices map[string]struct{} +} + +// NewAgent returns a new node agent +func NewAgent(cfg *Config) (*Agent, error) { + updateCh := make(chan bool) + nodeEventCh := make(chan *NodeEvent) + mc, err := setupMemberlistConfig(cfg, updateCh, nodeEventCh) + if err != nil { + return nil, err + } + + ml, err := memberlist.Create(mc) + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer() + return &Agent{ + config: cfg, + members: ml, + peerUpdateChan: updateCh, + nodeEventChan: nodeEventCh, + grpcServer: grpcServer, + }, nil +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..688de80 --- /dev/null +++ b/config.go @@ -0,0 +1,81 @@ +package element + +import ( + "fmt" + "io/ioutil" + "log" + + "github.com/hashicorp/memberlist" +) + +// ConnectionType defines the type of connection for the agent to use (wan, lan, local) +type ConnectionType string + +const ( + // LAN is a local area network connection intended for high speed, low latency networks + LAN ConnectionType = "lan" + // WAN is a wide area connection intended for long distance remote links + WAN ConnectionType = "wan" + // Local is a local connection intended for high speed local development links + Local ConnectionType = "local" +) + +// Config is the agent config +type Config struct { + // NodeName is the name of the node. 
Each node must have a unique name in the cluster. + NodeName string + // AgentAddr is the address on which the agent will serve the GRPC services + AgentAddr string + // AgentPort is the port on which the agent will serve the GRPC services + AgentPort int + // ConnectionType is the connection type the agent will use + ConnectionType string + // BindAddr is the cluster bind address + BindAddr string + // BindPort is the cluster bind port + BindPort int + // AdvertiseAddr is the cluster address that will be used for membership communication + AdvertiseAddr string + // AdvertisePort is the cluster port that will be used for membership communication + AdvertisePort int + // Peers is a local cache of peer members + Peers []string +} + +func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) { + var mc *memberlist.Config + switch cfg.ConnectionType { + case string(Local): + mc = memberlist.DefaultLocalConfig() + case string(WAN): + mc = memberlist.DefaultWANConfig() + case string(LAN): + mc = memberlist.DefaultLANConfig() + default: + return nil, ErrUnknownConnectionType + } + + mc.Name = cfg.NodeName + mc.Delegate = NewAgentDelegate(cfg.NodeName, fmt.Sprintf("%s:%d", cfg.AgentAddr, cfg.AgentPort), peerUpdateChan, nodeEventChan) + mc.Events = NewEventHandler(nodeEventChan) + + // disable logging for memberlist + // TODO: enable if debug + mc.Logger = log.New(ioutil.Discard, "", 0) + + // ml overrides for connection + if v := cfg.BindAddr; v != "" { + mc.BindAddr = v + } + if v := cfg.BindPort; v > 0 { + mc.BindPort = v + } + if v := cfg.AdvertiseAddr; v != "" { + mc.AdvertiseAddr = v + } + if v := cfg.AdvertisePort; v > 0 { + mc.AdvertisePort = v + } + + return mc, nil +} diff --git a/delegate.go b/delegate.go new file mode 100644 index 0000000..73cffbf --- /dev/null +++ b/delegate.go @@ -0,0 +1,98 @@ +package element + +import ( + "encoding/json" + "time" + + "github.com/sirupsen/logrus" +) + +type 
agentDelegate struct { + Name string + Addr string + Updated time.Time + Peers map[string]*PeerAgent + updateChan chan bool + nodeEventChan chan *NodeEvent +} + +// NewAgentDelegate is the agent delegate used to handle cluster events +func NewAgentDelegate(name, addr string, updateCh chan bool, nodeEventCh chan *NodeEvent) *agentDelegate { + agent := &agentDelegate{ + Name: name, + Addr: addr, + Peers: make(map[string]*PeerAgent), + updateChan: updateCh, + nodeEventChan: nodeEventCh, + } + + // event handler + go func() { + for { + select { + case evt := <-nodeEventCh: + switch evt.EventType { + case NodeJoin: + case NodeUpdate: + case NodeLeave: + agent.removeNode(evt.Node.Name) + } + } + } + }() + + return agent +} + +// NodeMeta returns local node meta information +func (d *agentDelegate) NodeMeta(limit int) []byte { + data, err := json.Marshal(d.Peers) + if err != nil { + logrus.Errorf("error serializing node meta: %s", err) + } + return data +} + +// NotifyMsg is used for handling cluster messages +func (d *agentDelegate) NotifyMsg(buf []byte) { + // this can be used to receive messages sent (i.e. 
SendReliable) +} + +// GetBroadcasts is called when user messages can be broadcast +func (d *agentDelegate) GetBroadcasts(overhead, limit int) [][]byte { + return nil +} + +// LocalState is the local cluster agent state +func (d *agentDelegate) LocalState(join bool) []byte { + data, err := json.Marshal(d) + if err != nil { + logrus.Errorf("error serializing local state: %s", err) + } + return []byte(data) +} + +// MergeRemoteState is used to store remote peer information +func (d *agentDelegate) MergeRemoteState(buf []byte, join bool) { + var remoteAgent *agentDelegate + if err := json.Unmarshal(buf, &remoteAgent); err != nil { + logrus.Errorf("error parsing remote agent state: %s", err) + return + } + d.Updated = time.Now() + d.Peers[remoteAgent.Name] = &PeerAgent{ + Name: remoteAgent.Name, + Addr: remoteAgent.Addr, + Updated: time.Now(), + } + // notify update + d.updateChan <- true +} + +func (d *agentDelegate) removeNode(name string) { + if _, exists := d.Peers[name]; exists { + delete(d.Peers, name) + // notify update + d.updateChan <- true + } +} diff --git a/events.go b/events.go new file mode 100644 index 0000000..81826e1 --- /dev/null +++ b/events.go @@ -0,0 +1,60 @@ +package element + +import ( + "github.com/hashicorp/memberlist" +) + +// NodeEventType is the type of node event +type NodeEventType string + +const ( + // NodeJoin is the event fired upon a node joining the cluster + NodeJoin NodeEventType = "join" + // NodeLeave is the event fired upon a node leaving the cluster + NodeLeave NodeEventType = "leave" + // NodeUpdate is the event fired upon a node updating in the cluster + NodeUpdate NodeEventType = "update" +) + +// NodeEvent stores the event type and node information +type NodeEvent struct { + // EventType is the type of event fired + EventType NodeEventType + // Node is the internal cluster node + Node *memberlist.Node +} + +// EventHandler is used for event handling +type EventHandler struct { + ch chan *NodeEvent +} + +// NewEventHandler 
returns an EventHandler that is used to perform actions for the specified event +func NewEventHandler(ch chan *NodeEvent) *EventHandler { + return &EventHandler{ + ch: ch, + } +} + +// NotifyJoin notifies when a node joins the cluster +func (h *EventHandler) NotifyJoin(n *memberlist.Node) { + go h.notify(NodeJoin, n) +} + +// NotifyLeave notifies when a node leaves the cluster +func (h *EventHandler) NotifyLeave(n *memberlist.Node) { + go h.notify(NodeLeave, n) +} + +// NotifyUpdate notifies when a node is updated in the cluster +func (h *EventHandler) NotifyUpdate(n *memberlist.Node) { + go h.notify(NodeUpdate, n) +} + +func (h *EventHandler) notify(t NodeEventType, n *memberlist.Node) { + // TODO: use context WithTimeout to enable cancel + h.ch <- &NodeEvent{ + EventType: t, + Node: n, + } +} diff --git a/peers.go b/peers.go new file mode 100644 index 0000000..477da9e --- /dev/null +++ b/peers.go @@ -0,0 +1,41 @@ +package element + +import ( + "encoding/json" + "fmt" + "time" +) + +// PeerAgent is the peer information for an agent in the cluster including name and GRPC address +type PeerAgent struct { + Name string + Addr string + Updated time.Time +} + +// Peers returns all known peers in the cluster +func (a *Agent) Peers() ([]*PeerAgent, error) { + self := a.members.LocalNode() + var ( + peerAgents map[string]*PeerAgent + peers []*PeerAgent + ) + if err := json.Unmarshal(self.Meta, &peerAgents); err != nil { + return nil, err + } + + for _, p := range peerAgents { + peers = append(peers, p) + } + + return peers, nil +} + +// LocalNode returns local node peer info +func (a *Agent) LocalNode() (*PeerAgent, error) { + return &PeerAgent{ + Name: a.config.NodeName, + Addr: fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort), + Updated: time.Now(), + }, nil +} diff --git a/register.go b/register.go new file mode 100644 index 0000000..5e22a75 --- /dev/null +++ b/register.go @@ -0,0 +1,13 @@ +package element + +import "fmt" + +// Register registers a GRPC 
service with the agent +func (a *Agent) Register(svc Service) error { + id := svc.ID() + if _, exists := a.registeredServices[id]; exists { + return fmt.Errorf("service %s already registered", id) + } + svc.Register(a.grpcServer) + return nil +} diff --git a/service.go b/service.go new file mode 100644 index 0000000..64c2e18 --- /dev/null +++ b/service.go @@ -0,0 +1,8 @@ +package element + +import "google.golang.org/grpc" + +type Service interface { + ID() string + Register(srv *grpc.Server) error +} diff --git a/shutdown.go b/shutdown.go new file mode 100644 index 0000000..2a2ae67 --- /dev/null +++ b/shutdown.go @@ -0,0 +1,14 @@ +package element + +// Shutdown causes the local node to leave the cluster and perform a clean shutdown +func (a *Agent) Shutdown() error { + if err := a.members.Leave(nodeUpdateTimeout); err != nil { + return err + } + + if err := a.members.Shutdown(); err != nil { + return err + } + + return nil +} diff --git a/start.go b/start.go new file mode 100644 index 0000000..717e5b0 --- /dev/null +++ b/start.go @@ -0,0 +1,62 @@ +package element + +import ( + "fmt" + "net" + "os" + "syscall" + + "github.com/sirupsen/logrus" +) + +// Start activates the GRPC listener as well as joins the cluster if specified and blocks until a SIGTERM or SIGINT is received +func (a *Agent) Start(signals chan os.Signal) error { + logrus.Infof("starting agent: grpc=%s:%d bind=%s:%d advertise=%s:%d", + a.config.AgentAddr, + a.config.AgentPort, + a.config.BindAddr, + a.config.BindPort, + a.config.AdvertiseAddr, + a.config.AdvertisePort, + ) + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort)) + if err != nil { + return err + } + go a.grpcServer.Serve(l) + + // start node metadata updater + go func() { + for { + <-a.peerUpdateChan + if err := a.members.UpdateNode(nodeUpdateTimeout); err != nil { + logrus.Errorf("error updating node metadata: %s", err) + } + } + }() + + if len(a.config.Peers) > 0 { + logrus.Debugf("joining peers: 
%v", a.config.Peers) + n, err := a.members.Join(a.config.Peers) + if err != nil { + return err + } + + logrus.Infof("joined %d peer(s)", n) + } + + for { + select { + case s := <-signals: + switch s { + case syscall.SIGTERM, syscall.SIGINT: + logrus.Debug("shutting down") + if err := a.Shutdown(); err != nil { + return err + } + + return nil + } + } + } +}