Remove GRPC from element

I came in like a wrecking ball....

I think the grpc server and rpc configuration for the server should be
handled outside of this package.  There are many ways to configure it
and we need more flexability on start and shutdown for grpc services.
Signal handling should be in the caller.
This commit is contained in:
Michael Crosby 2018-09-14 08:25:35 -04:00
parent 52da33976c
commit 382f52335c
7 changed files with 32 additions and 90 deletions

View file

@ -5,7 +5,6 @@ import (
"time" "time"
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
"google.golang.org/grpc"
) )
const ( const (
@ -24,7 +23,6 @@ type Agent struct {
members *memberlist.Memberlist members *memberlist.Memberlist
peerUpdateChan chan bool peerUpdateChan chan bool
nodeEventChan chan *NodeEvent nodeEventChan chan *NodeEvent
grpcServer *grpc.Server
registeredServices map[string]struct{} registeredServices map[string]struct{}
memberConfig *memberlist.Config memberConfig *memberlist.Config
} }
@ -33,7 +31,7 @@ type Agent struct {
func NewAgent(cfg *Config) (*Agent, error) { func NewAgent(cfg *Config) (*Agent, error) {
updateCh := make(chan bool) updateCh := make(chan bool)
nodeEventCh := make(chan *NodeEvent) nodeEventCh := make(chan *NodeEvent)
mc, err := setupMemberlistConfig(cfg, updateCh, nodeEventCh) mc, err := cfg.memberListConfig(updateCh, nodeEventCh)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -42,13 +40,11 @@ func NewAgent(cfg *Config) (*Agent, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
grpcServer := grpc.NewServer()
return &Agent{ return &Agent{
config: cfg, config: cfg,
members: ml, members: ml,
peerUpdateChan: updateCh, peerUpdateChan: updateCh,
nodeEventChan: nodeEventCh, nodeEventChan: nodeEventCh,
grpcServer: grpcServer,
memberConfig: mc, memberConfig: mc,
}, nil }, nil
} }

View file

@ -1,9 +1,10 @@
package element package element
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"net"
"strconv"
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
) )
@ -24,20 +25,14 @@ const (
type Config struct { type Config struct {
// NodeName is the name of the node. Each node must have a unique name in the cluster. // NodeName is the name of the node. Each node must have a unique name in the cluster.
NodeName string NodeName string
// AgentAddr is the address on which the agent will serve the GRPC services // Address on which the agent will serve the GRPC services
AgentAddr string Address string
// AgentPort is the port on which the agent will serve the GRPC services
AgentPort int
// ConnectionType is the connection type the agent will use // ConnectionType is the connection type the agent will use
ConnectionType string ConnectionType string
// BindAddr is the cluster bind address // ClusterAddress bind address
BindAddr string ClusterAddress string
// BindPort is the cluster bind port // AdvertiseAddress for nat traversal
BindPort int AdvertiseAddress string
// AdvertiseAddr is the cluster address that will be used for membership communication
AdvertiseAddr string
// AdvertisePort is the cluster port that will be used for membership communication
AdvertisePort int
// Peers is a local cache of peer members // Peers is a local cache of peer members
Peers []string Peers []string
} }
@ -46,7 +41,7 @@ func (a *Agent) Config() *Config {
return a.config return a.config
} }
func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) { func (cfg *Config) memberListConfig(peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) {
var mc *memberlist.Config var mc *memberlist.Config
switch cfg.ConnectionType { switch cfg.ConnectionType {
case string(Local): case string(Local):
@ -60,26 +55,32 @@ func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan
} }
mc.Name = cfg.NodeName mc.Name = cfg.NodeName
mc.Delegate = NewAgentDelegate(cfg.NodeName, fmt.Sprintf("%s:%d", cfg.AgentAddr, cfg.AgentPort), peerUpdateChan, nodeEventChan) mc.Delegate = NewAgentDelegate(cfg.NodeName, cfg.Address, peerUpdateChan, nodeEventChan)
mc.Events = NewEventHandler(nodeEventChan) mc.Events = NewEventHandler(nodeEventChan)
// disable logging for memberlist // disable logging for memberlist
// TODO: enable if debug // TODO: enable if debug
mc.Logger = log.New(ioutil.Discard, "", 0) mc.Logger = log.New(ioutil.Discard, "", 0)
host, port, err := net.SplitHostPort(cfg.ClusterAddress)
if err != nil {
return nil, err
}
// ml overrides for connection // ml overrides for connection
if v := cfg.BindAddr; v != "" { if v := host; v != "" {
mc.BindAddr = v mc.BindAddr = host
} }
if v := cfg.BindPort; v > 0 { if v := port; v != "" {
mc.BindPort = v mc.BindPort, _ = strconv.Atoi(port)
} }
if v := cfg.AdvertiseAddr; v != "" { if host, port, err = net.SplitHostPort(cfg.ClusterAddress); err != nil {
mc.AdvertiseAddr = v return nil, err
} }
if v := cfg.AdvertisePort; v > 0 { if v := host; v != "" {
mc.AdvertisePort = v mc.AdvertiseAddr = host
}
if v := port; v != "" {
mc.AdvertisePort, _ = strconv.Atoi(port)
} }
return mc, nil return mc, nil
} }

View file

@ -2,7 +2,6 @@ package element
import ( import (
"encoding/json" "encoding/json"
"fmt"
"time" "time"
) )
@ -35,7 +34,7 @@ func (a *Agent) Peers() ([]*PeerAgent, error) {
func (a *Agent) LocalNode() (*PeerAgent, error) { func (a *Agent) LocalNode() (*PeerAgent, error) {
return &PeerAgent{ return &PeerAgent{
Name: a.config.NodeName, Name: a.config.NodeName,
Addr: fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort), Addr: a.config.Address,
Updated: time.Now(), Updated: time.Now(),
}, nil }, nil
} }

View file

@ -1,13 +0,0 @@
package element
import "fmt"
// Register registers a GRPC service with the agent
func (a *Agent) Register(svc Service) error {
id := svc.ID()
if _, exists := a.registeredServices[id]; exists {
return fmt.Errorf("service %s already registered", id)
}
svc.Register(a.grpcServer)
return nil
}

View file

@ -1,10 +0,0 @@
package element
import "google.golang.org/grpc"
type Service interface {
// ID is the name of the service
ID() string
// Register is used to register the GRPC service
Register(srv *grpc.Server) error
}

View file

@ -5,10 +5,5 @@ func (a *Agent) Shutdown() error {
if err := a.members.Leave(nodeUpdateTimeout); err != nil { if err := a.members.Leave(nodeUpdateTimeout); err != nil {
return err return err
} }
return a.members.Shutdown()
if err := a.members.Shutdown(); err != nil {
return err
}
return nil
} }

View file

@ -1,44 +1,18 @@
package element package element
import ( import (
"fmt" "os"
"net"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// Start activates the GRPC listener as well as joins the cluster if specified and blocks until a SIGTERM or SIGINT is received // Start handles cluster events
func (a *Agent) Start() error { func (a *Agent) Start(s chan os.Signal) {
logrus.WithFields(logrus.Fields{
"grpc": fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort),
"bind": fmt.Sprintf("%s:%d", a.config.BindAddr, a.config.BindPort),
"advertise": fmt.Sprintf("%s:%d", a.config.AdvertiseAddr, a.config.AdvertisePort),
}).Info("starting agent")
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort))
if err != nil {
return err
}
go a.grpcServer.Serve(l)
// start node metadata updater
go func() { go func() {
for { for range a.peerUpdateChan {
<-a.peerUpdateChan
if err := a.members.UpdateNode(nodeUpdateTimeout); err != nil { if err := a.members.UpdateNode(nodeUpdateTimeout); err != nil {
logrus.Errorf("error updating node metadata: %s", err) logrus.Errorf("error updating node metadata: %s", err)
} }
} }
}() }()
if len(a.config.Peers) > 0 {
logrus.Debugf("joining peers: %v", a.config.Peers)
n, err := a.members.Join(a.config.Peers)
if err != nil {
return err
}
logrus.Infof("joined %d peer(s)", n)
}
return nil
} }