diff --git a/discovery/README.md b/discovery/README.md new file mode 100644 index 0000000..78173c8 --- /dev/null +++ b/discovery/README.md @@ -0,0 +1,41 @@ +--- +page_title: Docker discovery +page_description: discovery +page_keywords: docker, clustering, discovery +--- + +# Discovery + +Docker comes with multiple Discovery backends. + +## Backends + +### Using etcd + +Point your Docker Engine instances to a common etcd instance. You can specify +the address Docker uses to advertise the node using the `--discovery-address` +flag. + +```bash +$ docker daemon -H= --discovery-address= --discovery-backend etcd:/// +``` + +### Using consul + +Point your Docker Engine instances to a common Consul instance. You can specify +the address Docker uses to advertise the node using the `--discovery-address` +flag. + +```bash +$ docker daemon -H= --discovery-address= --discovery-backend consul:/// +``` + +### Using zookeeper + +Point your Docker Engine instances to a common Zookeeper instance. You can specify +the address Docker uses to advertise the node using the `--discovery-address` +flag. + +```bash +$ docker daemon -H= --discovery-address= --discovery-backend zk://,>/ +``` diff --git a/discovery/backends.go b/discovery/backends.go new file mode 100644 index 0000000..3da2e51 --- /dev/null +++ b/discovery/backends.go @@ -0,0 +1,53 @@ +package discovery + +import ( + "fmt" + "strings" + "time" + + log "github.com/Sirupsen/logrus" +) + +var ( + // Backends is a global map of discovery backends indexed by their + // associated scheme. + backends map[string]Backend +) + +func init() { + backends = make(map[string]Backend) +} + +// Register makes a discovery backend available by the provided scheme. +// If Register is called twice with the same scheme an error is returned. +func Register(scheme string, d Backend) error { + if _, exists := backends[scheme]; exists { + return fmt.Errorf("scheme already registered %s", scheme) + } + log.WithField("name", scheme).Debug("Registering discovery service") + backends[scheme] = d + return nil +} + +func parse(rawurl string) (string, string) { + parts := strings.SplitN(rawurl, "://", 2) + + // nodes:port,node2:port => nodes://node1:port,node2:port + if len(parts) == 1 { + return "nodes", parts[0] + } + return parts[0], parts[1] +} + +// New returns a new Discovery given a URL, heartbeat and ttl settings. +// Returns an error if the URL scheme is not supported. +func New(rawurl string, heartbeat time.Duration, ttl time.Duration) (Backend, error) { + scheme, uri := parse(rawurl) + if backend, exists := backends[scheme]; exists { + log.WithFields(log.Fields{"name": scheme, "uri": uri}).Debug("Initializing discovery service") + err := backend.Initialize(uri, heartbeat, ttl) + return backend, err + } + + return nil, ErrNotSupported +} diff --git a/discovery/discovery.go b/discovery/discovery.go new file mode 100644 index 0000000..11de2ca --- /dev/null +++ b/discovery/discovery.go @@ -0,0 +1,35 @@ +package discovery + +import ( + "errors" + "time" +) + +var ( + // ErrNotSupported is returned when a discovery service is not supported. + ErrNotSupported = errors.New("discovery service not supported") + + // ErrNotImplemented is returned when discovery feature is not implemented + // by discovery backend. + ErrNotImplemented = errors.New("not implemented in this discovery service") +) + +// Watcher provides watching over a cluster for nodes joining and leaving. +type Watcher interface { + // Watch the discovery for entry changes. + // Returns a channel that will receive changes or an error. + // Providing a non-nil stopCh can be used to stop watching. + Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error) +} + +// Backend is implemented by discovery backends which manage cluster entries. +type Backend interface { + // Watcher must be provided by every backend. + Watcher + + // Initialize the discovery with URIs, a heartbeat and a ttl. + Initialize(string, time.Duration, time.Duration) error + + // Register to the discovery. + Register(string) error +} diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go new file mode 100644 index 0000000..b7128ff --- /dev/null +++ b/discovery/discovery_test.go @@ -0,0 +1,120 @@ +package discovery + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewEntry(t *testing.T) { + entry, err := NewEntry("127.0.0.1:2375") + assert.NoError(t, err) + assert.True(t, entry.Equals(&Entry{Host: "127.0.0.1", Port: "2375"})) + assert.Equal(t, entry.String(), "127.0.0.1:2375") + + _, err = NewEntry("127.0.0.1") + assert.Error(t, err) +} + +func TestParse(t *testing.T) { + scheme, uri := parse("127.0.0.1:2375") + assert.Equal(t, scheme, "nodes") + assert.Equal(t, uri, "127.0.0.1:2375") + + scheme, uri = parse("localhost:2375") + assert.Equal(t, scheme, "nodes") + assert.Equal(t, uri, "localhost:2375") + + scheme, uri = parse("scheme://127.0.0.1:2375") + assert.Equal(t, scheme, "scheme") + assert.Equal(t, uri, "127.0.0.1:2375") + + scheme, uri = parse("scheme://localhost:2375") + assert.Equal(t, scheme, "scheme") + assert.Equal(t, uri, "localhost:2375") + + scheme, uri = parse("") + assert.Equal(t, scheme, "nodes") + assert.Equal(t, uri, "") +} + +func TestCreateEntries(t *testing.T) { + entries, err := CreateEntries(nil) + assert.Equal(t, entries, Entries{}) + assert.NoError(t, err) + + entries, err = CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) + assert.NoError(t, err) + expected := Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + } + assert.True(t, entries.Equals(expected)) + + _, err = CreateEntries([]string{"127.0.0.1", "127.0.0.2"}) + assert.Error(t, err) +} + +func TestContainsEntry(t *testing.T) { + entries, err := CreateEntries([]string{"127.0.0.1:2375", "127.0.0.2:2375", ""}) + assert.NoError(t, err) + assert.True(t, entries.Contains(&Entry{Host: "127.0.0.1", Port: "2375"})) + assert.False(t, entries.Contains(&Entry{Host: "127.0.0.3", Port: "2375"})) +} + +func TestEntriesEquality(t *testing.T) { + entries := Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + } + + // Same + assert.True(t, entries.Equals(Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + })) + + // Different size + assert.False(t, entries.Equals(Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.2", Port: "2375"}, + &Entry{Host: "127.0.0.3", Port: "2375"}, + })) + + // Different content + assert.False(t, entries.Equals(Entries{ + &Entry{Host: "127.0.0.1", Port: "2375"}, + &Entry{Host: "127.0.0.42", Port: "2375"}, + })) +} + +func TestEntriesDiff(t *testing.T) { + entry1 := &Entry{Host: "1.1.1.1", Port: "1111"} + entry2 := &Entry{Host: "2.2.2.2", Port: "2222"} + entry3 := &Entry{Host: "3.3.3.3", Port: "3333"} + entries := Entries{entry1, entry2} + + // No diff + added, removed := entries.Diff(Entries{entry2, entry1}) + assert.Empty(t, added) + assert.Empty(t, removed) + + // Add + added, removed = entries.Diff(Entries{entry2, entry3, entry1}) + assert.Len(t, added, 1) + assert.True(t, added.Contains(entry3)) + assert.Empty(t, removed) + + // Remove + added, removed = entries.Diff(Entries{entry2}) + assert.Empty(t, added) + assert.Len(t, removed, 1) + assert.True(t, removed.Contains(entry1)) + + // Add and remove + added, removed = entries.Diff(Entries{entry1, entry3}) + assert.Len(t, added, 1) + assert.True(t, added.Contains(entry3)) + assert.Len(t, removed, 1) + assert.True(t, removed.Contains(entry2)) +} diff --git a/discovery/entry.go b/discovery/entry.go new file mode 100644 index 0000000..e9cee26 --- /dev/null +++ b/discovery/entry.go @@ -0,0 +1,97 @@ +package discovery + +import ( + "fmt" + "net" +) + +// NewEntry creates a new entry. +func NewEntry(url string) (*Entry, error) { + host, port, err := net.SplitHostPort(url) + if err != nil { + return nil, err + } + return &Entry{host, port}, nil +} + +// An Entry represents a host. +type Entry struct { + Host string + Port string +} + +// Equals returns true if cmp contains the same data. +func (e *Entry) Equals(cmp *Entry) bool { + return e.Host == cmp.Host && e.Port == cmp.Port +} + +// String returns the string form of an entry. +func (e *Entry) String() string { + return fmt.Sprintf("%s:%s", e.Host, e.Port) +} + +// Entries is a list of *Entry with some helpers. +type Entries []*Entry + +// Equals returns true if cmp contains the same data. +func (e Entries) Equals(cmp Entries) bool { + // Check if the file has really changed. + if len(e) != len(cmp) { + return false + } + for i := range e { + if !e[i].Equals(cmp[i]) { + return false + } + } + return true +} + +// Contains returns true if the Entries contain a given Entry. +func (e Entries) Contains(entry *Entry) bool { + for _, curr := range e { + if curr.Equals(entry) { + return true + } + } + return false +} + +// Diff compares two entries and returns the added and removed entries. +func (e Entries) Diff(cmp Entries) (Entries, Entries) { + added := Entries{} + for _, entry := range cmp { + if !e.Contains(entry) { + added = append(added, entry) + } + } + + removed := Entries{} + for _, entry := range e { + if !cmp.Contains(entry) { + removed = append(removed, entry) + } + } + + return added, removed +} + +// CreateEntries returns an array of entries based on the given addresses. +func CreateEntries(addrs []string) (Entries, error) { + entries := Entries{} + if addrs == nil { + return entries, nil + } + + for _, addr := range addrs { + if len(addr) == 0 { + continue + } + entry, err := NewEntry(addr) + if err != nil { + return nil, err + } + entries = append(entries, entry) + } + return entries, nil +} diff --git a/discovery/file/file.go b/discovery/file/file.go new file mode 100644 index 0000000..1af9305 --- /dev/null +++ b/discovery/file/file.go @@ -0,0 +1,109 @@ +package file + +import ( + "fmt" + "io/ioutil" + "strings" + "time" + + "github.com/docker/docker/pkg/discovery" +) + +// Discovery is exported +type Discovery struct { + heartbeat time.Duration + path string +} + +func init() { + Init() +} + +// Init is exported +func Init() { + discovery.Register("file", &Discovery{}) +} + +// Initialize is exported +func (s *Discovery) Initialize(path string, heartbeat time.Duration, ttl time.Duration) error { + s.path = path + s.heartbeat = heartbeat + return nil +} + +func parseFileContent(content []byte) []string { + var result []string + for _, line := range strings.Split(strings.TrimSpace(string(content)), "\n") { + line = strings.TrimSpace(line) + // Ignoring line starts with # + if strings.HasPrefix(line, "#") { + continue + } + // Inlined # comment also ignored. + if strings.Contains(line, "#") { + line = line[0:strings.Index(line, "#")] + // Trim additional spaces caused by above stripping. + line = strings.TrimSpace(line) + } + for _, ip := range discovery.Generate(line) { + result = append(result, ip) + } + } + return result +} + +func (s *Discovery) fetch() (discovery.Entries, error) { + fileContent, err := ioutil.ReadFile(s.path) + if err != nil { + return nil, fmt.Errorf("failed to read '%s': %v", s.path, err) + } + return discovery.CreateEntries(parseFileContent(fileContent)) +} + +// Watch is exported +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { + ch := make(chan discovery.Entries) + errCh := make(chan error) + ticker := time.NewTicker(s.heartbeat) + + go func() { + defer close(errCh) + defer close(ch) + + // Send the initial entries if available. + currentEntries, err := s.fetch() + if err != nil { + errCh <- err + } else { + ch <- currentEntries + } + + // Periodically send updates. + for { + select { + case <-ticker.C: + newEntries, err := s.fetch() + if err != nil { + errCh <- err + continue + } + + // Check if the file has really changed. + if !newEntries.Equals(currentEntries) { + ch <- newEntries + } + currentEntries = newEntries + case <-stopCh: + ticker.Stop() + return + } + } + }() + + return ch, errCh +} + +// Register is exported +func (s *Discovery) Register(addr string) error { + return discovery.ErrNotImplemented +} diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go new file mode 100644 index 0000000..e9e2848 --- /dev/null +++ b/discovery/file/file_test.go @@ -0,0 +1,106 @@ +package file + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/docker/docker/pkg/discovery" + "github.com/stretchr/testify/assert" +) + +func TestInitialize(t *testing.T) { + d := &Discovery{} + d.Initialize("/path/to/file", 1000, 0) + assert.Equal(t, d.path, "/path/to/file") +} + +func TestNew(t *testing.T) { + d, err := discovery.New("file:///path/to/file", 0, 0) + assert.NoError(t, err) + assert.Equal(t, d.(*Discovery).path, "/path/to/file") +} + +func TestContent(t *testing.T) { + data := ` +1.1.1.[1:2]:1111 +2.2.2.[2:4]:2222 +` + ips := parseFileContent([]byte(data)) + assert.Len(t, ips, 5) + assert.Equal(t, ips[0], "1.1.1.1:1111") + assert.Equal(t, ips[1], "1.1.1.2:1111") + assert.Equal(t, ips[2], "2.2.2.2:2222") + assert.Equal(t, ips[3], "2.2.2.3:2222") + assert.Equal(t, ips[4], "2.2.2.4:2222") +} + +func TestRegister(t *testing.T) { + discovery := &Discovery{path: "/path/to/file"} + assert.Error(t, discovery.Register("0.0.0.0")) +} + +func TestParsingContentsWithComments(t *testing.T) { + data := ` +### test ### +1.1.1.1:1111 # inline comment +# 2.2.2.2:2222 + ### empty line with comment + 3.3.3.3:3333 +### test ### +` + ips := parseFileContent([]byte(data)) + assert.Len(t, ips, 2) + assert.Equal(t, "1.1.1.1:1111", ips[0]) + assert.Equal(t, "3.3.3.3:3333", ips[1]) +} + +func TestWatch(t *testing.T) { + data := ` +1.1.1.1:1111 +2.2.2.2:2222 +` + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + + // Create a temporary file and remove it. + tmp, err := ioutil.TempFile(os.TempDir(), "discovery-file-test") + assert.NoError(t, err) + assert.NoError(t, tmp.Close()) + assert.NoError(t, os.Remove(tmp.Name())) + + // Set up file discovery. + d := &Discovery{} + d.Initialize(tmp.Name(), 1000, 0) + stopCh := make(chan struct{}) + ch, errCh := d.Watch(stopCh) + + // Make sure it fires errors since the file doesn't exist. + assert.Error(t, <-errCh) + // We have to drain the error channel otherwise Watch will get stuck. + go func() { + for range errCh { + } + }() + + // Write the file and make sure we get the expected value back. + assert.NoError(t, ioutil.WriteFile(tmp.Name(), []byte(data), 0600)) + assert.Equal(t, expected, <-ch) + + // Add a new entry and look it up. + expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) + f, err := os.OpenFile(tmp.Name(), os.O_APPEND|os.O_WRONLY, 0600) + assert.NoError(t, err) + assert.NotNil(t, f) + _, err = f.WriteString("\n3.3.3.3:3333\n") + assert.NoError(t, err) + f.Close() + assert.Equal(t, expected, <-ch) + + // Stop and make sure it closes all channels. + close(stopCh) + assert.Nil(t, <-ch) + assert.Nil(t, <-errCh) +} diff --git a/discovery/generator.go b/discovery/generator.go new file mode 100644 index 0000000..d222982 --- /dev/null +++ b/discovery/generator.go @@ -0,0 +1,35 @@ +package discovery + +import ( + "fmt" + "regexp" + "strconv" +) + +// Generate takes care of IP generation +func Generate(pattern string) []string { + re, _ := regexp.Compile(`\[(.+):(.+)\]`) + submatch := re.FindStringSubmatch(pattern) + if submatch == nil { + return []string{pattern} + } + + from, err := strconv.Atoi(submatch[1]) + if err != nil { + return []string{pattern} + } + to, err := strconv.Atoi(submatch[2]) + if err != nil { + return []string{pattern} + } + + template := re.ReplaceAllString(pattern, "%d") + + var result []string + for val := from; val <= to; val++ { + entry := fmt.Sprintf(template, val) + result = append(result, entry) + } + + return result +} diff --git a/discovery/generator_test.go b/discovery/generator_test.go new file mode 100644 index 0000000..7473344 --- /dev/null +++ b/discovery/generator_test.go @@ -0,0 +1,55 @@ +package discovery + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGeneratorNotGenerate(t *testing.T) { + ips := Generate("127.0.0.1") + assert.Equal(t, len(ips), 1) + assert.Equal(t, ips[0], "127.0.0.1") +} + +func TestGeneratorWithPortNotGenerate(t *testing.T) { + ips := Generate("127.0.0.1:8080") + assert.Equal(t, len(ips), 1) + assert.Equal(t, ips[0], "127.0.0.1:8080") +} + +func TestGeneratorMatchFailedNotGenerate(t *testing.T) { + ips := Generate("127.0.0.[1]") + assert.Equal(t, len(ips), 1) + assert.Equal(t, ips[0], "127.0.0.[1]") +} + +func TestGeneratorWithPort(t *testing.T) { + ips := Generate("127.0.0.[1:11]:2375") + assert.Equal(t, len(ips), 11) + assert.Equal(t, ips[0], "127.0.0.1:2375") + assert.Equal(t, ips[1], "127.0.0.2:2375") + assert.Equal(t, ips[2], "127.0.0.3:2375") + assert.Equal(t, ips[3], "127.0.0.4:2375") + assert.Equal(t, ips[4], "127.0.0.5:2375") + assert.Equal(t, ips[5], "127.0.0.6:2375") + assert.Equal(t, ips[6], "127.0.0.7:2375") + assert.Equal(t, ips[7], "127.0.0.8:2375") + assert.Equal(t, ips[8], "127.0.0.9:2375") + assert.Equal(t, ips[9], "127.0.0.10:2375") + assert.Equal(t, ips[10], "127.0.0.11:2375") +} + +func TestGenerateWithMalformedInputAtRangeStart(t *testing.T) { + malformedInput := "127.0.0.[x:11]:2375" + ips := Generate(malformedInput) + assert.Equal(t, len(ips), 1) + assert.Equal(t, ips[0], malformedInput) +} + +func TestGenerateWithMalformedInputAtRangeEnd(t *testing.T) { + malformedInput := "127.0.0.[1:x]:2375" + ips := Generate(malformedInput) + assert.Equal(t, len(ips), 1) + assert.Equal(t, ips[0], malformedInput) +} diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go new file mode 100644 index 0000000..347e041 --- /dev/null +++ b/discovery/kv/kv.go @@ -0,0 +1,148 @@ +package kv + +import ( + "fmt" + "path" + "strings" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/discovery" + "github.com/docker/libkv" + "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" + "github.com/docker/libkv/store/etcd" + "github.com/docker/libkv/store/zookeeper" +) + +const ( + discoveryPath = "docker/nodes" +) + +// Discovery is exported +type Discovery struct { + backend store.Backend + store store.Store + heartbeat time.Duration + ttl time.Duration + prefix string + path string +} + +func init() { + Init() +} + +// Init is exported +func Init() { + // Register to libkv + zookeeper.Register() + consul.Register() + etcd.Register() + + // Register to internal discovery service + discovery.Register("zk", &Discovery{backend: store.ZK}) + discovery.Register("consul", &Discovery{backend: store.CONSUL}) + discovery.Register("etcd", &Discovery{backend: store.ETCD}) +} + +// Initialize is exported +func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error { + var ( + parts = strings.SplitN(uris, "/", 2) + addrs = strings.Split(parts[0], ",") + err error + ) + + // A custom prefix to the path can be optionally used. + if len(parts) == 2 { + s.prefix = parts[1] + } + + s.heartbeat = heartbeat + s.ttl = ttl + s.path = path.Join(s.prefix, discoveryPath) + + // Creates a new store, will ignore options given + // if not supported by the chosen store + s.store, err = libkv.NewStore(s.backend, addrs, nil) + return err +} + +// Watch the store until either there's a store error or we receive a stop request. +// Returns false if we shouldn't attempt watching the store anymore (stop request received). +func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool { + for { + select { + case pairs := <-watchCh: + if pairs == nil { + return true + } + + log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs)) + + // Convert `KVPair` into `discovery.Entry`. + addrs := make([]string, len(pairs)) + for _, pair := range pairs { + addrs = append(addrs, string(pair.Value)) + } + + entries, err := discovery.CreateEntries(addrs) + if err != nil { + errCh <- err + } else { + discoveryCh <- entries + } + case <-stopCh: + // We were requested to stop watching. + return false + } + } +} + +// Watch is exported +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { + ch := make(chan discovery.Entries) + errCh := make(chan error) + + go func() { + defer close(ch) + defer close(errCh) + + // Forever: Create a store watch, watch until we get an error and then try again. + // Will only stop if we receive a stopCh request. + for { + // Set up a watch. + watchCh, err := s.store.WatchTree(s.path, stopCh) + if err != nil { + errCh <- err + } else { + if !s.watchOnce(stopCh, watchCh, ch, errCh) { + return + } + } + + // If we get here it means the store watch channel was closed. This + // is unexpected so let's retry later. + errCh <- fmt.Errorf("Unexpected watch error") + time.Sleep(s.heartbeat) + } + }() + return ch, errCh +} + +// Register is exported +func (s *Discovery) Register(addr string) error { + opts := &store.WriteOptions{TTL: s.ttl} + return s.store.Put(path.Join(s.path, addr), []byte(addr), opts) +} + +// Store returns the underlying store used by KV discovery. +func (s *Discovery) Store() store.Store { + return s.store +} + +// Prefix returns the store prefix +func (s *Discovery) Prefix() string { + return s.prefix +} diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go new file mode 100644 index 0000000..dea34d6 --- /dev/null +++ b/discovery/kv/kv_test.go @@ -0,0 +1,119 @@ +package kv + +import ( + "errors" + "path" + "testing" + "time" + + "github.com/docker/docker/pkg/discovery" + "github.com/docker/libkv/store" + libkvmock "github.com/docker/libkv/store/mock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestInitialize(t *testing.T) { + storeMock, err := libkvmock.New([]string{"127.0.0.1"}, nil) + assert.NotNil(t, storeMock) + assert.NoError(t, err) + + d := &Discovery{backend: store.CONSUL} + d.Initialize("127.0.0.1", 0, 0) + d.store = storeMock + + s := d.store.(*libkvmock.Mock) + assert.Len(t, s.Endpoints, 1) + assert.Equal(t, s.Endpoints[0], "127.0.0.1") + assert.Equal(t, d.path, discoveryPath) + + storeMock, err = libkvmock.New([]string{"127.0.0.1:1234"}, nil) + assert.NotNil(t, storeMock) + assert.NoError(t, err) + + d = &Discovery{backend: store.CONSUL} + d.Initialize("127.0.0.1:1234/path", 0, 0) + d.store = storeMock + + s = d.store.(*libkvmock.Mock) + assert.Len(t, s.Endpoints, 1) + assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") + assert.Equal(t, d.path, "path/"+discoveryPath) + + storeMock, err = libkvmock.New([]string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"}, nil) + assert.NotNil(t, storeMock) + assert.NoError(t, err) + + d = &Discovery{backend: store.CONSUL} + d.Initialize("127.0.0.1:1234,127.0.0.2:1234,127.0.0.3:1234/path", 0, 0) + d.store = storeMock + + s = d.store.(*libkvmock.Mock) + if assert.Len(t, s.Endpoints, 3) { + assert.Equal(t, s.Endpoints[0], "127.0.0.1:1234") + assert.Equal(t, s.Endpoints[1], "127.0.0.2:1234") + assert.Equal(t, s.Endpoints[2], "127.0.0.3:1234") + } + assert.Equal(t, d.path, "path/"+discoveryPath) +} + +func TestWatch(t *testing.T) { + storeMock, err := libkvmock.New([]string{"127.0.0.1:1234"}, nil) + assert.NotNil(t, storeMock) + assert.NoError(t, err) + + d := &Discovery{backend: store.CONSUL} + d.Initialize("127.0.0.1:1234/path", 0, 0) + d.store = storeMock + + s := d.store.(*libkvmock.Mock) + mockCh := make(chan []*store.KVPair) + + // The first watch will fail. + s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, errors.New("test error")).Once() + // The second one will succeed. + s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil).Once() + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + kvs := []*store.KVPair{ + {Key: path.Join("path", discoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")}, + {Key: path.Join("path", discoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")}, + } + + stopCh := make(chan struct{}) + ch, errCh := d.Watch(stopCh) + + // It should fire an error since the first WatchTree call failed. + assert.EqualError(t, <-errCh, "test error") + // We have to drain the error channel otherwise Watch will get stuck. + go func() { + for range errCh { + } + }() + + // Push the entries into the store channel and make sure discovery emits. + mockCh <- kvs + assert.Equal(t, <-ch, expected) + + // Add a new entry. + expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) + kvs = append(kvs, &store.KVPair{Key: path.Join("path", discoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")}) + mockCh <- kvs + assert.Equal(t, <-ch, expected) + + // Make sure that if an error occurs it retries. + // This third call to WatchTree will be checked later by AssertExpectations. + s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil) + close(mockCh) + // Give it enough time to call WatchTree. + time.Sleep(3) + + // Stop and make sure it closes all channels. + close(stopCh) + assert.Nil(t, <-ch) + assert.Nil(t, <-errCh) + + s.AssertExpectations(t) +} diff --git a/discovery/nodes/nodes.go b/discovery/nodes/nodes.go new file mode 100644 index 0000000..9da0051 --- /dev/null +++ b/discovery/nodes/nodes.go @@ -0,0 +1,54 @@ +package nodes + +import ( + "fmt" + "strings" + "time" + + "github.com/docker/docker/pkg/discovery" +) + +// Discovery is exported +type Discovery struct { + entries discovery.Entries +} + +func init() { + Init() +} + +// Init is exported +func Init() { + discovery.Register("nodes", &Discovery{}) +} + +// Initialize is exported +func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration) error { + for _, input := range strings.Split(uris, ",") { + for _, ip := range discovery.Generate(input) { + entry, err := discovery.NewEntry(ip) + if err != nil { + return fmt.Errorf("%s, please check you are using the correct discovery (missing token:// ?)", err.Error()) + } + s.entries = append(s.entries, entry) + } + } + + return nil +} + +// Watch is exported +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { + ch := make(chan discovery.Entries) + go func() { + defer close(ch) + ch <- s.entries + <-stopCh + }() + return ch, nil +} + +// Register is exported +func (s *Discovery) Register(addr string) error { + return discovery.ErrNotImplemented +} diff --git a/discovery/nodes/nodes_test.go b/discovery/nodes/nodes_test.go new file mode 100644 index 0000000..4563551 --- /dev/null +++ b/discovery/nodes/nodes_test.go @@ -0,0 +1,43 @@ +package nodes + +import ( + "testing" + + "github.com/docker/docker/pkg/discovery" + "github.com/stretchr/testify/assert" +) + +func TestInitialize(t *testing.T) { + d := &Discovery{} + d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0) + assert.Equal(t, len(d.entries), 2) + assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") + assert.Equal(t, d.entries[1].String(), "2.2.2.2:2222") +} + +func TestInitializeWithPattern(t *testing.T) { + d := &Discovery{} + d.Initialize("1.1.1.[1:2]:1111,2.2.2.[2:4]:2222", 0, 0) + assert.Equal(t, len(d.entries), 5) + assert.Equal(t, d.entries[0].String(), "1.1.1.1:1111") + assert.Equal(t, d.entries[1].String(), "1.1.1.2:1111") + assert.Equal(t, d.entries[2].String(), "2.2.2.2:2222") + assert.Equal(t, d.entries[3].String(), "2.2.2.3:2222") + assert.Equal(t, d.entries[4].String(), "2.2.2.4:2222") +} + +func TestWatch(t *testing.T) { + d := &Discovery{} + d.Initialize("1.1.1.1:1111,2.2.2.2:2222", 0, 0) + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + ch, _ := d.Watch(nil) + assert.True(t, expected.Equals(<-ch)) +} + +func TestRegister(t *testing.T) { + d := &Discovery{} + assert.Error(t, d.Register("0.0.0.0")) +}