Add pkg/discovery for nodes discovery

Absorb Swarm's discovery package in order to provide a common node
discovery mechanism to be used by both Swarm and networking code.

Signed-off-by: Arnaud Porterie <arnaud.porterie@docker.com>
This commit is contained in:
Arnaud Porterie 2015-08-31 13:23:17 -07:00
parent fdd5ab2fc3
commit 35c086fa6b
13 changed files with 1015 additions and 0 deletions

148
discovery/kv/kv.go Normal file
View file

@ -0,0 +1,148 @@
package kv
import (
"fmt"
"path"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
)
const (
discoveryPath = "docker/nodes"
)
// Discovery is exported
type Discovery struct {
backend store.Backend
store store.Store
heartbeat time.Duration
ttl time.Duration
prefix string
path string
}
func init() {
Init()
}
// Init is exported
func Init() {
// Register to libkv
zookeeper.Register()
consul.Register()
etcd.Register()
// Register to internal discovery service
discovery.Register("zk", &Discovery{backend: store.ZK})
discovery.Register("consul", &Discovery{backend: store.CONSUL})
discovery.Register("etcd", &Discovery{backend: store.ETCD})
}
// Initialize is exported
func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error {
var (
parts = strings.SplitN(uris, "/", 2)
addrs = strings.Split(parts[0], ",")
err error
)
// A custom prefix to the path can be optionally used.
if len(parts) == 2 {
s.prefix = parts[1]
}
s.heartbeat = heartbeat
s.ttl = ttl
s.path = path.Join(s.prefix, discoveryPath)
// Creates a new store, will ignore options given
// if not supported by the chosen store
s.store, err = libkv.NewStore(s.backend, addrs, nil)
return err
}
// Watch the store until either there's a store error or we receive a stop request.
// Returns false if we shouldn't attempt watching the store anymore (stop request received).
func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool {
for {
select {
case pairs := <-watchCh:
if pairs == nil {
return true
}
log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs))
// Convert `KVPair` into `discovery.Entry`.
addrs := make([]string, len(pairs))
for _, pair := range pairs {
addrs = append(addrs, string(pair.Value))
}
entries, err := discovery.CreateEntries(addrs)
if err != nil {
errCh <- err
} else {
discoveryCh <- entries
}
case <-stopCh:
// We were requested to stop watching.
return false
}
}
}
// Watch is exported
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
ch := make(chan discovery.Entries)
errCh := make(chan error)
go func() {
defer close(ch)
defer close(errCh)
// Forever: Create a store watch, watch until we get an error and then try again.
// Will only stop if we receive a stopCh request.
for {
// Set up a watch.
watchCh, err := s.store.WatchTree(s.path, stopCh)
if err != nil {
errCh <- err
} else {
if !s.watchOnce(stopCh, watchCh, ch, errCh) {
return
}
}
// If we get here it means the store watch channel was closed. This
// is unexpected so let's retry later.
errCh <- fmt.Errorf("Unexpected watch error")
time.Sleep(s.heartbeat)
}
}()
return ch, errCh
}
// Register is exported
func (s *Discovery) Register(addr string) error {
opts := &store.WriteOptions{TTL: s.ttl}
return s.store.Put(path.Join(s.path, addr), []byte(addr), opts)
}
// Store returns the underlying store used by KV discovery.
func (s *Discovery) Store() store.Store {
return s.store
}
// Prefix returns the store prefix
func (s *Discovery) Prefix() string {
return s.prefix
}

119
discovery/kv/kv_test.go Normal file
View file

@ -0,0 +1,119 @@
package kv
import (
"errors"
"path"
"testing"
"time"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/libkv/store"
libkvmock "github.com/docker/libkv/store/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestInitialize(t *testing.T) {
storeMock, err := libkvmock.New([]string{"127.0.0.1"}, nil)
assert.NotNil(t, storeMock)
assert.NoError(t, err)
d := &Discovery{backend: store.CONSUL}
d.Initialize("127.0.0.1", 0, 0)
d.store = storeMock
s := d.store.(*libkvmock.Mock)
assert.Len(t, s.Endpoints, 1)
assert.Equal(t, s.Endpoints[0], "127.0.0.1")
assert.Equal(t, d.path, discoveryPath)
storeMock, err = libkvmock.New([]string{"127.0.0.1:1234"}, nil)
assert.NotNil(t, storeMock)
assert.NoError(t, err)
d = &Discovery{backend: store.CONSUL}
d.Initialize("127.0.0.1:1234/path", 0, 0)
d.store = storeMock
s = d.store.(*libkvmock.Mock)
assert.Len(t, s.Endpoints, 1)
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
assert.Equal(t, d.path, "path/"+discoveryPath)
storeMock, err = libkvmock.New([]string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"}, nil)
assert.NotNil(t, storeMock)
assert.NoError(t, err)
d = &Discovery{backend: store.CONSUL}
d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0)
d.store = storeMock
s = d.store.(*libkvmock.Mock)
if assert.Len(t, s.Endpoints, 3) {
assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234")
assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234")
assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234")
}
assert.Equal(t, d.path, "path/"+discoveryPath)
}
func TestWatch(t *testing.T) {
storeMock, err := libkvmock.New([]string{"127.0.0.1:1234"}, nil)
assert.NotNil(t, storeMock)
assert.NoError(t, err)
d := &Discovery{backend: store.CONSUL}
d.Initialize("127.0.0.1:1234/path", 0, 0)
d.store = storeMock
s := d.store.(*libkvmock.Mock)
mockCh := make(chan []*store.KVPair)
// The first watch will fail.
s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, errors.New("test error")).Once()
// The second one will succeed.
s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil).Once()
expected := discovery.Entries{
&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
}
kvs := []*store.KVPair{
{Key: path.Join("path", discoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
{Key: path.Join("path", discoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
}
stopCh := make(chan struct{})
ch, errCh := d.Watch(stopCh)
// It should fire an error since the first WatchTree call failed.
assert.EqualError(t, <-errCh, "test error")
// We have to drain the error channel otherwise Watch will get stuck.
go func() {
for range errCh {
}
}()
// Push the entries into the store channel and make sure discovery emits.
mockCh <- kvs
assert.Equal(t, <-ch, expected)
// Add a new entry.
expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
kvs = append(kvs, &store.KVPair{Key: path.Join("path", discoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
mockCh <- kvs
assert.Equal(t, <-ch, expected)
// Make sure that if an error occurs it retries.
// This third call to WatchTree will be checked later by AssertExpectations.
s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil)
close(mockCh)
// Give it enough time to call WatchTree.
time.Sleep(3)
// Stop and make sure it closes all channels.
close(stopCh)
assert.Nil(t, <-ch)
assert.Nil(t, <-errCh)
s.AssertExpectations(t)
}