commit 752f9b0d09dcc7b47b0cf3507bf76d8aa9798f81 Author: Evan Hazlett Date: Mon Jun 4 12:40:04 2018 -0400 initial commit Signed-off-by: Evan Hazlett diff --git a/README.md b/README.md new file mode 100644 index 0000000..820878c --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# Element +Element handles simplified clustering among nodes. It handles +peer management (joins, expiry, etc) as well as exposing GRPC services and publishing +the GRPC endpoint address for each node throughout the group. This enables services +to be built using element and allow simple publishing of the GRPC endpoints +to other nodes for accessing the services. diff --git a/agent.go b/agent.go new file mode 100644 index 0000000..0581d65 --- /dev/null +++ b/agent.go @@ -0,0 +1,52 @@ +package element + +import ( + "errors" + "time" + + "github.com/hashicorp/memberlist" + "google.golang.org/grpc" +) + +const ( + defaultInterval = time.Second * 10 + nodeReconcileTimeout = defaultInterval * 3 + nodeUpdateTimeout = defaultInterval / 2 +) + +var ( + ErrUnknownConnectionType = errors.New("unknown connection type") +) + +// Agent represents the node agent +type Agent struct { + config *Config + members *memberlist.Memberlist + peerUpdateChan chan bool + nodeEventChan chan *NodeEvent + grpcServer *grpc.Server + registeredServices map[string]struct{} +} + +// NewAgent returns a new node agent +func NewAgent(cfg *Config) (*Agent, error) { + updateCh := make(chan bool) + nodeEventCh := make(chan *NodeEvent) + mc, err := setupMemberlistConfig(cfg, updateCh, nodeEventCh) + if err != nil { + return nil, err + } + + ml, err := memberlist.Create(mc) + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer() + return &Agent{ + config: cfg, + members: ml, + peerUpdateChan: updateCh, + nodeEventChan: nodeEventCh, + grpcServer: grpcServer, + }, nil +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..688de80 --- /dev/null +++ b/config.go @@ -0,0 +1,81 @@ +package element + +import ( + "fmt" + "io/ioutil" + "log" + + "github.com/hashicorp/memberlist" +) + +// ConnectionType defines the type of connection for the agent to use (wan, lan, local) +type ConnectionType string + +const ( + // LAN is a local area network connection intended for high speed, low latency networks + LAN ConnectionType = "lan" + // WAN is a wide area connection intended for long distance remote links + WAN ConnectionType = "wan" + // Local is a local connection intended for high speed local development links + Local ConnectionType = "local" +) + +// Config is the agent config +type Config struct { + // NodeName is the name of the node. Each node must have a unique name in the cluster. + NodeName string + // AgentAddr is the address on which the agent will serve the GRPC services + AgentAddr string + // AgentPort is the port on which the agent will serve the GRPC services + AgentPort int + // ConnectionType is the connection type the agent will use + ConnectionType string + // BindAddr is the cluster bind address + BindAddr string + // BindPort is the cluster bind port + BindPort int + // AdvertiseAddr is the cluster address that will be used for membership communication + AdvertiseAddr string + // AdvertisePort is the cluster port that will be used for membership communication + AdvertisePort int + // Peers is a local cache of peer members + Peers []string +} + +func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) { + var mc *memberlist.Config + switch cfg.ConnectionType { + case string(Local): + mc = memberlist.DefaultLocalConfig() + case string(WAN): + mc = memberlist.DefaultWANConfig() + case string(LAN): + mc = memberlist.DefaultLANConfig() + default: + return nil, ErrUnknownConnectionType + } + + mc.Name = cfg.NodeName + mc.Delegate = NewAgentDelegate(cfg.NodeName, fmt.Sprintf("%s:%d", cfg.AgentAddr, cfg.AgentPort), peerUpdateChan, nodeEventChan) + mc.Events = NewEventHandler(nodeEventChan) + + // disable logging for memberlist + // TODO: enable if debug + mc.Logger = log.New(ioutil.Discard, "", 0) + + // ml overrides for connection + if v := cfg.BindAddr; v != "" { + mc.BindAddr = v + } + if v := cfg.BindPort; v > 0 { + mc.BindPort = v + } + if v := cfg.AdvertiseAddr; v != "" { + mc.AdvertiseAddr = v + } + if v := cfg.AdvertisePort; v > 0 { + mc.AdvertisePort = v + } + + return mc, nil +} diff --git a/delegate.go b/delegate.go new file mode 100644 index 0000000..73cffbf --- /dev/null +++ b/delegate.go @@ -0,0 +1,98 @@ +package element + +import ( + "encoding/json" + "time" + + "github.com/sirupsen/logrus" +) + +type agentDelegate struct { + Name string + Addr string + Updated time.Time + Peers map[string]*PeerAgent + updateChan chan bool + nodeEventChan chan *NodeEvent +} + +// NewAgentDelegate is the agent delegate used to handle cluster events +func NewAgentDelegate(name, addr string, updateCh chan bool, nodeEventCh chan *NodeEvent) *agentDelegate { + agent := &agentDelegate{ + Name: name, + Addr: addr, + Peers: make(map[string]*PeerAgent), + updateChan: updateCh, + nodeEventChan: nodeEventCh, + } + + // event handler + go func() { + for { + select { + case evt := <-nodeEventCh: + switch evt.EventType { + case NodeJoin: + case NodeUpdate: + case NodeLeave: + agent.removeNode(evt.Node.Name) + } + } + } + }() + + return agent +} + +// NodeMeta returns local node meta information +func (d *agentDelegate) NodeMeta(limit int) []byte { + data, err := json.Marshal(d.Peers) + if err != nil { + logrus.Errorf("error serializing node meta: %s", err) + } + return data +} + +// NotifyMsg is used for handling cluster messages +func (d *agentDelegate) NotifyMsg(buf []byte) { + // this can be used to receive messages sent (i.e. SendReliable) +} + +// GetBroadcasts is called when user messages can be broadcast +func (d *agentDelegate) GetBroadcasts(overhead, limit int) [][]byte { + return nil +} + +// LocalState is the local cluster agent state +func (d *agentDelegate) LocalState(join bool) []byte { + data, err := json.Marshal(d) + if err != nil { + logrus.Errorf("error serializing local state: %s", err) + } + return []byte(data) +} + +// MergeRemoteState is used to store remote peer information +func (d *agentDelegate) MergeRemoteState(buf []byte, join bool) { + var remoteAgent *agentDelegate + if err := json.Unmarshal(buf, &remoteAgent); err != nil { + logrus.Errorf("error parsing remote agent state: %s", err) + return + } + d.Updated = time.Now() + d.Peers[remoteAgent.Name] = &PeerAgent{ + Name: remoteAgent.Name, + Addr: remoteAgent.Addr, + Updated: time.Now(), + } + // notify update + d.updateChan <- true +} + +func (d *agentDelegate) removeNode(name string) { + if _, exists := d.Peers[name]; exists { + delete(d.Peers, name) + // notify update + d.updateChan <- true + } +} diff --git a/events.go b/events.go new file mode 100644 index 0000000..81826e1 --- /dev/null +++ b/events.go @@ -0,0 +1,60 @@ +package element + +import ( + "github.com/hashicorp/memberlist" +) + +// NodeEventType is the type of node event +type NodeEventType string + +const ( + // NodeJoin is the event fired upon a node joining the cluster + NodeJoin NodeEventType = "join" + // NodeLeave is the event fired upon a node leaving the cluster + NodeLeave NodeEventType = "leave" + // NodeUpdate is the event fired upon a node updating in the cluster + NodeUpdate NodeEventType = "update" +) + +// NodeEvent stores the event type and node information +type NodeEvent struct { + // EventType is the type of event fired + EventType NodeEventType + // Node is the internal cluster node + Node *memberlist.Node +} + +// EventHandler is used for event handling +type EventHandler struct { + ch chan *NodeEvent +} + +// NewEventHandler returns an EventHandler that is used to perform actions for the specified event +func NewEventHandler(ch chan *NodeEvent) *EventHandler { + return &EventHandler{ + ch: ch, + } +} + +// NotifyJoin notifies when a node joins the cluster +func (h *EventHandler) NotifyJoin(n *memberlist.Node) { + go h.notify(NodeJoin, n) +} + +// NotifyLeave notifies when a node leaves the cluster +func (h *EventHandler) NotifyLeave(n *memberlist.Node) { + go h.notify(NodeLeave, n) +} + +// NotifyUpdate notifies when a node is updated in the cluster +func (h *EventHandler) NotifyUpdate(n *memberlist.Node) { + go h.notify(NodeUpdate, n) +} + +func (h *EventHandler) notify(t NodeEventType, n *memberlist.Node) { + // TODO: use context WithTimeout to enable cancel + h.ch <- &NodeEvent{ + EventType: t, + Node: n, + } +} diff --git a/peers.go b/peers.go new file mode 100644 index 0000000..477da9e --- /dev/null +++ b/peers.go @@ -0,0 +1,41 @@ +package element + +import ( + "encoding/json" + "fmt" + "time" +) + +// PeerAgent is the peer information for an agent in the cluster including name and GRPC address +type PeerAgent struct { + Name string + Addr string + Updated time.Time +} + +// Peers returns all known peers in the cluster +func (a *Agent) Peers() ([]*PeerAgent, error) { + self := a.members.LocalNode() + var ( + peerAgents map[string]*PeerAgent + peers []*PeerAgent + ) + if err := json.Unmarshal(self.Meta, &peerAgents); err != nil { + return nil, err + } + + for _, p := range peerAgents { + peers = append(peers, p) + } + + return peers, nil +} + +// LocalNode returns local node peer info +func (a *Agent) LocalNode() (*PeerAgent, error) { + return &PeerAgent{ + Name: a.config.NodeName, + Addr: fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort), + Updated: time.Now(), + }, nil +} diff --git a/register.go b/register.go new file mode 100644 index 0000000..5e22a75 --- /dev/null +++ b/register.go @@ -0,0 +1,13 @@ +package element + +import "fmt" + +// Register registers a GRPC service with the agent +func (a *Agent) Register(svc Service) error { + id := svc.ID() + if _, exists := a.registeredServices[id]; exists { + return fmt.Errorf("service %s already registered", id) + } + svc.Register(a.grpcServer) + return nil +} diff --git a/service.go b/service.go new file mode 100644 index 0000000..64c2e18 --- /dev/null +++ b/service.go @@ -0,0 +1,8 @@ +package element + +import "google.golang.org/grpc" + +type Service interface { + ID() string + Register(srv *grpc.Server) error +} diff --git a/shutdown.go b/shutdown.go new file mode 100644 index 0000000..2a2ae67 --- /dev/null +++ b/shutdown.go @@ -0,0 +1,14 @@ +package element + +// Shutdown causes the local node to leave the cluster and perform a clean shutdown +func (a *Agent) Shutdown() error { + if err := a.members.Leave(nodeUpdateTimeout); err != nil { + return err + } + + if err := a.members.Shutdown(); err != nil { + return err + } + + return nil +} diff --git a/start.go b/start.go new file mode 100644 index 0000000..717e5b0 --- /dev/null +++ b/start.go @@ -0,0 +1,62 @@ +package element + +import ( + "fmt" + "net" + "os" + "syscall" + + "github.com/sirupsen/logrus" +) + +// Start activates the GRPC listener as well as joins the cluster if specified and blocks until a SIGTERM or SIGINT is received +func (a *Agent) Start(signals chan os.Signal) error { + logrus.Infof("starting agent: grpc=%s:%d bind=%s:%d advertise=%s:%d", + a.config.AgentAddr, + a.config.AgentPort, + a.config.BindAddr, + a.config.BindPort, + a.config.AdvertiseAddr, + a.config.AdvertisePort, + ) + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort)) + if err != nil { + return err + } + go a.grpcServer.Serve(l) + + // start node metadata updater + go func() { + for { + <-a.peerUpdateChan + if err := a.members.UpdateNode(nodeUpdateTimeout); err != nil { + logrus.Errorf("error updating node metadata: %s", err) + } + } + }() + + if len(a.config.Peers) > 0 { + logrus.Debugf("joining peers: %v", a.config.Peers) + n, err := a.members.Join(a.config.Peers) + if err != nil { + return err + } + + logrus.Infof("joined %d peer(s)", n) + } + + for { + select { + case s := <-signals: + switch s { + case syscall.SIGTERM, syscall.SIGINT: + logrus.Debug("shutting down") + if err := a.Shutdown(); err != nil { + return err + } + + return nil + } + } + } +}