149 lines
3.5 KiB
Go
149 lines
3.5 KiB
Go
|
package kv
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"path"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
log "github.com/Sirupsen/logrus"
|
||
|
"github.com/docker/docker/pkg/discovery"
|
||
|
"github.com/docker/libkv"
|
||
|
"github.com/docker/libkv/store"
|
||
|
"github.com/docker/libkv/store/consul"
|
||
|
"github.com/docker/libkv/store/etcd"
|
||
|
"github.com/docker/libkv/store/zookeeper"
|
||
|
)
|
||
|
|
||
|
// discoveryPath is the key-space suffix, joined after the optional
// user-supplied prefix, under which node entries are stored and watched.
const discoveryPath = "docker/nodes"
|
||
|
|
||
|
// Discovery is exported
|
||
|
type Discovery struct {
|
||
|
backend store.Backend
|
||
|
store store.Store
|
||
|
heartbeat time.Duration
|
||
|
ttl time.Duration
|
||
|
prefix string
|
||
|
path string
|
||
|
}
|
||
|
|
||
|
func init() {
|
||
|
Init()
|
||
|
}
|
||
|
|
||
|
// Init is exported
|
||
|
func Init() {
|
||
|
// Register to libkv
|
||
|
zookeeper.Register()
|
||
|
consul.Register()
|
||
|
etcd.Register()
|
||
|
|
||
|
// Register to internal discovery service
|
||
|
discovery.Register("zk", &Discovery{backend: store.ZK})
|
||
|
discovery.Register("consul", &Discovery{backend: store.CONSUL})
|
||
|
discovery.Register("etcd", &Discovery{backend: store.ETCD})
|
||
|
}
|
||
|
|
||
|
// Initialize is exported
|
||
|
func (s *Discovery) Initialize(uris string, heartbeat time.Duration, ttl time.Duration) error {
|
||
|
var (
|
||
|
parts = strings.SplitN(uris, "/", 2)
|
||
|
addrs = strings.Split(parts[0], ",")
|
||
|
err error
|
||
|
)
|
||
|
|
||
|
// A custom prefix to the path can be optionally used.
|
||
|
if len(parts) == 2 {
|
||
|
s.prefix = parts[1]
|
||
|
}
|
||
|
|
||
|
s.heartbeat = heartbeat
|
||
|
s.ttl = ttl
|
||
|
s.path = path.Join(s.prefix, discoveryPath)
|
||
|
|
||
|
// Creates a new store, will ignore options given
|
||
|
// if not supported by the chosen store
|
||
|
s.store, err = libkv.NewStore(s.backend, addrs, nil)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Watch the store until either there's a store error or we receive a stop request.
|
||
|
// Returns false if we shouldn't attempt watching the store anymore (stop request received).
|
||
|
func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool {
|
||
|
for {
|
||
|
select {
|
||
|
case pairs := <-watchCh:
|
||
|
if pairs == nil {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
log.WithField("discovery", s.backend).Debugf("Watch triggered with %d nodes", len(pairs))
|
||
|
|
||
|
// Convert `KVPair` into `discovery.Entry`.
|
||
|
addrs := make([]string, len(pairs))
|
||
|
for _, pair := range pairs {
|
||
|
addrs = append(addrs, string(pair.Value))
|
||
|
}
|
||
|
|
||
|
entries, err := discovery.CreateEntries(addrs)
|
||
|
if err != nil {
|
||
|
errCh <- err
|
||
|
} else {
|
||
|
discoveryCh <- entries
|
||
|
}
|
||
|
case <-stopCh:
|
||
|
// We were requested to stop watching.
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Watch is exported
|
||
|
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
|
||
|
ch := make(chan discovery.Entries)
|
||
|
errCh := make(chan error)
|
||
|
|
||
|
go func() {
|
||
|
defer close(ch)
|
||
|
defer close(errCh)
|
||
|
|
||
|
// Forever: Create a store watch, watch until we get an error and then try again.
|
||
|
// Will only stop if we receive a stopCh request.
|
||
|
for {
|
||
|
// Set up a watch.
|
||
|
watchCh, err := s.store.WatchTree(s.path, stopCh)
|
||
|
if err != nil {
|
||
|
errCh <- err
|
||
|
} else {
|
||
|
if !s.watchOnce(stopCh, watchCh, ch, errCh) {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// If we get here it means the store watch channel was closed. This
|
||
|
// is unexpected so let's retry later.
|
||
|
errCh <- fmt.Errorf("Unexpected watch error")
|
||
|
time.Sleep(s.heartbeat)
|
||
|
}
|
||
|
}()
|
||
|
return ch, errCh
|
||
|
}
|
||
|
|
||
|
// Register is exported
|
||
|
func (s *Discovery) Register(addr string) error {
|
||
|
opts := &store.WriteOptions{TTL: s.ttl}
|
||
|
return s.store.Put(path.Join(s.path, addr), []byte(addr), opts)
|
||
|
}
|
||
|
|
||
|
// Store returns the underlying store used by KV discovery.
|
||
|
func (s *Discovery) Store() store.Store {
|
||
|
return s.store
|
||
|
}
|
||
|
|
||
|
// Prefix returns the store prefix
|
||
|
func (s *Discovery) Prefix() string {
|
||
|
return s.prefix
|
||
|
}
|