enable redis over wireguard

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
This commit is contained in:
Evan Hazlett 2019-10-07 23:37:13 -04:00
parent 190ec3130d
commit 48d43ee2ee
8 changed files with 210 additions and 85 deletions

View file

@ -30,6 +30,7 @@ import (
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type subnetRange struct {
@ -39,6 +40,7 @@ type subnetRange struct {
}
func (s *Server) updateNodeNetwork(ctx context.Context, id string, subnet string) error {
logrus.Debugf("updating node network: id=%s subnet=%s", id, subnet)
if _, err := s.master(ctx, "SET", s.getNodeNetworkKey(id), subnet); err != nil {
return err
}

View file

@ -115,12 +115,14 @@ func (s *Server) configureNode() error {
continue
}
// TODO: start tunnel
// start tunnel
if err := s.updatePeerConfig(ctx, r.Node, r.Peers); err != nil {
return err
}
// TODO: wait for tunnel to come up
time.Sleep(time.Second * 20)
// wait for tunnel to come up
if err := s.waitForMaster(ctx, r.Master); err != nil {
return err
}
if err := s.joinMaster(r.Master); err != nil {
c.Close()
@ -128,6 +130,11 @@ func (s *Server) configureNode() error {
continue
}
logrus.Debug("waiting for redis sync")
if err := s.waitForRedisSync(ctx); err != nil {
return err
}
go s.replicaMonitor()
return nil
@ -200,10 +207,12 @@ func (s *Server) replicaMonitor() {
logrus.Error(err)
continue
}
logrus.Debugf("replica monitor: node=%s", n)
if n == nil || n.ID != s.cfg.ID {
logrus.Debugf("waiting for new master to initialize: %s", n.ID)
logrus.Debug("waiting for new master to initialize")
continue
}
logrus.Debugf("replica monitor: configuring node with master %+v", n)
if err := s.configureNode(); err != nil {
logrus.Error(err)
continue
@ -285,11 +294,13 @@ func (s *Server) joinMaster(m *v1.Master) error {
func (s *Server) updateMasterInfo(ctx context.Context) error {
// update master info
if _, err := s.master(ctx, "SET", clusterKey, s.cfg.ClusterKey); err != nil {
logrus.Error("updateMasterInfo.setClusterKey")
return err
}
// build redis url with gateway ip
gatewayIP, _, err := s.getNodeIP(ctx, s.cfg.ID)
if err != nil {
logrus.Error("updateMasterInfo.getNodeIP")
return err
}
u, err := url.Parse(s.cfg.RedisURL)
@ -302,6 +313,7 @@ func (s *Server) updateMasterInfo(ctx context.Context) error {
ID: s.cfg.ID,
GRPCAddress: s.cfg.AdvertiseGRPCAddress,
RedisURL: u.String(),
GatewayIP: gatewayIP.String(),
}
data, err := proto.Marshal(m)
if err != nil {
@ -334,11 +346,11 @@ func (s *Server) updateLocalNodeInfo(ctx context.Context) error {
key := s.getNodeKey(s.cfg.ID)
keyPair, err := s.getOrCreateKeyPair(ctx, s.cfg.ID)
if err != nil {
return err
return errors.Wrapf(err, "error getting keypair for %s", s.cfg.ID)
}
nodeIP, _, err := s.getNodeIP(ctx, s.cfg.ID)
if err != nil {
return err
return errors.Wrapf(err, "error getting node IP for %s", s.cfg.ID)
}
node := &v1.Node{
Updated: time.Now(),

View file

@ -234,15 +234,10 @@ func (s *Server) updatePeerConfig(ctx context.Context, node *v1.Node, peers []*v
nodePeers = append(nodePeers, peer)
}
keyPair, err := s.getOrCreateKeyPair(ctx, node.ID)
if err != nil {
return err
}
//size, _ := gatewayNet.Mask.Size()
wireguardCfg := &wg.Config{
Iface: node.InterfaceName,
PrivateKey: keyPair.PrivateKey,
PrivateKey: node.KeyPair.PrivateKey,
ListenPort: int(node.EndpointPort),
Address: fmt.Sprintf("%s/%d", node.GatewayIP, 16),
Peers: nodePeers,

View file

@ -22,15 +22,20 @@
package server
import (
"bufio"
"bytes"
"context"
"fmt"
"io/ioutil"
"net"
"net/url"
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"time"
ping "github.com/digineo/go-ping"
"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
"github.com/gomodule/redigo/redis"
@ -137,19 +142,27 @@ func (s *Server) Run() error {
return err
}
logrus.Debugf("response: %+v", r)
logrus.Debugf("master info received: %+v", r)
// start tunnel
if err := s.updatePeerConfig(ctx, r.Node, r.Peers); err != nil {
return errors.Wrap(err, "error updating peer config")
}
// TODO: wait for tunnel to come up
time.Sleep(time.Second * 20)
logrus.Debugf("master info received: %+v", r)
// wait for tunnel to come up
if err := s.waitForMaster(ctx, r.Master); err != nil {
return errors.Wrap(err, "error waiting for master")
}
logrus.Debugf("joining master: %s", r.Master.ID)
if err := s.joinMaster(r.Master); err != nil {
return err
}
logrus.Debug("waiting for redis sync")
if err := s.waitForRedisSync(ctx); err != nil {
return err
}
go s.replicaMonitor()
} else {
if err := s.configureNode(); err != nil {
@ -251,6 +264,80 @@ func getPool(redisUrl string) (*redis.Pool, error) {
return pool, nil
}
func (s *Server) waitForMaster(ctx context.Context, m *v1.Master) error {
p, err := ping.New("0.0.0.0", "")
if err != nil {
return err
}
defer p.Close()
doneCh := make(chan time.Duration)
errCh := make(chan error)
go func() {
for {
ip, err := net.ResolveIPAddr("ip4", m.GatewayIP)
if err != nil {
errCh <- err
return
}
rtt, err := p.Ping(ip, time.Second*30)
if err != nil {
errCh <- err
}
doneCh <- rtt
}
}()
select {
case rtt := <-doneCh:
logrus.Debugf("rtt master ping: %s", rtt)
return nil
case err := <-errCh:
return err
}
return nil
}
func (s *Server) waitForRedisSync(ctx context.Context) error {
doneCh := make(chan bool)
errCh := make(chan error)
go func() {
for {
info, err := redis.String(s.local(ctx, "INFO", "REPLICATION"))
if err != nil {
logrus.Warn(err)
continue
}
b := bytes.NewBufferString(info)
s := bufio.NewScanner(b)
for s.Scan() {
v := s.Text()
parts := strings.SplitN(v, ":", 2)
if parts[0] == "master_link_status" {
if parts[1] == "up" {
doneCh <- true
return
}
}
}
time.Sleep(time.Second * 1)
}
}()
select {
case <-doneCh:
return nil
case err := <-errCh:
return err
case <-time.After(nodeHeartbeatInterval):
return fmt.Errorf("timeout waiting on sync")
}
}
func (s *Server) ensureNetworkSubnet(ctx context.Context, id string) error {
network, err := redis.String(s.local(ctx, "GET", s.getNodeNetworkKey(id)))
if err != nil {
@ -267,6 +354,7 @@ func (s *Server) ensureNetworkSubnet(ctx context.Context, id string) error {
if err != nil {
return err
}
logrus.Debugf("node networks: %s", nodeNetworkKeys)
lookup := map[string]struct{}{}
for _, netKey := range nodeNetworkKeys {
n, err := redis.String(s.local(ctx, "GET", netKey))
@ -276,6 +364,8 @@ func (s *Server) ensureNetworkSubnet(ctx context.Context, id string) error {
lookup[n] = struct{}{}
}
logrus.Debugf("lookup: %+v", lookup)
subnet := r.Subnet
size, _ := subnet.Mask.Size()