Merge pull request #4 from crosbymichael/labels

Expand Peer information
This commit is contained in:
Evan Hazlett 2018-09-20 11:36:46 -04:00 committed by GitHub
commit 27fd24a181
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 340 additions and 147 deletions

4
Makefile Normal file
View file

@ -0,0 +1,4 @@
PACKAGES=$(shell go list ./... | grep -v /vendor/)
protos:
protobuild --quiet ${PACKAGES}

30
Protobuild.toml Normal file
View file

@ -0,0 +1,30 @@
version = "unstable"
generator = "gogo"
plugins = []
# Control protoc include paths. Below are usually some good defaults, but feel
# free to try it without them if it works for your project.
[includes]
# Include paths that will be added before all others. Typically, you want to
# treat the root of the project as an include, but this may not be necessary.
# before = ["."]
# Paths that should be treated as include roots in relation to the vendor
# directory. These will be calculated with the vendor directory nearest the
# target package.
# vendored = ["github.com/gogo/protobuf"]
packages = ["github.com/gogo/protobuf"]
# Paths that will be added untouched to the end of the includes. We use
# `/usr/local/include` to pickup the common install location of protobuf.
# This is the default.
after = ["/usr/local/include"]
# This section maps protobuf imports to Go packages. These will become
# `-M` directives in the call to the go protobuf generator.
[packages]
"gogoproto/gogo.proto" = "github.com/gogo/protobuf/gogoproto"
"google/protobuf/any.proto" = "github.com/gogo/protobuf/types"
"google/protobuf/descriptor.proto" = "github.com/gogo/protobuf/protoc-gen-gogo/descriptor"
"google/protobuf/field_mask.proto" = "github.com/gogo/protobuf/types"
"google/protobuf/timestamp.proto" = "github.com/gogo/protobuf/types"

View file

@ -2,6 +2,7 @@ package element
import (
"errors"
"sync"
"time"
"github.com/hashicorp/memberlist"
@ -19,34 +20,45 @@ var (
// Agent represents the node agent
type Agent struct {
*subscribers
config *Config
members *memberlist.Memberlist
peerUpdateChan chan bool
nodeEventChan chan *NodeEvent
registeredServices map[string]struct{}
memberConfig *memberlist.Config
state *State
}
// NewAgent returns a new node agent
func NewAgent(cfg *Config) (*Agent, error) {
updateCh := make(chan bool)
nodeEventCh := make(chan *NodeEvent)
mc, err := cfg.memberListConfig(updateCh, nodeEventCh)
func NewAgent(info *Peer, cfg *Config) (*Agent, error) {
var (
updateCh = make(chan bool, 64)
nodeEventCh = make(chan *NodeEvent, 64)
)
a := &Agent{
subscribers: newSubscribers(),
config: cfg,
peerUpdateChan: updateCh,
nodeEventChan: nodeEventCh,
state: &State{
Self: info,
Peers: make(map[string]*Peer),
},
}
mc, err := cfg.memberListConfig(a)
if err != nil {
return nil, err
}
ml, err := memberlist.Create(mc)
if err != nil {
return nil, err
}
return &Agent{
config: cfg,
members: ml,
peerUpdateChan: updateCh,
nodeEventChan: nodeEventCh,
memberConfig: mc,
}, nil
a.members = ml
a.memberConfig = mc
return a, nil
}
// SyncInterval returns the cluster sync interval
@ -54,14 +66,42 @@ func (a *Agent) SyncInterval() time.Duration {
return a.memberConfig.PushPullInterval
}
func newSubscribers() *subscribers {
return &subscribers{
subs: make(map[chan *NodeEvent]struct{}),
}
}
type subscribers struct {
mu sync.Mutex
subs map[chan *NodeEvent]struct{}
}
// Subscribe subscribes to the node event channel
func (a *Agent) Subscribe(ch chan *NodeEvent) {
go func() {
for {
func (s *subscribers) Subscribe() chan *NodeEvent {
ch := make(chan *NodeEvent, 64)
s.mu.Lock()
s.subs[ch] = struct{}{}
s.mu.Unlock()
return ch
}
// Unsubscribe removes the channel from node events
func (s *subscribers) Unsubscribe(ch chan *NodeEvent) {
s.mu.Lock()
delete(s.subs, ch)
s.mu.Unlock()
}
func (s *subscribers) send(e *NodeEvent) {
s.mu.Lock()
for ch := range s.subs {
// non-blocking send
select {
case evt := <-a.nodeEventChan:
ch <- evt
case ch <- e:
default:
}
}
}()
s.mu.Unlock()
}

View file

@ -23,10 +23,6 @@ const (
// Config is the agent config
type Config struct {
// NodeName is the name of the node. Each node must have a unique name in the cluster.
NodeName string
// Address on which the agent will serve the GRPC services
Address string
// ConnectionType is the connection type the agent will use
ConnectionType string
// ClusterAddress bind address
@ -43,7 +39,7 @@ func (a *Agent) Config() *Config {
return a.config
}
func (cfg *Config) memberListConfig(peerUpdateChan chan bool, nodeEventChan chan *NodeEvent) (*memberlist.Config, error) {
func (cfg *Config) memberListConfig(a *Agent) (*memberlist.Config, error) {
var mc *memberlist.Config
switch cfg.ConnectionType {
case string(Local):
@ -56,9 +52,9 @@ func (cfg *Config) memberListConfig(peerUpdateChan chan bool, nodeEventChan chan
return nil, ErrUnknownConnectionType
}
mc.Name = cfg.NodeName
mc.Delegate = NewAgentDelegate(cfg.NodeName, cfg.Address, peerUpdateChan, nodeEventChan)
mc.Events = NewEventHandler(nodeEventChan)
mc.Name = a.state.Self.ID
mc.Delegate = a
mc.Events = a
if !cfg.Debug {
mc.Logger = log.New(ioutil.Discard, "", 0)

View file

@ -1,52 +1,15 @@
package element
import (
"encoding/json"
"time"
"github.com/gogo/protobuf/proto"
"github.com/sirupsen/logrus"
)
type agentDelegate struct {
Name string
Addr string
Updated time.Time
Peers map[string]*PeerAgent
updateChan chan bool
nodeEventChan chan *NodeEvent
}
// NewAgentDelegate is the agent delegate used to handle cluster events
func NewAgentDelegate(name, addr string, updateCh chan bool, nodeEventCh chan *NodeEvent) *agentDelegate {
agent := &agentDelegate{
Name: name,
Addr: addr,
Peers: make(map[string]*PeerAgent),
updateChan: updateCh,
nodeEventChan: nodeEventCh,
}
// event handler
go func() {
for {
select {
case evt := <-nodeEventCh:
switch evt.EventType {
case NodeJoin:
case NodeUpdate:
case NodeLeave:
agent.removeNode(evt.Node.Name)
}
}
}
}()
return agent
}
// NodeMeta returns local node meta information
func (d *agentDelegate) NodeMeta(limit int) []byte {
data, err := json.Marshal(d.Peers)
func (a *Agent) NodeMeta(limit int) []byte {
data, err := proto.Marshal(a.state)
if err != nil {
logrus.Errorf("error serializing node meta: %s", err)
}
@ -54,45 +17,33 @@ func (d *agentDelegate) NodeMeta(limit int) []byte {
}
// NotifyMsg is used for handling cluster messages
func (d *agentDelegate) NotifyMsg(buf []byte) {
func (a *Agent) NotifyMsg(buf []byte) {
// this can be used to receive messages sent (i.e. SendReliable)
}
// GetBroadcasts is called when user messages can be broadcast
func (d *agentDelegate) GetBroadcasts(overhead, limit int) [][]byte {
func (a *Agent) GetBroadcasts(overhead, limit int) [][]byte {
return nil
}
// LocalState is the local cluster agent state
func (d *agentDelegate) LocalState(join bool) []byte {
data, err := json.Marshal(d)
func (a *Agent) LocalState(join bool) []byte {
data, err := proto.Marshal(a.state)
if err != nil {
logrus.Errorf("error serializing local state: %s", err)
}
return []byte(data)
return data
}
// MergeRemoteState is used to store remote peer information
func (d *agentDelegate) MergeRemoteState(buf []byte, join bool) {
var remoteAgent *agentDelegate
if err := json.Unmarshal(buf, &remoteAgent); err != nil {
func (a *Agent) MergeRemoteState(buf []byte, join bool) {
var state State
if err := proto.Unmarshal(buf, &state); err != nil {
logrus.Errorf("error parsing remote agent state: %s", err)
return
}
d.Updated = time.Now()
d.Peers[remoteAgent.Name] = &PeerAgent{
Name: remoteAgent.Name,
Addr: remoteAgent.Addr,
Updated: time.Now(),
}
a.state.Updated = time.Now()
a.state.Peers[state.Self.ID] = state.Self
// notify update
d.updateChan <- true
}
func (d *agentDelegate) removeNode(name string) {
if _, exists := d.Peers[name]; exists {
delete(d.Peers, name)
// notify update
d.updateChan <- true
}
a.peerUpdateChan <- true
}

179
element.pb.go Normal file
View file

@ -0,0 +1,179 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: github.com/ehazlett/element/element.proto
package element
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
// skipping weak import gogoproto "github.com/gogo/protobuf/gogoproto"
import types "github.com/gogo/protobuf/types"
import time "time"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
var _ = time.Kitchen
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Peer struct {
ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"`
Labels map[string]string `protobuf:"bytes,3,rep,name=labels" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Payload *types.Any `protobuf:"bytes,4,opt,name=payload" json:"payload,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Peer) Reset() { *m = Peer{} }
func (m *Peer) String() string { return proto.CompactTextString(m) }
func (*Peer) ProtoMessage() {}
func (*Peer) Descriptor() ([]byte, []int) {
return fileDescriptor_element_429f8a8a8de0c3bf, []int{0}
}
func (m *Peer) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Peer.Unmarshal(m, b)
}
func (m *Peer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Peer.Marshal(b, m, deterministic)
}
func (dst *Peer) XXX_Merge(src proto.Message) {
xxx_messageInfo_Peer.Merge(dst, src)
}
func (m *Peer) XXX_Size() int {
return xxx_messageInfo_Peer.Size(m)
}
func (m *Peer) XXX_DiscardUnknown() {
xxx_messageInfo_Peer.DiscardUnknown(m)
}
var xxx_messageInfo_Peer proto.InternalMessageInfo
func (m *Peer) GetID() string {
if m != nil {
return m.ID
}
return ""
}
func (m *Peer) GetAddress() string {
if m != nil {
return m.Address
}
return ""
}
func (m *Peer) GetLabels() map[string]string {
if m != nil {
return m.Labels
}
return nil
}
func (m *Peer) GetPayload() *types.Any {
if m != nil {
return m.Payload
}
return nil
}
type State struct {
Self *Peer `protobuf:"bytes,1,opt,name=self" json:"self,omitempty"`
Updated time.Time `protobuf:"bytes,2,opt,name=updated,stdtime" json:"updated"`
Peers map[string]*Peer `protobuf:"bytes,3,rep,name=peers" json:"peers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *State) Reset() { *m = State{} }
func (m *State) String() string { return proto.CompactTextString(m) }
func (*State) ProtoMessage() {}
func (*State) Descriptor() ([]byte, []int) {
return fileDescriptor_element_429f8a8a8de0c3bf, []int{1}
}
func (m *State) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_State.Unmarshal(m, b)
}
func (m *State) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_State.Marshal(b, m, deterministic)
}
func (dst *State) XXX_Merge(src proto.Message) {
xxx_messageInfo_State.Merge(dst, src)
}
func (m *State) XXX_Size() int {
return xxx_messageInfo_State.Size(m)
}
func (m *State) XXX_DiscardUnknown() {
xxx_messageInfo_State.DiscardUnknown(m)
}
var xxx_messageInfo_State proto.InternalMessageInfo
func (m *State) GetSelf() *Peer {
if m != nil {
return m.Self
}
return nil
}
func (m *State) GetUpdated() time.Time {
if m != nil {
return m.Updated
}
return time.Time{}
}
func (m *State) GetPeers() map[string]*Peer {
if m != nil {
return m.Peers
}
return nil
}
func init() {
proto.RegisterType((*Peer)(nil), "io.stellar.element.v1.Peer")
proto.RegisterMapType((map[string]string)(nil), "io.stellar.element.v1.Peer.LabelsEntry")
proto.RegisterType((*State)(nil), "io.stellar.element.v1.State")
proto.RegisterMapType((map[string]*Peer)(nil), "io.stellar.element.v1.State.PeersEntry")
}
func init() {
proto.RegisterFile("github.com/ehazlett/element/element.proto", fileDescriptor_element_429f8a8a8de0c3bf)
}
var fileDescriptor_element_429f8a8a8de0c3bf = []byte{
// 368 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x90, 0xcf, 0x4e, 0x83, 0x40,
0x10, 0xc6, 0x0b, 0xfd, 0xa7, 0xcb, 0xc5, 0x6c, 0xaa, 0x41, 0x3c, 0xd0, 0xf4, 0x62, 0xbd, 0x2c,
0x29, 0x5e, 0xd4, 0x44, 0x8d, 0x8d, 0x1e, 0x4c, 0x3c, 0x18, 0xd4, 0xc4, 0xeb, 0x22, 0x53, 0x4a,
0x5c, 0x58, 0x02, 0x4b, 0x13, 0x7c, 0x08, 0xe3, 0x63, 0xf9, 0x14, 0x35, 0x31, 0x3e, 0x88, 0x61,
0x01, 0xdb, 0x68, 0x63, 0x4f, 0xcc, 0x30, 0xdf, 0x37, 0xfb, 0x9b, 0x0f, 0x1d, 0xf8, 0x81, 0x98,
0x66, 0x2e, 0x79, 0xe2, 0xa1, 0x05, 0x53, 0xfa, 0xc2, 0x40, 0x08, 0x0b, 0x18, 0x84, 0x10, 0xfd,
0x7c, 0x49, 0x9c, 0x70, 0xc1, 0xf1, 0x76, 0xc0, 0x49, 0x2a, 0x80, 0x31, 0x9a, 0x90, 0x7a, 0x32,
0x1b, 0x19, 0x3d, 0x9f, 0xfb, 0x5c, 0x2a, 0xac, 0xa2, 0x2a, 0xc5, 0xc6, 0xae, 0xcf, 0xb9, 0xcf,
0xc0, 0x92, 0x9d, 0x9b, 0x4d, 0x2c, 0x1a, 0xe5, 0xd5, 0xc8, 0xfc, 0x3d, 0x12, 0x41, 0x08, 0xa9,
0xa0, 0x61, 0x5c, 0x0a, 0x06, 0x5f, 0x0a, 0x6a, 0xdd, 0x02, 0x24, 0x78, 0x07, 0xa9, 0x81, 0xa7,
0x2b, 0x7d, 0x65, 0xb8, 0x39, 0xee, 0x7c, 0xce, 0x4d, 0xf5, 0xfa, 0xd2, 0x51, 0x03, 0x0f, 0xeb,
0xa8, 0x4b, 0x3d, 0x2f, 0x81, 0x34, 0xd5, 0xd5, 0x62, 0xe8, 0xd4, 0x2d, 0x3e, 0x47, 0x1d, 0x46,
0x5d, 0x60, 0xa9, 0xde, 0xec, 0x37, 0x87, 0x9a, 0xbd, 0x4f, 0x56, 0x42, 0x93, 0x62, 0x3d, 0xb9,
0x91, 0xca, 0xab, 0x48, 0x24, 0xb9, 0x53, 0xd9, 0x30, 0x41, 0xdd, 0x98, 0xe6, 0x8c, 0x53, 0x4f,
0x6f, 0xf5, 0x95, 0xa1, 0x66, 0xf7, 0x48, 0x89, 0x4b, 0x6a, 0x5c, 0x72, 0x11, 0xe5, 0x4e, 0x2d,
0x32, 0x8e, 0x91, 0xb6, 0xb4, 0x06, 0x6f, 0xa1, 0xe6, 0x33, 0xe4, 0x25, 0xb2, 0x53, 0x94, 0xb8,
0x87, 0xda, 0x33, 0xca, 0x32, 0xa8, 0x48, 0xcb, 0xe6, 0x44, 0x3d, 0x52, 0x06, 0xaf, 0x2a, 0x6a,
0xdf, 0x09, 0x2a, 0x00, 0x5b, 0xa8, 0x95, 0x02, 0x9b, 0x48, 0x9b, 0x66, 0xef, 0xfd, 0xc3, 0xec,
0x48, 0x21, 0x3e, 0x43, 0xdd, 0x2c, 0xf6, 0xa8, 0x00, 0x4f, 0xae, 0xd5, 0x6c, 0xe3, 0x0f, 0xe5,
0x7d, 0x1d, 0xea, 0x78, 0xe3, 0x7d, 0x6e, 0x36, 0xde, 0x3e, 0x4c, 0xc5, 0xa9, 0x4d, 0xf8, 0x14,
0xb5, 0x63, 0x80, 0x64, 0x5d, 0x4a, 0x92, 0x4e, 0xbe, 0x5b, 0xa5, 0x54, 0xba, 0x8c, 0x07, 0x84,
0x16, 0x3f, 0x57, 0xdc, 0x3c, 0x5a, 0xbe, 0x79, 0xcd, 0x41, 0x8b, 0x40, 0x1e, 0x1b, 0x6e, 0x47,
0xe2, 0x1f, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, 0xec, 0x93, 0xe7, 0x1d, 0x96, 0x02, 0x00, 0x00,
}

20
element.proto Normal file
View file

@ -0,0 +1,20 @@
syntax = "proto3";
package io.stellar.element.v1;
import weak "gogoproto/gogo.proto";
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
message Peer {
string id = 1 [(gogoproto.customname) = "ID"];
string address = 2;
map<string, string> labels = 3;
google.protobuf.Any payload = 4;
}
message State {
Peer self = 1;
google.protobuf.Timestamp updated = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
map<string, Peer> peers = 3;
}

View file

@ -19,42 +19,33 @@ const (
// NodeEvent stores the event type and node information
type NodeEvent struct {
// EventType is the type of event fired
EventType NodeEventType
Type NodeEventType
// Node is the internal cluster node
Node *memberlist.Node
}
// EventHandler is used for event handling
type EventHandler struct {
ch chan *NodeEvent
}
// NewEventHandler returns an EventHandler that is used to perform actions for the specified event
func NewEventHandler(ch chan *NodeEvent) *EventHandler {
return &EventHandler{
ch: ch,
}
}
// NotifyJoin notifies when a node joins the cluster
func (h *EventHandler) NotifyJoin(n *memberlist.Node) {
go h.notify(NodeJoin, n)
func (a *Agent) NotifyJoin(n *memberlist.Node) {
a.send(&NodeEvent{
Type: NodeJoin,
Node: n,
})
}
// NotifyLeave notifies when a node leaves the cluster
func (h *EventHandler) NotifyLeave(n *memberlist.Node) {
go h.notify(NodeLeave, n)
func (a *Agent) NotifyLeave(n *memberlist.Node) {
delete(a.state.Peers, n.Name)
a.peerUpdateChan <- true
a.send(&NodeEvent{
Type: NodeLeave,
Node: n,
})
}
// NotifyUpdate notifies when a node is updated in the cluster
func (h *EventHandler) NotifyUpdate(n *memberlist.Node) {
go h.notify(NodeUpdate, n)
}
func (h *EventHandler) notify(t NodeEventType, n *memberlist.Node) {
// TODO: use context WithTimeout to enable cancel
h.ch <- &NodeEvent{
EventType: t,
func (a *Agent) NotifyUpdate(n *memberlist.Node) {
a.send(&NodeEvent{
Type: NodeUpdate,
Node: n,
}
})
}

View file

@ -1,40 +1,22 @@
package element
import (
"encoding/json"
"time"
)
// PeerAgent is the peer information for an agent in the cluster including name and GRPC address
type PeerAgent struct {
Name string
Addr string
Updated time.Time
}
import "github.com/gogo/protobuf/proto"
// Peers returns all known peers in the cluster
func (a *Agent) Peers() ([]*PeerAgent, error) {
func (a *Agent) Peers() ([]*Peer, error) {
self := a.members.LocalNode()
var (
peerAgents map[string]*PeerAgent
peers []*PeerAgent
)
if err := json.Unmarshal(self.Meta, &peerAgents); err != nil {
var state State
if err := proto.Unmarshal(self.Meta, &state); err != nil {
return nil, err
}
for _, p := range peerAgents {
var peers []*Peer
for _, p := range state.Peers {
peers = append(peers, p)
}
return peers, nil
}
// LocalNode returns local node peer info
func (a *Agent) LocalNode() (*PeerAgent, error) {
return &PeerAgent{
Name: a.config.NodeName,
Addr: a.config.Address,
Updated: time.Now(),
}, nil
// Self returns the local peer information
func (a *Agent) Self() *Peer {
return a.state.Self
}