From 409a9da270c17711d27ee063f9296750745b29fc Mon Sep 17 00:00:00 2001 From: David Calavera Date: Wed, 6 Jan 2016 17:57:02 -0500 Subject: [PATCH] Add an unsafe memory discovery store for testing. Signed-off-by: David Calavera --- discovery/memory/memory.go | 82 +++++++++++++++++++++++++++++++++ discovery/memory/memory_test.go | 48 +++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 discovery/memory/memory.go create mode 100644 discovery/memory/memory_test.go diff --git a/discovery/memory/memory.go b/discovery/memory/memory.go new file mode 100644 index 0000000..f389825 --- /dev/null +++ b/discovery/memory/memory.go @@ -0,0 +1,82 @@ +package memory + +import ( + "time" + + "github.com/docker/docker/pkg/discovery" +) + +// Discovery implements a descovery backend that keeps +// data in memory. +type Discovery struct { + heartbeat time.Duration + values []string +} + +func init() { + Init() +} + +// Init registers the memory backend on demand. +func Init() { + discovery.Register("memory", &Discovery{}) +} + +// Initialize sets the heartbeat for the memory backend. +func (s *Discovery) Initialize(_ string, heartbeat time.Duration, _ time.Duration, _ map[string]string) error { + s.heartbeat = heartbeat + return nil +} + +// Watch sends periodic discovery updates to a channel. +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { + ch := make(chan discovery.Entries) + errCh := make(chan error) + ticker := time.NewTicker(s.heartbeat) + + go func() { + defer close(errCh) + defer close(ch) + + // Send the initial entries if available. + var currentEntries discovery.Entries + if len(s.values) > 0 { + var err error + currentEntries, err = discovery.CreateEntries(s.values) + if err != nil { + errCh <- err + } else { + ch <- currentEntries + } + } + + // Periodically send updates. + for { + select { + case <-ticker.C: + newEntries, err := discovery.CreateEntries(s.values) + if err != nil { + errCh <- err + continue + } + + // Check if the file has really changed. + if !newEntries.Equals(currentEntries) { + ch <- newEntries + } + currentEntries = newEntries + case <-stopCh: + ticker.Stop() + return + } + } + }() + + return ch, errCh +} + +// Register adds a new address to the discovery. +func (s *Discovery) Register(addr string) error { + s.values = append(s.values, addr) + return nil +} diff --git a/discovery/memory/memory_test.go b/discovery/memory/memory_test.go new file mode 100644 index 0000000..c2da0a0 --- /dev/null +++ b/discovery/memory/memory_test.go @@ -0,0 +1,48 @@ +package memory + +import ( + "testing" + + "github.com/docker/docker/pkg/discovery" + "github.com/go-check/check" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { check.TestingT(t) } + +type discoverySuite struct{} + +var _ = check.Suite(&discoverySuite{}) + +func (s *discoverySuite) TestWatch(c *check.C) { + d := &Discovery{} + d.Initialize("foo", 1000, 0, nil) + stopCh := make(chan struct{}) + ch, errCh := d.Watch(stopCh) + + // We have to drain the error channel otherwise Watch will get stuck. + go func() { + for range errCh { + } + }() + + expected := discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + } + + c.Assert(d.Register("1.1.1.1:1111"), check.IsNil) + c.Assert(<-ch, check.DeepEquals, expected) + + expected = discovery.Entries{ + &discovery.Entry{Host: "1.1.1.1", Port: "1111"}, + &discovery.Entry{Host: "2.2.2.2", Port: "2222"}, + } + + c.Assert(d.Register("2.2.2.2:2222"), check.IsNil) + c.Assert(<-ch, check.DeepEquals, expected) + + // Stop and make sure it closes all channels. + close(stopCh) + c.Assert(<-ch, check.IsNil) + c.Assert(<-errCh, check.IsNil) +}