diff --git a/agent.go b/agent.go index 1a5dfc6..90035de 100644 --- a/agent.go +++ b/agent.go @@ -5,7 +5,6 @@ import ( "time" "github.com/hashicorp/memberlist" - "google.golang.org/grpc" ) const ( @@ -24,7 +23,6 @@ type Agent struct { members *memberlist.Memberlist peerUpdateChan chan bool nodeEventChan chan *NodeEvent - grpcServer *grpc.Server registeredServices map[string]struct{} memberConfig *memberlist.Config } @@ -33,7 +31,7 @@ type Agent struct { func NewAgent(cfg *Config) (*Agent, error) { updateCh := make(chan bool) nodeEventCh := make(chan *NodeEvent) - mc, err := setupMemberlistConfig(cfg, updateCh, nodeEventCh) + mc, err := cfg.memberListConfig(updateCh, nodeEventCh) if err != nil { return nil, err } @@ -42,13 +40,11 @@ func NewAgent(cfg *Config) (*Agent, error) { if err != nil { return nil, err } - grpcServer := grpc.NewServer() return &Agent{ config: cfg, members: ml, peerUpdateChan: updateCh, nodeEventChan: nodeEventCh, - grpcServer: grpcServer, memberConfig: mc, }, nil } diff --git a/config.go b/config.go index 318fe91..a7fd817 100644 --- a/config.go +++ b/config.go @@ -1,9 +1,10 @@ package element import ( - "fmt" "io/ioutil" "log" + "net" + "strconv" "github.com/hashicorp/memberlist" ) @@ -24,20 +25,14 @@ const ( type Config struct { // NodeName is the name of the node. Each node must have a unique name in the cluster. NodeName string - // AgentAddr is the address on which the agent will serve the GRPC services - AgentAddr string - // AgentPort is the port on which the agent will serve the GRPC services - AgentPort int + // Address on which the agent will serve the GRPC services + Address string // ConnectionType is the connection type the agent will use ConnectionType string - // BindAddr is the cluster bind address - BindAddr string - // BindPort is the cluster bind port - BindPort int - // AdvertiseAddr is the cluster address that will be used for membership communication - AdvertiseAddr string - // AdvertisePort is the cluster port that will be used for membership communication - AdvertisePort int + // ClusterAddress bind address + ClusterAddress string + // AdvertiseAddress for nat traversal + AdvertiseAddress string // Peers is a local cache of peer members Peers []string } @@ -46,7 +41,7 @@ func (a *Agent) Config() *Config { return a.config } -func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) { +func (cfg *Config) memberListConfig(peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) { var mc *memberlist.Config switch cfg.ConnectionType { case string(Local): @@ -60,26 +55,32 @@ func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan } mc.Name = cfg.NodeName - mc.Delegate = NewAgentDelegate(cfg.NodeName, fmt.Sprintf("%s:%d", cfg.AgentAddr, cfg.AgentPort), peerUpdateChan, nodeEventChan) + mc.Delegate = NewAgentDelegate(cfg.NodeName, cfg.Address, peerUpdateChan, nodeEventChan) mc.Events = NewEventHandler(nodeEventChan) // disable logging for memberlist // TODO: enable if debug mc.Logger = log.New(ioutil.Discard, "", 0) + host, port, err := net.SplitHostPort(cfg.ClusterAddress) + if err != nil { + return nil, err + } // ml overrides for connection - if v := cfg.BindAddr; v != "" { - mc.BindAddr = v + if v := host; v != "" { + mc.BindAddr = host } - if v := cfg.BindPort; v > 0 { - mc.BindPort = v + if v := port; v != "" { + mc.BindPort, _ = strconv.Atoi(port) } - if v := cfg.AdvertiseAddr; v != "" { - mc.AdvertiseAddr = v + if host, port, err = net.SplitHostPort(cfg.ClusterAddress); err != nil { + return nil, err } - if v := cfg.AdvertisePort; v > 0 { - mc.AdvertisePort = v + if v := host; v != "" { + mc.AdvertiseAddr = host + } + if v := port; v != "" { + mc.AdvertisePort, _ = strconv.Atoi(port) } - return mc, nil } diff --git a/peers.go b/peers.go index 477da9e..d41647a 100644 --- a/peers.go +++ b/peers.go @@ -2,7 +2,6 @@ package element import ( "encoding/json" - "fmt" "time" ) @@ -35,7 +34,7 @@ func (a *Agent) Peers() ([]*PeerAgent, error) { func (a *Agent) LocalNode() (*PeerAgent, error) { return &PeerAgent{ Name: a.config.NodeName, - Addr: fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort), + Addr: a.config.Address, Updated: time.Now(), }, nil } diff --git a/register.go b/register.go deleted file mode 100644 index 5e22a75..0000000 --- a/register.go +++ /dev/null @@ -1,13 +0,0 @@ -package element - -import "fmt" - -// Register registers a GRPC service with the agent -func (a *Agent) Register(svc Service) error { - id := svc.ID() - if _, exists := a.registeredServices[id]; exists { - return fmt.Errorf("service %s already registered", id) - } - svc.Register(a.grpcServer) - return nil -} diff --git a/service.go b/service.go deleted file mode 100644 index 61433e1..0000000 --- a/service.go +++ /dev/null @@ -1,10 +0,0 @@ -package element - -import "google.golang.org/grpc" - -type Service interface { - // ID is the name of the service - ID() string - // Register is used to register the GRPC service - Register(srv *grpc.Server) error -} diff --git a/shutdown.go b/shutdown.go index 2a2ae67..b19ffdb 100644 --- a/shutdown.go +++ b/shutdown.go @@ -5,10 +5,5 @@ func (a *Agent) Shutdown() error { if err := a.members.Leave(nodeUpdateTimeout); err != nil { return err } - - if err := a.members.Shutdown(); err != nil { - return err - } - - return nil + return a.members.Shutdown() } diff --git a/start.go b/start.go index 2f33bcb..2ecde43 100644 --- a/start.go +++ b/start.go @@ -1,44 +1,18 @@ package element import ( - "fmt" - "net" + "os" "github.com/sirupsen/logrus" ) -// Start activates the GRPC listener as well as joins the cluster if specified and blocks until a SIGTERM or SIGINT is received -func (a *Agent) Start() error { - logrus.WithFields(logrus.Fields{ - "grpc": fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort), - "bind": fmt.Sprintf("%s:%d", a.config.BindAddr, a.config.BindPort), - "advertise": fmt.Sprintf("%s:%d", a.config.AdvertiseAddr, a.config.AdvertisePort), - }).Info("starting agent") - l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort)) - if err != nil { - return err - } - go a.grpcServer.Serve(l) - - // start node metadata updater +// Start handles cluster events +func (a *Agent) Start(s chan os.Signal) { go func() { - for { - <-a.peerUpdateChan + for range a.peerUpdateChan { if err := a.members.UpdateNode(nodeUpdateTimeout); err != nil { logrus.Errorf("error updating node metadata: %s", err) } } }() - - if len(a.config.Peers) > 0 { - logrus.Debugf("joining peers: %v", a.config.Peers) - n, err := a.members.Join(a.config.Peers) - if err != nil { - return err - } - - logrus.Infof("joined %d peer(s)", n) - } - - return nil }