add basic client; enable node level routing

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
This commit is contained in:
Evan Hazlett 2019-10-04 22:56:46 -04:00
parent c0515d4802
commit ed73f97bd3
No known key found for this signature in database
GPG key ID: A519480096146526
16 changed files with 1292 additions and 143 deletions

View file

@ -34,6 +34,51 @@ import (
v1 "github.com/stellarproject/heimdall/api/v1"
)
// Nodes returns a list of known nodes
func (s *Server) Nodes(ctx context.Context, req *v1.NodesRequest) (*v1.NodesResponse, error) {
nodes, err := s.getNodes(ctx)
if err != nil {
return nil, err
}
return &v1.NodesResponse{
Nodes: nodes,
}, nil
}
func (s *Server) getNodes(ctx context.Context) ([]*v1.Node, error) {
nodeKeys, err := redis.Strings(s.local(ctx, "KEYS", s.getNodeKey("*")))
if err != nil {
return nil, err
}
var nodes []*v1.Node
for _, nodeKey := range nodeKeys {
data, err := redis.Bytes(s.local(ctx, "GET", nodeKey))
if err != nil {
return nil, err
}
var node v1.Node
if err := proto.Unmarshal(data, &node); err != nil {
return nil, err
}
nodes = append(nodes, &node)
}
return nodes, nil
}
func (s *Server) getNode(ctx context.Context, id string) (*v1.Node, error) {
data, err := redis.Bytes(s.local(ctx, "GET", s.getNodeKey(id)))
if err != nil {
return nil, err
}
var node v1.Node
if err := proto.Unmarshal(data, &node); err != nil {
return nil, err
}
return &node, nil
}
func (s *Server) configureNode() error {
ctx := context.Background()
nodeKeys, err := redis.Strings(s.local(ctx, "KEYS", s.getNodeKey("*")))

View file

@ -35,6 +35,60 @@ import (
v1 "github.com/stellarproject/heimdall/api/v1"
)
// Peers returns a list of known peers
func (s *Server) Peers(ctx context.Context, req *v1.PeersRequest) (*v1.PeersResponse, error) {
peers, err := s.getPeers(ctx)
if err != nil {
return nil, err
}
return &v1.PeersResponse{
Peers: peers,
}, nil
}
func (s *Server) getPeers(ctx context.Context) ([]*v1.Peer, error) {
peerKeys, err := redis.Strings(s.local(ctx, "KEYS", s.getPeerKey("*")))
if err != nil {
return nil, err
}
var peers []*v1.Peer
for _, peerKey := range peerKeys {
data, err := redis.Bytes(s.local(ctx, "GET", peerKey))
if err != nil {
return nil, err
}
var peer v1.Peer
if err := proto.Unmarshal(data, &peer); err != nil {
return nil, err
}
peers = append(peers, &peer)
}
return peers, nil
}
func (s *Server) peerUpdater(ctx context.Context) {
logrus.Debugf("starting peer config updater: ttl=%s", peerConfigUpdateInterval)
t := time.NewTicker(peerConfigUpdateInterval)
for range t.C {
uctx, cancel := context.WithTimeout(ctx, peerConfigUpdateInterval)
if err := s.updatePeerInfo(uctx); err != nil {
logrus.Errorf("updatePeerInfo: %s", err)
cancel()
continue
}
if err := s.updatePeerConfig(uctx); err != nil {
logrus.Errorf("updatePeerConfig: %s", err)
cancel()
continue
}
cancel()
}
}
func (s *Server) updatePeerInfo(ctx context.Context) error {
keypair, err := s.getOrCreateKeyPair(ctx, s.cfg.ID)
if err != nil {
@ -43,8 +97,23 @@ func (s *Server) updatePeerInfo(ctx context.Context) error {
endpoint := fmt.Sprintf("%s:%d", s.cfg.GatewayIP, s.cfg.GatewayPort)
// TODO: build allowedIPs from routes and peer network
// build allowedIPs from routes and peer network
allowedIPs := []string{s.cfg.PeerNetwork}
routes, err := s.getRoutes(ctx)
if err != nil {
return err
}
for _, route := range routes {
// only add the route if a peer to prevent route duplicate
if route.NodeID != s.cfg.ID {
continue
}
logrus.Debugf("adding route to allowed IPs: %s", route.Network)
allowedIPs = append(allowedIPs, route.Network)
}
n := &v1.Peer{
ID: s.cfg.ID,
@ -57,7 +126,23 @@ func (s *Server) updatePeerInfo(ctx context.Context) error {
if err != nil {
return err
}
pHash := hashData(data)
key := s.getPeerKey(s.cfg.ID)
peerData, err := redis.Bytes(s.local(ctx, "GET", key))
if err != nil {
if err != redis.ErrNil {
return err
}
}
eHash := hashData(peerData)
// skip update if same
if pHash == eHash {
return nil
}
if _, err := s.master(ctx, "SET", key, data); err != nil {
return err
}
@ -84,98 +169,86 @@ func (s *Server) getPeerInfo(ctx context.Context, id string) (*v1.Peer, error) {
return &peer, nil
}
func (s *Server) updatePeerConfig(ctx context.Context) {
logrus.Debugf("starting peer config updater: ttl=%s", peerConfigUpdateInterval)
t := time.NewTicker(peerConfigUpdateInterval)
configHash := ""
for range t.C {
uctx, cancel := context.WithTimeout(ctx, peerConfigUpdateInterval)
peerKeys, err := redis.Strings(s.local(uctx, "KEYS", s.getPeerKey("*")))
if err != nil {
logrus.Error(err)
cancel()
continue
}
var peers []*v1.Peer
for _, peerKey := range peerKeys {
peerData, err := redis.Bytes(s.local(uctx, "GET", peerKey))
if err != nil {
logrus.Error(err)
cancel()
continue
}
var p v1.Peer
if err := proto.Unmarshal(peerData, &p); err != nil {
logrus.Error(err)
cancel()
continue
}
// do not add self as a peer
if p.ID == s.cfg.ID {
continue
}
peers = append(peers, &p)
}
keyPair, err := s.getOrCreateKeyPair(ctx, s.cfg.ID)
if err != nil {
logrus.Error(err)
cancel()
continue
}
gatewayIP, _, err := s.getOrAllocateIP(ctx, s.cfg.ID)
if err != nil {
logrus.Error(err)
cancel()
continue
}
wireguardCfg := &wireguardConfig{
Iface: defaultWireguardInterface,
PrivateKey: keyPair.PrivateKey,
ListenPort: s.cfg.GatewayPort,
Address: gatewayIP.String() + "/32",
Peers: peers,
}
tmpCfg, err := generateNodeWireguardConfig(wireguardCfg)
if err != nil {
logrus.Error(err)
cancel()
continue
}
h, err := hashConfig(tmpCfg)
if err != nil {
logrus.Error(err)
cancel()
continue
}
// if config has not change skip update
if h == configHash {
continue
}
logrus.Debugf("updating peer config to version %s", h)
// update wireguard config
if err := os.Rename(tmpCfg, wireguardConfigPath); err != nil {
logrus.Error(err)
cancel()
continue
}
// reload wireguard
if err := restartWireguardTunnel(ctx); err != nil {
logrus.Error(err)
cancel()
continue
}
configHash = h
func (s *Server) updatePeerConfig(ctx context.Context) error {
peerKeys, err := redis.Strings(s.local(ctx, "KEYS", s.getPeerKey("*")))
if err != nil {
return err
}
var peers []*v1.Peer
for _, peerKey := range peerKeys {
peerData, err := redis.Bytes(s.local(ctx, "GET", peerKey))
if err != nil {
return err
}
var p v1.Peer
if err := proto.Unmarshal(peerData, &p); err != nil {
return err
}
// do not add self as a peer
if p.ID == s.cfg.ID {
continue
}
peers = append(peers, &p)
}
keyPair, err := s.getOrCreateKeyPair(ctx, s.cfg.ID)
if err != nil {
return err
}
gatewayIP, _, err := s.getOrAllocateIP(ctx, s.cfg.ID)
if err != nil {
return err
}
wireguardCfg := &wireguardConfig{
Iface: defaultWireguardInterface,
PrivateKey: keyPair.PrivateKey,
ListenPort: s.cfg.GatewayPort,
Address: gatewayIP.String() + "/32",
Peers: peers,
}
tmpCfg, err := generateNodeWireguardConfig(wireguardCfg)
if err != nil {
return err
}
h, err := hashConfig(tmpCfg)
if err != nil {
return err
}
e, err := hashConfig(wireguardConfigPath)
if err != nil {
return err
}
// if config has not change skip update
if h == e {
logrus.Debugf("config not changed: hash=%s", e)
return nil
}
logrus.Debugf("updating peer config to version %s", h)
// update wireguard config
if err := os.Rename(tmpCfg, wireguardConfigPath); err != nil {
return err
}
// reload wireguard
if err := restartWireguardTunnel(ctx); err != nil {
return err
}
return nil
}
func hashData(data []byte) string {
h := sha256.New()
h.Write(data)
return fmt.Sprintf("%x", h.Sum(nil))
}
func hashConfig(cfgPath string) (string, error) {
@ -184,7 +257,5 @@ func hashConfig(cfgPath string) (string, error) {
return "", err
}
h := sha256.New()
h.Write(peerData)
return fmt.Sprintf("%x", h.Sum(nil)), nil
return hashData(peerData), nil
}

113
server/route.go Normal file
View file

@ -0,0 +1,113 @@
/*
Copyright 2019 Stellar Project
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in the
Software without restriction, including without limitation the rights to use, copy,
modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so, subject to the
following conditions:
The above copyright notice and this permission notice shall be included in all copies
or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package server
import (
"context"
"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
v1 "github.com/stellarproject/heimdall/api/v1"
)
// CreateRoute reserves a new route
func (s *Server) CreateRoute(ctx context.Context, req *v1.CreateRouteRequest) (*ptypes.Empty, error) {
// check for existing route
routeKey := s.getRouteKey(req.Network)
routeData, err := redis.Bytes(s.local(ctx, "GET", routeKey))
if err != nil {
if err != redis.ErrNil {
return nil, err
}
}
if routeData != nil {
return nil, errors.Wrap(ErrRouteExists, req.Network)
}
// check for node id
if _, err := redis.Bytes(s.local(ctx, "GET", s.getNodeKey(req.NodeID))); err != nil {
if err == redis.ErrNil {
return nil, errors.Wrap(ErrNodeDoesNotExist, req.NodeID)
}
return nil, err
}
// save route
route := &v1.Route{
NodeID: req.NodeID,
Network: req.Network,
}
data, err := proto.Marshal(route)
if err != nil {
return nil, err
}
if _, err := s.master(ctx, "SET", routeKey, data); err != nil {
return nil, err
}
return empty, nil
}
// Delete deletes a new route
func (s *Server) DeleteRoute(ctx context.Context, req *v1.DeleteRouteRequest) (*ptypes.Empty, error) {
routeKey := s.getRouteKey(req.Network)
if _, err := s.master(ctx, "DEL", routeKey); err != nil {
return nil, err
}
return empty, nil
}
// Routes returns a list of known routes
func (s *Server) Routes(ctx context.Context, req *v1.RoutesRequest) (*v1.RoutesResponse, error) {
routes, err := s.getRoutes(ctx)
if err != nil {
return nil, err
}
return &v1.RoutesResponse{
Routes: routes,
}, nil
}
func (s *Server) getRoutes(ctx context.Context) ([]*v1.Route, error) {
routeKeys, err := redis.Strings(s.local(ctx, "KEYS", s.getRouteKey("*")))
if err != nil {
return nil, err
}
var routes []*v1.Route
for _, routeKey := range routeKeys {
data, err := redis.Bytes(s.local(ctx, "GET", routeKey))
if err != nil {
return nil, err
}
var route v1.Route
if err := proto.Unmarshal(data, &route); err != nil {
return nil, err
}
routes = append(routes, &route)
}
return routes, nil
}

View file

@ -47,6 +47,7 @@ const (
nodesKey = "heimdall:nodes"
nodeJoinKey = "heimdall:join"
peersKey = "heimdall:peers"
routesKey = "heimdall:routes"
ipsKey = "heimdall:ips"
wireguardConfigPath = "/etc/wireguard/darknet.conf"
@ -58,8 +59,14 @@ var (
nodeHeartbeatInterval = time.Second * 60
nodeHeartbeatExpiry = 86400
peerConfigUpdateInterval = time.Second * 10
// ErrRouteExists is returned when a requested route is already reserved
ErrRouteExists = errors.New("route already reserved")
// ErrNodeDoesNotExist is returned when an invalid node is requested
ErrNodeDoesNotExist = errors.New("node does not exist")
)
// Server represents the Heimdall server
type Server struct {
cfg *heimdall.Config
rpool *redis.Pool
@ -67,6 +74,7 @@ type Server struct {
replicaCh chan struct{}
}
// NewServer returns a new Heimdall server
func NewServer(cfg *heimdall.Config) (*Server, error) {
pool := getPool(cfg.RedisURL)
return &Server{
@ -130,6 +138,9 @@ func (s *Server) Run() error {
return err
}
// ensure wireguard is started
_, _ = wgquick(ctx, "up", getTunnelName())
if err := s.updatePeerInfo(ctx); err != nil {
return err
}
@ -138,7 +149,7 @@ func (s *Server) Run() error {
go s.nodeHeartbeat(ctx)
// start peer config updater to configure wireguard as peers join
go s.updatePeerConfig(ctx)
go s.peerUpdater(ctx)
// start listener for pub/sub
errCh := make(chan error, 1)
@ -219,6 +230,10 @@ func (s *Server) getNodeKey(id string) string {
return fmt.Sprintf("%s:%s", nodesKey, id)
}
func (s *Server) getRouteKey(network string) string {
return fmt.Sprintf("%s:%s", routesKey, network)
}
func (s *Server) getPeerKey(id string) string {
return fmt.Sprintf("%s:%s", peersKey, id)
}

View file

@ -107,8 +107,12 @@ func generateWireguardKeys(ctx context.Context) (string, string, error) {
return privateKey, publicKey, nil
}
func getTunnelName() string {
return strings.Replace(filepath.Base(wireguardConfigPath), filepath.Ext(filepath.Base(wireguardConfigPath)), "", 1)
}
func restartWireguardTunnel(ctx context.Context) error {
tunnelName := strings.Replace(filepath.Base(wireguardConfigPath), filepath.Ext(filepath.Base(wireguardConfigPath)), "", 1)
tunnelName := getTunnelName()
logrus.Infof("restarting tunnel %s", tunnelName)
d, err := wgquick(ctx, "down", tunnelName)
if err != nil {