Merge pull request #19167 from vieux/bring_discovery_on_par
Bring discovery on par with the one in docker/swarm
This commit is contained in:
commit
50aeb5ad91
2 changed files with 26 additions and 8 deletions
|
@ -17,7 +17,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
discoveryPath = "docker/nodes"
|
defaultDiscoveryPath = "docker/nodes"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Discovery is exported
|
// Discovery is exported
|
||||||
|
@ -62,7 +62,14 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Du
|
||||||
|
|
||||||
s.heartbeat = heartbeat
|
s.heartbeat = heartbeat
|
||||||
s.ttl = ttl
|
s.ttl = ttl
|
||||||
s.path = path.Join(s.prefix, discoveryPath)
|
|
||||||
|
// Use a custom path if specified in discovery options
|
||||||
|
dpath := defaultDiscoveryPath
|
||||||
|
if clusterOpts["kv.path"] != "" {
|
||||||
|
dpath = clusterOpts["kv.path"]
|
||||||
|
}
|
||||||
|
|
||||||
|
s.path = path.Join(s.prefix, dpath)
|
||||||
|
|
||||||
var config *store.Config
|
var config *store.Config
|
||||||
if clusterOpts["kv.cacertfile"] != "" && clusterOpts["kv.certfile"] != "" && clusterOpts["kv.keyfile"] != "" {
|
if clusterOpts["kv.cacertfile"] != "" && clusterOpts["kv.certfile"] != "" && clusterOpts["kv.keyfile"] != "" {
|
||||||
|
@ -138,6 +145,17 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
|
||||||
// Forever: Create a store watch, watch until we get an error and then try again.
|
// Forever: Create a store watch, watch until we get an error and then try again.
|
||||||
// Will only stop if we receive a stopCh request.
|
// Will only stop if we receive a stopCh request.
|
||||||
for {
|
for {
|
||||||
|
// Create the path to watch if it does not exist yet
|
||||||
|
exists, err := s.store.Exists(s.path)
|
||||||
|
if err != nil {
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
if err := s.store.Put(s.path, []byte(""), &store.WriteOptions{IsDir: true}); err != nil {
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Set up a watch.
|
// Set up a watch.
|
||||||
watchCh, err := s.store.WatchTree(s.path, stopCh)
|
watchCh, err := s.store.WatchTree(s.path, stopCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -33,7 +33,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) {
|
||||||
s := d.store.(*FakeStore)
|
s := d.store.(*FakeStore)
|
||||||
c.Assert(s.Endpoints, check.HasLen, 1)
|
c.Assert(s.Endpoints, check.HasLen, 1)
|
||||||
c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1")
|
c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1")
|
||||||
c.Assert(d.path, check.Equals, discoveryPath)
|
c.Assert(d.path, check.Equals, defaultDiscoveryPath)
|
||||||
|
|
||||||
storeMock = &FakeStore{
|
storeMock = &FakeStore{
|
||||||
Endpoints: []string{"127.0.0.1:1234"},
|
Endpoints: []string{"127.0.0.1:1234"},
|
||||||
|
@ -45,7 +45,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) {
|
||||||
s = d.store.(*FakeStore)
|
s = d.store.(*FakeStore)
|
||||||
c.Assert(s.Endpoints, check.HasLen, 1)
|
c.Assert(s.Endpoints, check.HasLen, 1)
|
||||||
c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1:1234")
|
c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1:1234")
|
||||||
c.Assert(d.path, check.Equals, "path/"+discoveryPath)
|
c.Assert(d.path, check.Equals, "path/"+defaultDiscoveryPath)
|
||||||
|
|
||||||
storeMock = &FakeStore{
|
storeMock = &FakeStore{
|
||||||
Endpoints: []string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"},
|
Endpoints: []string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"},
|
||||||
|
@ -60,7 +60,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) {
|
||||||
c.Assert(s.Endpoints[1], check.Equals, "127.0.0.2:1234")
|
c.Assert(s.Endpoints[1], check.Equals, "127.0.0.2:1234")
|
||||||
c.Assert(s.Endpoints[2], check.Equals, "127.0.0.3:1234")
|
c.Assert(s.Endpoints[2], check.Equals, "127.0.0.3:1234")
|
||||||
|
|
||||||
c.Assert(d.path, check.Equals, "path/"+discoveryPath)
|
c.Assert(d.path, check.Equals, "path/"+defaultDiscoveryPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extremely limited mock store so we can test initialization
|
// Extremely limited mock store so we can test initialization
|
||||||
|
@ -224,8 +224,8 @@ func (ds *DiscoverySuite) TestWatch(c *check.C) {
|
||||||
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
|
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
|
||||||
}
|
}
|
||||||
kvs := []*store.KVPair{
|
kvs := []*store.KVPair{
|
||||||
{Key: path.Join("path", discoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
|
{Key: path.Join("path", defaultDiscoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")},
|
||||||
{Key: path.Join("path", discoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
|
{Key: path.Join("path", defaultDiscoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")},
|
||||||
}
|
}
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
|
@ -245,7 +245,7 @@ func (ds *DiscoverySuite) TestWatch(c *check.C) {
|
||||||
|
|
||||||
// Add a new entry.
|
// Add a new entry.
|
||||||
expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
|
expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"})
|
||||||
kvs = append(kvs, &store.KVPair{Key: path.Join("path", discoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
|
kvs = append(kvs, &store.KVPair{Key: path.Join("path", defaultDiscoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")})
|
||||||
mockCh <- kvs
|
mockCh <- kvs
|
||||||
c.Assert(<-ch, check.DeepEquals, expected)
|
c.Assert(<-ch, check.DeepEquals, expected)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue