From 382f52335cac85ab687f13e0c0a6aa7d87ed76b5 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 14 Sep 2018 08:25:35 -0400 Subject: [PATCH] Remove GRPC from element I came in like a wrecking ball.... I think the grpc server and rpc configuration for the server should be handled outside of this package. There are many ways to configure it and we need more flexability on start and shutdown for grpc services. Signal handling should be in the caller. --- agent.go | 6 +----- config.go | 49 +++++++++++++++++++++++++------------------------ peers.go | 3 +-- register.go | 13 ------------- service.go | 10 ---------- shutdown.go | 7 +------ start.go | 34 ++++------------------------------ 7 files changed, 32 insertions(+), 90 deletions(-) delete mode 100644 register.go delete mode 100644 service.go diff --git a/agent.go b/agent.go index 1a5dfc6..90035de 100644 --- a/agent.go +++ b/agent.go @@ -5,7 +5,6 @@ import ( "time" "github.com/hashicorp/memberlist" - "google.golang.org/grpc" ) const ( @@ -24,7 +23,6 @@ type Agent struct { members *memberlist.Memberlist peerUpdateChan chan bool nodeEventChan chan *NodeEvent - grpcServer *grpc.Server registeredServices map[string]struct{} memberConfig *memberlist.Config } @@ -33,7 +31,7 @@ type Agent struct { func NewAgent(cfg *Config) (*Agent, error) { updateCh := make(chan bool) nodeEventCh := make(chan *NodeEvent) - mc, err := setupMemberlistConfig(cfg, updateCh, nodeEventCh) + mc, err := cfg.memberListConfig(updateCh, nodeEventCh) if err != nil { return nil, err } @@ -42,13 +40,11 @@ func NewAgent(cfg *Config) (*Agent, error) { if err != nil { return nil, err } - grpcServer := grpc.NewServer() return &Agent{ config: cfg, members: ml, peerUpdateChan: updateCh, nodeEventChan: nodeEventCh, - grpcServer: grpcServer, memberConfig: mc, }, nil } diff --git a/config.go b/config.go index 318fe91..a7fd817 100644 --- a/config.go +++ b/config.go @@ -1,9 +1,10 @@ package element import ( - "fmt" "io/ioutil" "log" + "net" + "strconv" "github.com/hashicorp/memberlist" ) @@ -24,20 +25,14 @@ const ( type Config struct { // NodeName is the name of the node. Each node must have a unique name in the cluster. NodeName string - // AgentAddr is the address on which the agent will serve the GRPC services - AgentAddr string - // AgentPort is the port on which the agent will serve the GRPC services - AgentPort int + // Address on which the agent will serve the GRPC services + Address string // ConnectionType is the connection type the agent will use ConnectionType string - // BindAddr is the cluster bind address - BindAddr string - // BindPort is the cluster bind port - BindPort int - // AdvertiseAddr is the cluster address that will be used for membership communication - AdvertiseAddr string - // AdvertisePort is the cluster port that will be used for membership communication - AdvertisePort int + // ClusterAddress bind address + ClusterAddress string + // AdvertiseAddress for nat traversal + AdvertiseAddress string // Peers is a local cache of peer members Peers []string } @@ -46,7 +41,7 @@ func (a *Agent) Config() *Config { return a.config } -func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) { +func (cfg *Config) memberListConfig(peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) { var mc *memberlist.Config switch cfg.ConnectionType { case string(Local): @@ -60,26 +55,32 @@ func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan } mc.Name = cfg.NodeName - mc.Delegate = NewAgentDelegate(cfg.NodeName, fmt.Sprintf("%s:%d", cfg.AgentAddr, cfg.AgentPort), peerUpdateChan, nodeEventChan) + mc.Delegate = NewAgentDelegate(cfg.NodeName, cfg.Address, peerUpdateChan, nodeEventChan) mc.Events = NewEventHandler(nodeEventChan) // disable logging for memberlist // TODO: enable if debug mc.Logger = log.New(ioutil.Discard, "", 0) + host, port, err := net.SplitHostPort(cfg.ClusterAddress) + if err != nil { + return nil, err + } // ml overrides for connection - if v := cfg.BindAddr; v != "" { - mc.BindAddr = v + if v := host; v != "" { + mc.BindAddr = host } - if v := cfg.BindPort; v > 0 { - mc.BindPort = v + if v := port; v != "" { + mc.BindPort, _ = strconv.Atoi(port) } - if v := cfg.AdvertiseAddr; v != "" { - mc.AdvertiseAddr = v + if host, port, err = net.SplitHostPort(cfg.ClusterAddress); err != nil { + return nil, err } - if v := cfg.AdvertisePort; v > 0 { - mc.AdvertisePort = v + if v := host; v != "" { + mc.AdvertiseAddr = host + } + if v := port; v != "" { + mc.AdvertisePort, _ = strconv.Atoi(port) } - return mc, nil } diff --git a/peers.go b/peers.go index 477da9e..d41647a 100644 --- a/peers.go +++ b/peers.go @@ -2,7 +2,6 @@ package element import ( "encoding/json" - "fmt" "time" ) @@ -35,7 +34,7 @@ func (a *Agent) Peers() ([]*PeerAgent, error) { func (a *Agent) LocalNode() (*PeerAgent, error) { return &PeerAgent{ Name: a.config.NodeName, - Addr: fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort), + Addr: a.config.Address, Updated: time.Now(), }, nil } diff --git a/register.go b/register.go deleted file mode 100644 index 5e22a75..0000000 --- a/register.go +++ /dev/null @@ -1,13 +0,0 @@ -package element - -import "fmt" - -// Register registers a GRPC service with the agent -func (a *Agent) Register(svc Service) error { - id := svc.ID() - if _, exists := a.registeredServices[id]; exists { - return fmt.Errorf("service %s already registered", id) - } - svc.Register(a.grpcServer) - return nil -} diff --git a/service.go b/service.go deleted file mode 100644 index 61433e1..0000000 --- a/service.go +++ /dev/null @@ -1,10 +0,0 @@ -package element - -import "google.golang.org/grpc" - -type Service interface { - // ID is the name of the service - ID() string - // Register is used to register the GRPC service - Register(srv *grpc.Server) error -} diff --git a/shutdown.go b/shutdown.go index 2a2ae67..b19ffdb 100644 --- a/shutdown.go +++ b/shutdown.go @@ -5,10 +5,5 @@ func (a *Agent) Shutdown() error { if err := a.members.Leave(nodeUpdateTimeout); err != nil { return err } - - if err := a.members.Shutdown(); err != nil { - return err - } - - return nil + return a.members.Shutdown() } diff --git a/start.go b/start.go index 2f33bcb..2ecde43 100644 --- a/start.go +++ b/start.go @@ -1,44 +1,18 @@ package element import ( - "fmt" - "net" + "os" "github.com/sirupsen/logrus" ) -// Start activates the GRPC listener as well as joins the cluster if specified and blocks until a SIGTERM or SIGINT is received -func (a *Agent) Start() error { - logrus.WithFields(logrus.Fields{ - "grpc": fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort), - "bind": fmt.Sprintf("%s:%d", a.config.BindAddr, a.config.BindPort), - "advertise": fmt.Sprintf("%s:%d", a.config.AdvertiseAddr, a.config.AdvertisePort), - }).Info("starting agent") - l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort)) - if err != nil { - return err - } - go a.grpcServer.Serve(l) - - // start node metadata updater +// Start handles cluster events +func (a *Agent) Start(s chan os.Signal) { go func() { - for { - <-a.peerUpdateChan + for range a.peerUpdateChan { if err := a.members.UpdateNode(nodeUpdateTimeout); err != nil { logrus.Errorf("error updating node metadata: %s", err) } } }() - - if len(a.config.Peers) > 0 { - logrus.Debugf("joining peers: %v", a.config.Peers) - n, err := a.members.Join(a.config.Peers) - if err != nil { - return err - } - - logrus.Infof("joined %d peer(s)", n) - } - - return nil }