Merge pull request #1 from crosbymichael/miley

Remove GRPC from element
This commit is contained in:
Evan Hazlett 2018-09-14 08:49:52 -04:00 committed by GitHub
commit a1a8f3fa5a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 32 additions and 90 deletions

View file

@ -5,7 +5,6 @@ import (
"time"
"github.com/hashicorp/memberlist"
"google.golang.org/grpc"
)
const (
@ -24,7 +23,6 @@ type Agent struct {
members *memberlist.Memberlist
peerUpdateChan chan bool
nodeEventChan chan *NodeEvent
grpcServer *grpc.Server
registeredServices map[string]struct{}
memberConfig *memberlist.Config
}
@ -33,7 +31,7 @@ type Agent struct {
func NewAgent(cfg *Config) (*Agent, error) {
updateCh := make(chan bool)
nodeEventCh := make(chan *NodeEvent)
mc, err := setupMemberlistConfig(cfg, updateCh, nodeEventCh)
mc, err := cfg.memberListConfig(updateCh, nodeEventCh)
if err != nil {
return nil, err
}
@ -42,13 +40,11 @@ func NewAgent(cfg *Config) (*Agent, error) {
if err != nil {
return nil, err
}
grpcServer := grpc.NewServer()
return &Agent{
config: cfg,
members: ml,
peerUpdateChan: updateCh,
nodeEventChan: nodeEventCh,
grpcServer: grpcServer,
memberConfig: mc,
}, nil
}

View file

@ -1,9 +1,10 @@
package element
import (
"fmt"
"io/ioutil"
"log"
"net"
"strconv"
"github.com/hashicorp/memberlist"
)
@ -24,20 +25,14 @@ const (
type Config struct {
// NodeName is the name of the node. Each node must have a unique name in the cluster.
NodeName string
// AgentAddr is the address on which the agent will serve the GRPC services
AgentAddr string
// AgentPort is the port on which the agent will serve the GRPC services
AgentPort int
// Address on which the agent will serve the GRPC services
Address string
// ConnectionType is the connection type the agent will use
ConnectionType string
// BindAddr is the cluster bind address
BindAddr string
// BindPort is the cluster bind port
BindPort int
// AdvertiseAddr is the cluster address that will be used for membership communication
AdvertiseAddr string
// AdvertisePort is the cluster port that will be used for membership communication
AdvertisePort int
// ClusterAddress bind address
ClusterAddress string
// AdvertiseAddress for nat traversal
AdvertiseAddress string
// Peers is a local cache of peer members
Peers []string
}
@ -46,7 +41,7 @@ func (a *Agent) Config() *Config {
return a.config
}
func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) {
func (cfg *Config) memberListConfig(peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) {
var mc *memberlist.Config
switch cfg.ConnectionType {
case string(Local):
@ -60,26 +55,32 @@ func setupMemberlistConfig(cfg *Config, peerUpdateChan chan bool, nodeEventChan
}
mc.Name = cfg.NodeName
mc.Delegate = NewAgentDelegate(cfg.NodeName, fmt.Sprintf("%s:%d", cfg.AgentAddr, cfg.AgentPort), peerUpdateChan, nodeEventChan)
mc.Delegate = NewAgentDelegate(cfg.NodeName, cfg.Address, peerUpdateChan, nodeEventChan)
mc.Events = NewEventHandler(nodeEventChan)
// disable logging for memberlist
// TODO: enable if debug
mc.Logger = log.New(ioutil.Discard, "", 0)
host, port, err := net.SplitHostPort(cfg.ClusterAddress)
if err != nil {
return nil, err
}
// ml overrides for connection
if v := cfg.BindAddr; v != "" {
mc.BindAddr = v
if v := host; v != "" {
mc.BindAddr = host
}
if v := cfg.BindPort; v > 0 {
mc.BindPort = v
if v := port; v != "" {
mc.BindPort, _ = strconv.Atoi(port)
}
if v := cfg.AdvertiseAddr; v != "" {
mc.AdvertiseAddr = v
if host, port, err = net.SplitHostPort(cfg.ClusterAddress); err != nil {
return nil, err
}
if v := cfg.AdvertisePort; v > 0 {
mc.AdvertisePort = v
if v := host; v != "" {
mc.AdvertiseAddr = host
}
if v := port; v != "" {
mc.AdvertisePort, _ = strconv.Atoi(port)
}
return mc, nil
}

View file

@ -2,7 +2,6 @@ package element
import (
"encoding/json"
"fmt"
"time"
)
@ -35,7 +34,7 @@ func (a *Agent) Peers() ([]*PeerAgent, error) {
func (a *Agent) LocalNode() (*PeerAgent, error) {
return &PeerAgent{
Name: a.config.NodeName,
Addr: fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort),
Addr: a.config.Address,
Updated: time.Now(),
}, nil
}

View file

@ -1,13 +0,0 @@
package element
import "fmt"
// Register registers a GRPC service with the agent
func (a *Agent) Register(svc Service) error {
id := svc.ID()
if _, exists := a.registeredServices[id]; exists {
return fmt.Errorf("service %s already registered", id)
}
svc.Register(a.grpcServer)
return nil
}

View file

@ -1,10 +0,0 @@
package element
import "google.golang.org/grpc"
type Service interface {
// ID is the name of the service
ID() string
// Register is used to register the GRPC service
Register(srv *grpc.Server) error
}

View file

@ -5,10 +5,5 @@ func (a *Agent) Shutdown() error {
if err := a.members.Leave(nodeUpdateTimeout); err != nil {
return err
}
if err := a.members.Shutdown(); err != nil {
return err
}
return nil
return a.members.Shutdown()
}

View file

@ -1,44 +1,18 @@
package element
import (
"fmt"
"net"
"os"
"github.com/sirupsen/logrus"
)
// Start activates the GRPC listener as well as joins the cluster if specified and blocks until a SIGTERM or SIGINT is received
func (a *Agent) Start() error {
logrus.WithFields(logrus.Fields{
"grpc": fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort),
"bind": fmt.Sprintf("%s:%d", a.config.BindAddr, a.config.BindPort),
"advertise": fmt.Sprintf("%s:%d", a.config.AdvertiseAddr, a.config.AdvertisePort),
}).Info("starting agent")
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", a.config.AgentAddr, a.config.AgentPort))
if err != nil {
return err
}
go a.grpcServer.Serve(l)
// start node metadata updater
// Start handles cluster events
func (a *Agent) Start(s chan os.Signal) {
go func() {
for {
<-a.peerUpdateChan
for range a.peerUpdateChan {
if err := a.members.UpdateNode(nodeUpdateTimeout); err != nil {
logrus.Errorf("error updating node metadata: %s", err)
}
}
}()
if len(a.config.Peers) > 0 {
logrus.Debugf("joining peers: %v", a.config.Peers)
n, err := a.members.Join(a.config.Peers)
if err != nil {
return err
}
logrus.Infof("joined %d peer(s)", n)
}
return nil
}