Add an unsafe memory discovery store for testing.
Signed-off-by: David Calavera <david.calavera@gmail.com>
This commit is contained in:
parent
cb3454b895
commit
409a9da270
2 changed files with 130 additions and 0 deletions
82
discovery/memory/memory.go
Normal file
82
discovery/memory/memory.go
Normal file
|
@ -0,0 +1,82 @@
|
|||
package memory
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/pkg/discovery"
|
||||
)
|
||||
|
||||
// Discovery implements a descovery backend that keeps
|
||||
// data in memory.
|
||||
type Discovery struct {
|
||||
heartbeat time.Duration
|
||||
values []string
|
||||
}
|
||||
|
||||
func init() {
|
||||
Init()
|
||||
}
|
||||
|
||||
// Init registers the memory backend on demand.
|
||||
func Init() {
|
||||
discovery.Register("memory", &Discovery{})
|
||||
}
|
||||
|
||||
// Initialize sets the heartbeat for the memory backend.
|
||||
func (s *Discovery) Initialize(_ string, heartbeat time.Duration, _ time.Duration, _ map[string]string) error {
|
||||
s.heartbeat = heartbeat
|
||||
return nil
|
||||
}
|
||||
|
||||
// Watch sends periodic discovery updates to a channel.
|
||||
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
|
||||
ch := make(chan discovery.Entries)
|
||||
errCh := make(chan error)
|
||||
ticker := time.NewTicker(s.heartbeat)
|
||||
|
||||
go func() {
|
||||
defer close(errCh)
|
||||
defer close(ch)
|
||||
|
||||
// Send the initial entries if available.
|
||||
var currentEntries discovery.Entries
|
||||
if len(s.values) > 0 {
|
||||
var err error
|
||||
currentEntries, err = discovery.CreateEntries(s.values)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
} else {
|
||||
ch <- currentEntries
|
||||
}
|
||||
}
|
||||
|
||||
// Periodically send updates.
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
newEntries, err := discovery.CreateEntries(s.values)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the file has really changed.
|
||||
if !newEntries.Equals(currentEntries) {
|
||||
ch <- newEntries
|
||||
}
|
||||
currentEntries = newEntries
|
||||
case <-stopCh:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return ch, errCh
|
||||
}
|
||||
|
||||
// Register adds a new address to the discovery.
|
||||
func (s *Discovery) Register(addr string) error {
|
||||
s.values = append(s.values, addr)
|
||||
return nil
|
||||
}
|
48
discovery/memory/memory_test.go
Normal file
48
discovery/memory/memory_test.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package memory
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/docker/docker/pkg/discovery"
|
||||
"github.com/go-check/check"
|
||||
)
|
||||
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) { check.TestingT(t) }
|
||||
|
||||
type discoverySuite struct{}
|
||||
|
||||
var _ = check.Suite(&discoverySuite{})
|
||||
|
||||
func (s *discoverySuite) TestWatch(c *check.C) {
|
||||
d := &Discovery{}
|
||||
d.Initialize("foo", 1000, 0, nil)
|
||||
stopCh := make(chan struct{})
|
||||
ch, errCh := d.Watch(stopCh)
|
||||
|
||||
// We have to drain the error channel otherwise Watch will get stuck.
|
||||
go func() {
|
||||
for range errCh {
|
||||
}
|
||||
}()
|
||||
|
||||
expected := discovery.Entries{
|
||||
&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
|
||||
}
|
||||
|
||||
c.Assert(d.Register("1.1.1.1:1111"), check.IsNil)
|
||||
c.Assert(<-ch, check.DeepEquals, expected)
|
||||
|
||||
expected = discovery.Entries{
|
||||
&discovery.Entry{Host: "1.1.1.1", Port: "1111"},
|
||||
&discovery.Entry{Host: "2.2.2.2", Port: "2222"},
|
||||
}
|
||||
|
||||
c.Assert(d.Register("2.2.2.2:2222"), check.IsNil)
|
||||
c.Assert(<-ch, check.DeepEquals, expected)
|
||||
|
||||
// Stop and make sure it closes all channels.
|
||||
close(stopCh)
|
||||
c.Assert(<-ch, check.IsNil)
|
||||
c.Assert(<-errCh, check.IsNil)
|
||||
}
|
Loading…
Reference in a new issue