enable master election
Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
This commit is contained in:
parent
bde4dba555
commit
9b07492180
8 changed files with 112 additions and 63 deletions
|
@ -34,6 +34,8 @@ import (
|
|||
var (
|
||||
// ErrInvalidAuth is returned when an invalid cluster key is specified upon connect
|
||||
ErrInvalidAuth = errors.New("invalid cluster key specified")
|
||||
// ErrNoMaster is returned if there is no configured master yet
|
||||
ErrNoMaster = errors.New("no configured master")
|
||||
)
|
||||
|
||||
// Connect is called when a peer wants to connect to the node
|
||||
|
@ -48,6 +50,9 @@ func (s *Server) Connect(ctx context.Context, req *v1.ConnectRequest) (*v1.Conne
|
|||
}
|
||||
data, err := redis.Bytes(s.local(ctx, "GET", masterKey))
|
||||
if err != nil {
|
||||
if err == redis.ErrNil {
|
||||
return nil, ErrNoMaster
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
var master v1.Master
|
||||
|
|
|
@ -24,6 +24,7 @@ package server
|
|||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -121,6 +122,8 @@ func (s *Server) configureNode() error {
|
|||
continue
|
||||
}
|
||||
|
||||
go s.replicaMonitor()
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -178,6 +181,16 @@ func (s *Server) replicaMonitor() {
|
|||
for range t.C {
|
||||
if _, err := redis.Bytes(s.local(context.Background(), "GET", masterKey)); err != nil {
|
||||
if err == redis.ErrNil {
|
||||
// skip configure until new leader election
|
||||
n, err := s.getNextMaster(context.Background())
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
continue
|
||||
}
|
||||
if n.ID != s.cfg.ID {
|
||||
logrus.Debugf("waiting for new master to initialize: %s", n.ID)
|
||||
continue
|
||||
}
|
||||
if err := s.configureNode(); err != nil {
|
||||
logrus.Error(err)
|
||||
continue
|
||||
|
@ -194,6 +207,18 @@ func (s *Server) replicaMonitor() {
|
|||
t.Stop()
|
||||
}
|
||||
|
||||
func (s *Server) getNextMaster(ctx context.Context) (*v1.Node, error) {
|
||||
nodes, err := s.getNodes(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(nodes) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].Updated.Before(nodes[j].Updated) })
|
||||
return nodes[len(nodes)-1], nil
|
||||
}
|
||||
|
||||
func (s *Server) masterHeartbeat() {
|
||||
logrus.Debugf("starting master heartbeat: ttl=%s", masterHeartbeatInterval)
|
||||
// initial update
|
||||
|
@ -286,6 +311,7 @@ func (s *Server) nodeHeartbeat(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
node := &v1.Node{
|
||||
Updated: time.Now(),
|
||||
ID: s.cfg.ID,
|
||||
Addr: s.cfg.GRPCAddress,
|
||||
KeyPair: keyPair,
|
||||
|
|
|
@ -58,7 +58,7 @@ const (
|
|||
var (
|
||||
empty = &ptypes.Empty{}
|
||||
masterHeartbeatInterval = time.Second * 5
|
||||
nodeHeartbeatInterval = time.Second * 60
|
||||
nodeHeartbeatInterval = time.Second * 15
|
||||
nodeHeartbeatExpiry = 86400
|
||||
peerConfigUpdateInterval = time.Second * 10
|
||||
|
||||
|
@ -130,7 +130,6 @@ func (s *Server) Run() error {
|
|||
|
||||
go s.replicaMonitor()
|
||||
} else {
|
||||
// starting as master; remove existing key
|
||||
if err := s.configureNode(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue