initial commit

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
This commit is contained in:
Evan Hazlett 2018-06-04 12:40:04 -04:00
commit 752f9b0d09
No known key found for this signature in database
GPG key ID: A519480096146526
10 changed files with 435 additions and 0 deletions

6
README.md Normal file
View file

@ -0,0 +1,6 @@
# Element
Element handles simplified clustering among nodes. It handles
peer management (joins, expiry, etc) as well as exposing GRPC services and publishing
the GRPC endpoint address for each node throughout the group. This enables services
to be built using element and allow simple publishing of the GRPC endpoints
to other nodes for accessing the services.

52
agent.go Normal file
View file

@ -0,0 +1,52 @@
package element
import (
"errors"
"time"
"github.com/hashicorp/memberlist"
"google.golang.org/grpc"
)
const (
defaultInterval = time.Second * 10
nodeReconcileTimeout = defaultInterval * 3
nodeUpdateTimeout = defaultInterval / 2
)
var (
ErrUnknownConnectionType = errors.New("unknown connection type")
)
// Agent represents the node agent
type Agent struct {
config *Config
members *memberlist.Memberlist
peerUpdateChan chan bool
nodeEventChan chan *NodeEvent
grpcServer *grpc.Server
registeredServices map[string]struct{}
}
// NewAgent returns a new node agent
func NewAgent(cfg *Config) (*Agent, error) {
updateCh := make(chan bool)
nodeEventCh := make(chan *NodeEvent)
mc, err := setupMemberlistConfig(cfg, updateCh, nodeEventCh)
if err != nil {
return nil, err
}
ml, err := memberlist.Create(mc)
if err != nil {
return nil, err
}
grpcServer := grpc.NewServer()
return &Agent{
config: cfg,
members: ml,
peerUpdateChan: updateCh,
nodeEventChan: nodeEventCh,
grpcServer: grpcServer,
}, nil
}

81
config.go Normal file
View file

@ -0,0 +1,81 @@
package element
import (
"fmt"
"io/ioutil"
"log"
"github.com/hashicorp/memberlist"
)
// ConnectionType defines the type of connection for the agent to use (wan, lan, local)
type ConnectionType string
const (
// LAN is a local area network connection intended for high speed, low latency networks
LAN ConnectionType = "lan"
// WAN is a wide area connection intended for long distance remote links
WAN ConnectionType = "wan"
// Local is a local connection intended for high speed local development links
Local ConnectionType = "local"
)
// Config is the agent config
type Config struct {
// NodeName is the name of the node. Each node must have a unique name in the cluster.
NodeName string
// AgentAddr is the address on which the agent will serve the GRPC services
AgentAddr string
// AgentPort is the port on which the agent will serve the GRPC services
AgentPort int
// ConnectionType is the connection type the agent will use
ConnectionType string
// BindAddr is the cluster bind address
BindAddr string
// BindPort is the cluster bind port
BindPort int
// AdvertiseAddr is the cluster address that will be used for membership communication
AdvertiseAddr string
// AdvertisePort is the cluster port that will be used for membership communication
AdvertisePort int
// Peers is a local cache of peer members
Peers []string
}
func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) {
var mc *memberlist.Config
switch cfg.ConnectionType {
case string(Local):
mc = memberlist.DefaultLocalConfig()
case string(WAN):
mc = memberlist.DefaultWANConfig()
case string(LAN):
mc = memberlist.DefaultLANConfig()
default:
return nil, ErrUnknownConnectionType
}
mc.Name = cfg.NodeName
mc.Delegate = NewAgentDelegate(cfg.NodeName, fmt.Sprintf("%s:%d", cfg.AgentAddr, cfg.AgentPort), peerUpdateChan, nodeEventChan)
mc.Events = NewEventHandler(nodeEventChan)
// disable logging for memberlist
// TODO: enable if debug
mc.Logger = log.New(ioutil.Discard, "", 0)
// ml overrides for connection
if v := cfg.BindAddr; v != "" {
mc.BindAddr = v
}
if v := cfg.BindPort; v > 0 {
mc.BindPort = v
}
if v := cfg.AdvertiseAddr; v != "" {
mc.AdvertiseAddr = v
}
if v := cfg.AdvertisePort; v > 0 {
mc.AdvertisePort = v
}
return mc, nil
}

98
delegate.go Normal file
View file

@ -0,0 +1,98 @@
package element
import (
"encoding/json"
"time"
"github.com/sirupsen/logrus"
)
type agentDelegate struct {
Name string
Addr string
Updated time.Time
Peers map[string]*PeerAgent
updateChan chan bool
nodeEventChan chan *NodeEvent
}
// NewAgentDelegate is the agent delegate used to handle cluster events
func NewAgentDelegate(name, addr string, updateCh chan bool, nodeEventCh chan *NodeEvent) *agentDelegate {
agent := &agentDelegate{
Name: name,
Addr: addr,
Peers: make(map[string]*PeerAgent),
updateChan: updateCh,
nodeEventChan: nodeEventCh,
}
// event handler
go func() {
for {
select {
case evt := <-nodeEventCh:
switch evt.EventType {
case NodeJoin:
case NodeUpdate:
case NodeLeave:
agent.removeNode(evt.Node.Name)
}
}
}
}()
return agent
}
// NodeMeta returns local node meta information
func (d *agentDelegate) NodeMeta(limit int) []byte {
data, err := json.Marshal(d.Peers)
if err != nil {
logrus.Errorf("error serializing node meta: %s", err)
}
return data
}
// NotifyMsg is used for handling cluster messages
func (d *agentDelegate) NotifyMsg(buf []byte) {
// this can be used to receive messages sent (i.e. SendReliable)
}
// GetBroadcasts is called when user messages can be broadcast
func (d *agentDelegate) GetBroadcasts(overhead, limit int) [][]byte {
return nil
}
// LocalState is the local cluster agent state
func (d *agentDelegate) LocalState(join bool) []byte {
data, err := json.Marshal(d)
if err != nil {
logrus.Errorf("error serializing local state: %s", err)
}
return []byte(data)
}
// MergeRemoteState is used to store remote peer information
func (d *agentDelegate) MergeRemoteState(buf []byte, join bool) {
var remoteAgent *agentDelegate
if err := json.Unmarshal(buf, &remoteAgent); err != nil {
logrus.Errorf("error parsing remote agent state: %s", err)
return
}
d.Updated = time.Now()
d.Peers[remoteAgent.Name] = &PeerAgent{
Name: remoteAgent.Name,
Addr: remoteAgent.Addr,
Updated: time.Now(),
}
// notify update
d.updateChan <- true
}
func (d *agentDelegate) removeNode(name string) {
if _, exists := d.Peers[name]; exists {
delete(d.Peers, name)
// notify update
d.updateChan <- true
}
}

60
events.go Normal file
View file

@ -0,0 +1,60 @@
package element
import (
"github.com/hashicorp/memberlist"
)
// NodeEventType is the type of node event
type NodeEventType string
const (
// NodeJoin is the event fired upon a node joining the cluster
NodeJoin NodeEventType = "join"
// NodeLeave is the event fired upon a node leaving the cluster
NodeLeave NodeEventType = "leave"
// NodeUpdate is the event fired upon a node updating in the cluster
NodeUpdate NodeEventType = "update"
)
// NodeEvent stores the event type and node information
type NodeEvent struct {
// EventType is the type of event fired
EventType NodeEventType
// Node is the internal cluster node
Node *memberlist.Node
}
// EventHandler is used for event handling
type EventHandler struct {
ch chan *NodeEvent
}
// NewEventHandler returns an EventHandler that is used to perform actions for the specified event
func NewEventHandler(ch chan *NodeEvent) *EventHandler {
return &EventHandler{
ch: ch,
}
}
// NotifyJoin notifies when a node joins the cluster
func (h *EventHandler) NotifyJoin(n *memberlist.Node) {
go h.notify(NodeJoin, n)
}
// NotifyLeave notifies when a node leaves the cluster
func (h *EventHandler) NotifyLeave(n *memberlist.Node) {
go h.notify(NodeLeave, n)
}
// NotifyUpdate notifies when a node is updated in the cluster
func (h *EventHandler) NotifyUpdate(n *memberlist.Node) {
go h.notify(NodeUpdate, n)
}
func (h *EventHandler) notify(t NodeEventType, n *memberlist.Node) {
// TODO: use context WithTimeout to enable cancel
h.ch <- &NodeEvent{
EventType: t,
Node: n,
}
}

41
peers.go Normal file
View file

@ -0,0 +1,41 @@
package element
import (
"encoding/json"
"fmt"
"time"
)
// PeerAgent is the peer information for an agent in the cluster including name and GRPC address
type PeerAgent struct {
Name string
Addr string
Updated time.Time
}
// Peers returns all known peers in the cluster
func (a *Agent) Peers() ([]*PeerAgent, error) {
self := a.members.LocalNode()
var (
peerAgents map[string]*PeerAgent
peers []*PeerAgent
)
if err := json.Unmarshal(self.Meta, &peerAgents); err != nil {
return nil, err
}
for _, p := range peerAgents {
peers = append(peers, p)
}
return peers, nil
}
// LocalNode returns local node peer info
func (a *Agent) LocalNode() (*PeerAgent, error) {
return &PeerAgent{
Name: a.config.NodeName,
Addr: fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort),
Updated: time.Now(),
}, nil
}

13
register.go Normal file
View file

@ -0,0 +1,13 @@
package element
import "fmt"
// Register registers a GRPC service with the agent
func (a *Agent) Register(svc Service) error {
id := svc.ID()
if _, exists := a.registeredServices[id]; exists {
return fmt.Errorf("service %s already registered", id)
}
svc.Register(a.grpcServer)
return nil
}

8
service.go Normal file
View file

@ -0,0 +1,8 @@
package element
import "google.golang.org/grpc"
type Service interface {
ID() string
Register(srv *grpc.Server) error
}

14
shutdown.go Normal file
View file

@ -0,0 +1,14 @@
package element
// Shutdown causes the local node to leave the cluster and perform a clean shutdown
func (a *Agent) Shutdown() error {
if err := a.members.Leave(nodeUpdateTimeout); err != nil {
return err
}
if err := a.members.Shutdown(); err != nil {
return err
}
return nil
}

62
start.go Normal file
View file

@ -0,0 +1,62 @@
package element
import (
"fmt"
"net"
"os"
"syscall"
"github.com/sirupsen/logrus"
)
// Start activates the GRPC listener as well as joins the cluster if specified and blocks until a SIGTERM or SIGINT is received
func (a *Agent) Start(signals chan os.Signal) error {
logrus.Infof("starting agent: grpc=%s:%d bind=%s:%d advertise=%s:%d",
a.config.AgentAddr,
a.config.AgentPort,
a.config.BindAddr,
a.config.BindPort,
a.config.AdvertiseAddr,
a.config.AdvertisePort,
)
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort))
if err != nil {
return err
}
go a.grpcServer.Serve(l)
// start node metadata updater
go func() {
for {
<-a.peerUpdateChan
if err := a.members.UpdateNode(nodeUpdateTimeout); err != nil {
logrus.Errorf("error updating node metadata: %s", err)
}
}
}()
if len(a.config.Peers) > 0 {
logrus.Debugf("joining peers: %v", a.config.Peers)
n, err := a.members.Join(a.config.Peers)
if err != nil {
return err
}
logrus.Infof("joined %d peer(s)", n)
}
for {
select {
case s := <-signals:
switch s {
case syscall.SIGTERM, syscall.SIGINT:
logrus.Debug("shutting down")
if err := a.Shutdown(); err != nil {
return err
}
return nil
}
}
}
}