diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 8b7b4ed..f371c0c 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -17,7 +17,7 @@ import ( ) const ( - discoveryPath = "docker/nodes" + defaultDiscoveryPath = "docker/nodes" ) // Discovery is exported @@ -62,7 +62,14 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Du s.heartbeat = heartbeat s.ttl = ttl - s.path = path.Join(s.prefix, discoveryPath) + + // Use a custom path if specified in discovery options + dpath := defaultDiscoveryPath + if clusterOpts["kv.path"] != "" { + dpath = clusterOpts["kv.path"] + } + + s.path = path.Join(s.prefix, dpath) var config *store.Config if clusterOpts["kv.cacertfile"] != "" && clusterOpts["kv.certfile"] != "" && clusterOpts["kv.keyfile"] != "" { @@ -138,6 +145,17 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c // Forever: Create a store watch, watch until we get an error and then try again. // Will only stop if we receive a stopCh request. for { + // Create the path to watch if it does not exist yet + exists, err := s.store.Exists(s.path) + if err != nil { + errCh <- err + } + if !exists { + if err := s.store.Put(s.path, []byte(""), &store.WriteOptions{IsDir: true}); err != nil { + errCh <- err + } + } + // Set up a watch. watchCh, err := s.store.WatchTree(s.path, stopCh) if err != nil { diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index a513309..4fe5239 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -33,7 +33,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) { s := d.store.(*FakeStore) c.Assert(s.Endpoints, check.HasLen, 1) c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1") - c.Assert(d.path, check.Equals, discoveryPath) + c.Assert(d.path, check.Equals, defaultDiscoveryPath) storeMock = &FakeStore{ Endpoints: []string{"127.0.0.1:1234"}, @@ -45,7 +45,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) { s = d.store.(*FakeStore) c.Assert(s.Endpoints, check.HasLen, 1) c.Assert(s.Endpoints[0], check.Equals, "127.0.0.1:1234") - c.Assert(d.path, check.Equals, "path/"+discoveryPath) + c.Assert(d.path, check.Equals, "path/"+defaultDiscoveryPath) storeMock = &FakeStore{ Endpoints: []string{"127.0.0.1:1234", "127.0.0.2:1234", "127.0.0.3:1234"}, @@ -60,7 +60,7 @@ func (ds *DiscoverySuite) TestInitialize(c *check.C) { c.Assert(s.Endpoints[1], check.Equals, "127.0.0.2:1234") c.Assert(s.Endpoints[2], check.Equals, "127.0.0.3:1234") - c.Assert(d.path, check.Equals, "path/"+discoveryPath) + c.Assert(d.path, check.Equals, "path/"+defaultDiscoveryPath) } // Extremely limited mock store so we can test initialization @@ -224,8 +224,8 @@ func (ds *DiscoverySuite) TestWatch(c *check.C) { &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, } kvs := []*store.KVPair{ - {Key: path.Join("path", discoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")}, - {Key: path.Join("path", discoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")}, + {Key: path.Join("path", defaultDiscoveryPath, "1.1.1.1"), Value: []byte("1.1.1.1:1111")}, + {Key: path.Join("path", defaultDiscoveryPath, "2.2.2.2"), Value: []byte("2.2.2.2:2222")}, } stopCh := make(chan struct{}) @@ -245,7 +245,7 @@ func (ds *DiscoverySuite) TestWatch(c *check.C) { // Add a new entry. expected = append(expected, &discovery.Entry{Host: "3.3.3.3", Port: "3333"}) - kvs = append(kvs, &store.KVPair{Key: path.Join("path", discoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")}) + kvs = append(kvs, &store.KVPair{Key: path.Join("path", defaultDiscoveryPath, "3.3.3.3"), Value: []byte("3.3.3.3:3333")}) mockCh <- kvs c.Assert(<-ch, check.DeepEquals, expected)