added server; initial cluster state
Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
This commit is contained in:
parent
6fed0fa858
commit
562f1caa54
704 changed files with 361956 additions and 534 deletions
48
server/connect.go
Normal file
48
server/connect.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
Copyright 2019 Stellar Project
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in the
|
||||
Software without restriction, including without limitation the rights to use, copy,
|
||||
modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
|
||||
and to permit persons to whom the Software is furnished to do so, subject to the
|
||||
following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies
|
||||
or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
|
||||
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
|
||||
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
|
||||
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "github.com/stellarproject/heimdall/api/v1"
|
||||
)
|
||||
|
||||
// Connect is called when a peer wants to connect to the node
|
||||
func (s *Server) Connect(ctx context.Context, req *v1.ConnectRequest) (*v1.ConnectResponse, error) {
|
||||
logrus.Debugf("connect request from %s", req.ID)
|
||||
data, err := redis.Bytes(s.local(ctx, "GET", masterKey))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var master v1.Master
|
||||
if err := proto.Unmarshal(data, &master); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &v1.ConnectResponse{
|
||||
Master: &master,
|
||||
}, nil
|
||||
}
|
233
server/node.go
Normal file
233
server/node.go
Normal file
|
@ -0,0 +1,233 @@
|
|||
/*
|
||||
Copyright 2019 Stellar Project
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in the
|
||||
Software without restriction, including without limitation the rights to use, copy,
|
||||
modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
|
||||
and to permit persons to whom the Software is furnished to do so, subject to the
|
||||
following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies
|
||||
or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
|
||||
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
|
||||
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
|
||||
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "github.com/stellarproject/heimdall/api/v1"
|
||||
)
|
||||
|
||||
func (s *Server) configureNode() error {
|
||||
ctx := context.Background()
|
||||
nodes, err := redis.Strings(s.local(ctx, "KEYS", fmt.Sprintf("%s:*", nodesKey)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// attempt to connect to existing
|
||||
if len(nodes) > 0 {
|
||||
for _, node := range nodes {
|
||||
addr, err := redis.String(s.local(ctx, "GET", node))
|
||||
if err != nil {
|
||||
logrus.Warn(err)
|
||||
continue
|
||||
}
|
||||
// ignore self
|
||||
if addr == s.cfg.GRPCAddress {
|
||||
continue
|
||||
}
|
||||
|
||||
logrus.Infof("attempting to join existing node %s", addr)
|
||||
c, err := s.getClient(addr)
|
||||
if err != nil {
|
||||
logrus.Warn(err)
|
||||
continue
|
||||
}
|
||||
m, err := c.Connect()
|
||||
if err != nil {
|
||||
c.Close()
|
||||
logrus.Warn(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.joinMaster(m); err != nil {
|
||||
c.Close()
|
||||
logrus.Warn(err)
|
||||
continue
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
data, err := redis.Bytes(s.local(context.Background(), "GET", masterKey))
|
||||
if err != nil {
|
||||
if err != redis.ErrNil {
|
||||
return err
|
||||
}
|
||||
logrus.Infof("starting as master id=%s", s.cfg.ID)
|
||||
if _, err := s.local(context.Background(), "REPLICAOF", "NO", "ONE"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// start server heartbeat
|
||||
logrus.Debug("starting master heartbeat")
|
||||
go s.masterHeartbeat()
|
||||
|
||||
// reset replica settings when promoting to master
|
||||
logrus.Debug("disabling replica status")
|
||||
s.disableReplica()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var master v1.Master
|
||||
if err := proto.Unmarshal(data, &master); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.joinMaster(&master); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go s.replicaMonitor()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) disableReplica() {
|
||||
s.wpool = getPool(s.cfg.RedisURL)
|
||||
|
||||
// signal replica monitor to stop if started as a peer
|
||||
close(s.replicaCh)
|
||||
|
||||
// unset peer
|
||||
s.cfg.GRPCPeerAddress = ""
|
||||
}
|
||||
|
||||
func (s *Server) replicaMonitor() {
|
||||
logrus.Debugf("starting replica monitor: ttl=%s", heartbeatInterval)
|
||||
s.replicaCh = make(chan struct{}, 1)
|
||||
t := time.NewTicker(heartbeatInterval)
|
||||
go func() {
|
||||
for range t.C {
|
||||
if _, err := redis.Bytes(s.local(context.Background(), "GET", masterKey)); err != nil {
|
||||
if err == redis.ErrNil {
|
||||
if err := s.configureNode(); err != nil {
|
||||
logrus.Error(err)
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
logrus.Error(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
<-s.replicaCh
|
||||
logrus.Debug("stopping replica monitor")
|
||||
t.Stop()
|
||||
}
|
||||
|
||||
func (s *Server) masterHeartbeat() {
|
||||
logrus.Debugf("starting master heartbeat: ttl=%s", heartbeatInterval)
|
||||
// initial update
|
||||
if err := s.updateMasterInfo(); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
|
||||
t := time.NewTicker(heartbeatInterval)
|
||||
for range t.C {
|
||||
if err := s.updateMasterInfo(); err != nil {
|
||||
logrus.Error(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) joinMaster(m *v1.Master) error {
|
||||
// configure replica
|
||||
logrus.Infof("configuring node as replica of %+v", m.ID)
|
||||
conn, err := redis.DialURL(s.cfg.RedisURL)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to connect to redis")
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
u, err := url.Parse(m.RedisURL)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error parsing master redis url")
|
||||
}
|
||||
hostPort := strings.SplitN(u.Host, ":", 2)
|
||||
host := hostPort[0]
|
||||
port := hostPort[1]
|
||||
if _, err := conn.Do("REPLICAOF", host, port); err != nil {
|
||||
return err
|
||||
}
|
||||
// auth
|
||||
auth, ok := u.User.Password()
|
||||
if ok {
|
||||
if _, err := conn.Do("CONFIG", "SET", "MASTERAUTH", auth); err != nil {
|
||||
return errors.Wrap(err, "error authenticating to redis")
|
||||
}
|
||||
}
|
||||
|
||||
s.wpool = getPool(m.RedisURL)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) updateMasterInfo() error {
|
||||
// update master info
|
||||
m := &v1.Master{
|
||||
ID: s.cfg.ID,
|
||||
GRPCAddress: s.cfg.GRPCAddress,
|
||||
RedisURL: s.cfg.AdvertiseRedisURL,
|
||||
}
|
||||
data, err := proto.Marshal(m)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error marshalling master info")
|
||||
}
|
||||
|
||||
if _, err := s.master(context.Background(), "SET", masterKey, data); err != nil {
|
||||
return errors.Wrap(err, "error setting master info")
|
||||
}
|
||||
|
||||
if _, err := s.master(context.Background(), "EXPIRE", masterKey, int(heartbeatInterval.Seconds())); err != nil {
|
||||
return errors.Wrap(err, "error setting expire for master info")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) nodeHeartbeat() {
|
||||
logrus.Debugf("starting node heartbeat: ttl=%s", heartbeatInterval)
|
||||
t := time.NewTicker(heartbeatInterval)
|
||||
key := fmt.Sprintf("%s:%s", nodesKey, s.cfg.ID)
|
||||
for range t.C {
|
||||
if _, err := s.master(context.Background(), "SET", key, s.cfg.GRPCAddress); err != nil {
|
||||
logrus.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := s.master(context.Background(), "EXPIRE", key, nodeHeartbeatExpiry); err != nil {
|
||||
logrus.Error(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
182
server/server.go
Normal file
182
server/server.go
Normal file
|
@ -0,0 +1,182 @@
|
|||
/*
|
||||
Copyright 2019 Stellar Project
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in the
|
||||
Software without restriction, including without limitation the rights to use, copy,
|
||||
modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
|
||||
and to permit persons to whom the Software is furnished to do so, subject to the
|
||||
following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies
|
||||
or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
|
||||
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
|
||||
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
|
||||
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"time"
|
||||
|
||||
ptypes "github.com/gogo/protobuf/types"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stellarproject/heimdall"
|
||||
v1 "github.com/stellarproject/heimdall/api/v1"
|
||||
"github.com/stellarproject/heimdall/client"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
masterKey = "heimdall:master"
|
||||
nodesKey = "heimdall:nodes"
|
||||
nodeJoinKey = "heimdall:join"
|
||||
)
|
||||
|
||||
var (
|
||||
empty = &ptypes.Empty{}
|
||||
heartbeatInterval = time.Second * 15
|
||||
nodeHeartbeatExpiry = 86400
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
cfg *heimdall.Config
|
||||
rpool *redis.Pool
|
||||
wpool *redis.Pool
|
||||
replicaCh chan struct{}
|
||||
}
|
||||
|
||||
func NewServer(cfg *heimdall.Config) (*Server, error) {
|
||||
pool := getPool(cfg.RedisURL)
|
||||
return &Server{
|
||||
cfg: cfg,
|
||||
rpool: pool,
|
||||
wpool: pool,
|
||||
replicaCh: make(chan struct{}, 1),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Register enables callers to register this service with an existing GRPC server
|
||||
func (s *Server) Register(server *grpc.Server) error {
|
||||
v1.RegisterHeimdallServer(server, s)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GenerateProfile generates a new Go profile
|
||||
func (s *Server) GenerateProfile() (string, error) {
|
||||
tmpfile, err := ioutil.TempFile("", "heimdall-profile-")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
runtime.GC()
|
||||
if err := pprof.WriteHeapProfile(tmpfile); err != nil {
|
||||
return "", err
|
||||
}
|
||||
tmpfile.Close()
|
||||
return tmpfile.Name(), nil
|
||||
}
|
||||
|
||||
func (s *Server) Run() error {
|
||||
// check peer address and make a grpc request for master info if present
|
||||
if s.cfg.GRPCPeerAddress != "" {
|
||||
logrus.Debugf("joining %s", s.cfg.GRPCPeerAddress)
|
||||
c, err := s.getClient(s.cfg.GRPCPeerAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
master, err := c.Connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.Debugf("master info received: %+v", master)
|
||||
if err := s.joinMaster(master); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go s.replicaMonitor()
|
||||
} else {
|
||||
// starting as master; remove existing key
|
||||
if err := s.configureNode(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
go s.nodeHeartbeat()
|
||||
|
||||
// start listener for pub/sub
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
c := s.rpool.Get()
|
||||
defer c.Close()
|
||||
|
||||
psc := redis.PubSubConn{Conn: c}
|
||||
psc.Subscribe(nodeJoinKey)
|
||||
for {
|
||||
switch v := psc.Receive().(type) {
|
||||
case redis.Message:
|
||||
// TODO: handle join notify
|
||||
logrus.Debug("join notify")
|
||||
case redis.Subscription:
|
||||
default:
|
||||
logrus.Debugf("unknown message type %T", v)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err := <-errCh
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) Stop() error {
|
||||
s.rpool.Close()
|
||||
s.wpool.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func getPool(u string) *redis.Pool {
|
||||
pool := redis.NewPool(func() (redis.Conn, error) {
|
||||
conn, err := redis.DialURL(u)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to connect to redis")
|
||||
}
|
||||
return conn, nil
|
||||
}, 10)
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
func (s *Server) getClient(addr string) (*client.Client, error) {
|
||||
return client.NewClient(s.cfg.ID, addr)
|
||||
}
|
||||
|
||||
func (s *Server) local(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
|
||||
return s.do(ctx, s.rpool, cmd, args...)
|
||||
}
|
||||
|
||||
func (s *Server) master(ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
|
||||
return s.do(ctx, s.wpool, cmd, args...)
|
||||
}
|
||||
|
||||
func (s *Server) do(ctx context.Context, pool *redis.Pool, cmd string, args ...interface{}) (interface{}, error) {
|
||||
conn, err := pool.GetContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Close()
|
||||
r, err := conn.Do(cmd, args...)
|
||||
return r, err
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue