add preshared key based peer auth

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
This commit is contained in:
Evan Hazlett 2019-10-03 10:10:50 -04:00
parent 562f1caa54
commit f0719ee8a9
No known key found for this signature in database
GPG key ID: A519480096146526
9 changed files with 92 additions and 40 deletions

View file

@ -23,6 +23,7 @@ package server
import (
"context"
"errors"
"github.com/gogo/protobuf/proto"
"github.com/gomodule/redigo/redis"
@ -30,9 +31,21 @@ import (
v1 "github.com/stellarproject/heimdall/api/v1"
)
var (
// ErrInvalidAuth is returned when an invalid cluster key is specified upon connect
ErrInvalidAuth = errors.New("invalid cluster key specified")
)
// Connect is called when a peer wants to connect to the node
func (s *Server) Connect(ctx context.Context, req *v1.ConnectRequest) (*v1.ConnectResponse, error) {
logrus.Debugf("connect request from %s", req.ID)
key, err := s.getClusterKey(ctx)
if err != nil {
return nil, err
}
if req.ClusterKey != key {
return nil, ErrInvalidAuth
}
data, err := redis.Bytes(s.local(ctx, "GET", masterKey))
if err != nil {
return nil, err

View file

@ -60,7 +60,7 @@ func (s *Server) configureNode() error {
logrus.Warn(err)
continue
}
m, err := c.Connect()
m, err := c.Connect(s.cfg.ClusterKey)
if err != nil {
c.Close()
logrus.Warn(err)
@ -149,13 +149,17 @@ func (s *Server) replicaMonitor() {
func (s *Server) masterHeartbeat() {
logrus.Debugf("starting master heartbeat: ttl=%s", heartbeatInterval)
// initial update
if err := s.updateMasterInfo(); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), heartbeatInterval)
defer cancel()
logrus.Infof("cluster master key=%s", s.cfg.ClusterKey)
if err := s.updateMasterInfo(ctx); err != nil {
logrus.Error(err)
}
t := time.NewTicker(heartbeatInterval)
for range t.C {
if err := s.updateMasterInfo(); err != nil {
if err := s.updateMasterInfo(ctx); err != nil {
logrus.Error(err)
continue
}
@ -193,8 +197,11 @@ func (s *Server) joinMaster(m *v1.Master) error {
return nil
}
func (s *Server) updateMasterInfo() error {
func (s *Server) updateMasterInfo(ctx context.Context) error {
// update master info
if _, err := s.master(ctx, "SET", clusterKey, s.cfg.ClusterKey); err != nil {
return err
}
m := &v1.Master{
ID: s.cfg.ID,
GRPCAddress: s.cfg.GRPCAddress,
@ -205,27 +212,28 @@ func (s *Server) updateMasterInfo() error {
return errors.Wrap(err, "error marshalling master info")
}
if _, err := s.master(context.Background(), "SET", masterKey, data); err != nil {
if _, err := s.master(ctx, "SET", masterKey, data); err != nil {
return errors.Wrap(err, "error setting master info")
}
if _, err := s.master(context.Background(), "EXPIRE", masterKey, int(heartbeatInterval.Seconds())); err != nil {
if _, err := s.master(ctx, "EXPIRE", masterKey, int(heartbeatInterval.Seconds())); err != nil {
return errors.Wrap(err, "error setting expire for master info")
}
return nil
}
func (s *Server) nodeHeartbeat() {
logrus.Debugf("starting node heartbeat: ttl=%s", heartbeatInterval)
t := time.NewTicker(heartbeatInterval)
logrus.Debugf("starting node heartbeat: ttl=%s", nodeHeartbeatInterval)
ctx := context.Background()
t := time.NewTicker(nodeHeartbeatInterval)
key := fmt.Sprintf("%s:%s", nodesKey, s.cfg.ID)
for range t.C {
if _, err := s.master(context.Background(), "SET", key, s.cfg.GRPCAddress); err != nil {
if _, err := s.master(ctx, "SET", key, s.cfg.GRPCAddress); err != nil {
logrus.Error(err)
continue
}
if _, err := s.master(context.Background(), "EXPIRE", key, nodeHeartbeatExpiry); err != nil {
if _, err := s.master(ctx, "EXPIRE", key, nodeHeartbeatExpiry); err != nil {
logrus.Error(err)
continue
}

View file

@ -40,14 +40,16 @@ import (
const (
masterKey = "heimdall:master"
clusterKey = "heimdall:key"
nodesKey = "heimdall:nodes"
nodeJoinKey = "heimdall:join"
)
var (
empty = &ptypes.Empty{}
heartbeatInterval = time.Second * 15
nodeHeartbeatExpiry = 86400
empty = &ptypes.Empty{}
heartbeatInterval = time.Second * 5
nodeHeartbeatInterval = time.Second * 60
nodeHeartbeatExpiry = 86400
)
type Server struct {
@ -97,7 +99,7 @@ func (s *Server) Run() error {
}
defer c.Close()
master, err := c.Connect()
master, err := c.Connect(s.cfg.ClusterKey)
if err != nil {
return err
}
@ -163,6 +165,10 @@ func (s *Server) getClient(addr string) (*client.Client, error) {
return client.NewClient(s.cfg.ID, addr)
}
func (s *Server) getClusterKey(ctx context.Context) (string, error) {
return redis.String(s.local(ctx, "GET", clusterKey))
}
func (s *Server) local(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
return s.do(ctx, s.rpool, cmd, args...)
}