Remote plugins plumbing.
Signed-off-by: David Calavera <david.calavera@gmail.com>
This commit is contained in:
parent
39a74950fe
commit
774251695e
5 changed files with 435 additions and 0 deletions
100
plugins/client.go
Normal file
100
plugins/client.go
Normal file
|
@ -0,0 +1,100 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
versionMimetype = "appplication/vnd.docker.plugins.v1+json"
|
||||
defaultTimeOut = 120
|
||||
)
|
||||
|
||||
func NewClient(addr string) *Client {
|
||||
tr := &http.Transport{}
|
||||
protoAndAddr := strings.Split(addr, "://")
|
||||
configureTCPTransport(tr, protoAndAddr[0], protoAndAddr[1])
|
||||
return &Client{&http.Client{Transport: tr}, protoAndAddr[1]}
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
http *http.Client
|
||||
addr string
|
||||
}
|
||||
|
||||
func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error {
|
||||
var buf bytes.Buffer
|
||||
if err := json.NewEncoder(&buf).Encode(args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", "/"+serviceMethod, &buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("Accept", versionMimetype)
|
||||
req.URL.Scheme = "http"
|
||||
req.URL.Host = c.addr
|
||||
|
||||
var retries int
|
||||
start := time.Now()
|
||||
|
||||
for {
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
timeOff := backoff(retries)
|
||||
if timeOff+time.Since(start) > defaultTimeOut {
|
||||
return err
|
||||
}
|
||||
retries++
|
||||
logrus.Warn("Unable to connect to plugin: %s, retrying in %ds\n", c.addr, timeOff)
|
||||
time.Sleep(timeOff)
|
||||
continue
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
remoteErr, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Plugin Error: %s", remoteErr)
|
||||
}
|
||||
|
||||
return json.NewDecoder(resp.Body).Decode(&ret)
|
||||
}
|
||||
}
|
||||
|
||||
func backoff(retries int) time.Duration {
|
||||
b, max := float64(1), float64(defaultTimeOut)
|
||||
for b < max && retries > 0 {
|
||||
b *= 2
|
||||
retries--
|
||||
}
|
||||
if b > max {
|
||||
b = max
|
||||
}
|
||||
return time.Duration(b)
|
||||
}
|
||||
|
||||
func configureTCPTransport(tr *http.Transport, proto, addr string) {
|
||||
// Why 32? See https://github.com/docker/docker/pull/8035.
|
||||
timeout := 32 * time.Second
|
||||
if proto == "unix" {
|
||||
// No need for compression in local communications.
|
||||
tr.DisableCompression = true
|
||||
tr.Dial = func(_, _ string) (net.Conn, error) {
|
||||
return net.DialTimeout(proto, addr, timeout)
|
||||
}
|
||||
} else {
|
||||
tr.Proxy = http.ProxyFromEnvironment
|
||||
tr.Dial = (&net.Dialer{Timeout: timeout}).Dial
|
||||
}
|
||||
}
|
63
plugins/client_test.go
Normal file
63
plugins/client_test.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var (
|
||||
mux *http.ServeMux
|
||||
server *httptest.Server
|
||||
)
|
||||
|
||||
func setupRemotePluginServer() string {
|
||||
mux = http.NewServeMux()
|
||||
server = httptest.NewServer(mux)
|
||||
return server.URL
|
||||
}
|
||||
|
||||
func teardownRemotePluginServer() {
|
||||
if server != nil {
|
||||
server.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestFailedConnection(t *testing.T) {
|
||||
c := NewClient("tcp://127.0.0.1:1")
|
||||
err := c.Call("Service.Method", nil, nil)
|
||||
if err == nil {
|
||||
t.Fatal("Unexpected successful connection")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEchoInputOutput(t *testing.T) {
|
||||
addr := setupRemotePluginServer()
|
||||
defer teardownRemotePluginServer()
|
||||
|
||||
m := Manifest{[]string{"VolumeDriver", "NetworkDriver"}}
|
||||
|
||||
mux.HandleFunc("/Test.Echo", func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
t.Fatalf("Expected POST, got %s\n", r.Method)
|
||||
}
|
||||
|
||||
header := w.Header()
|
||||
header.Set("Content-Type", versionMimetype)
|
||||
|
||||
io.Copy(w, r.Body)
|
||||
})
|
||||
|
||||
c := NewClient(addr)
|
||||
var output Manifest
|
||||
err := c.Call("Test.Echo", m, &output)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(output, m) {
|
||||
t.Fatalf("Expected %v, was %v\n", m, output)
|
||||
}
|
||||
}
|
78
plugins/discovery.go
Normal file
78
plugins/discovery.go
Normal file
|
@ -0,0 +1,78 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const defaultLocalRegistry = "/usr/share/docker/plugins"
|
||||
|
||||
var (
|
||||
ErrNotFound = errors.New("Plugin not found")
|
||||
)
|
||||
|
||||
type Registry interface {
|
||||
Plugins() ([]*Plugin, error)
|
||||
Plugin(name string) (*Plugin, error)
|
||||
}
|
||||
|
||||
type LocalRegistry struct {
|
||||
path string
|
||||
}
|
||||
|
||||
func newLocalRegistry(path string) *LocalRegistry {
|
||||
if len(path) == 0 {
|
||||
path = defaultLocalRegistry
|
||||
}
|
||||
|
||||
return &LocalRegistry{path}
|
||||
}
|
||||
|
||||
func (l *LocalRegistry) Plugin(name string) (*Plugin, error) {
|
||||
filepath := filepath.Join(l.path, name)
|
||||
specpath := filepath + ".spec"
|
||||
if fi, err := os.Stat(specpath); err == nil {
|
||||
return readPluginInfo(specpath, fi)
|
||||
}
|
||||
socketpath := filepath + ".sock"
|
||||
if fi, err := os.Stat(socketpath); err == nil {
|
||||
return readPluginInfo(socketpath, fi)
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func readPluginInfo(path string, fi os.FileInfo) (*Plugin, error) {
|
||||
name := strings.Split(fi.Name(), ".")[0]
|
||||
|
||||
if fi.Mode()&os.ModeSocket != 0 {
|
||||
return &Plugin{
|
||||
Name: name,
|
||||
Addr: "unix://" + path,
|
||||
}, nil
|
||||
}
|
||||
|
||||
content, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
addr := strings.TrimSpace(string(content))
|
||||
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(u.Scheme) == 0 {
|
||||
return nil, fmt.Errorf("Unknown protocol")
|
||||
}
|
||||
|
||||
return &Plugin{
|
||||
Name: name,
|
||||
Addr: addr,
|
||||
}, nil
|
||||
}
|
108
plugins/discovery_test.go
Normal file
108
plugins/discovery_test.go
Normal file
|
@ -0,0 +1,108 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUnknownLocalPath(t *testing.T) {
|
||||
tmpdir, err := ioutil.TempDir("", "docker-test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
l := newLocalRegistry(filepath.Join(tmpdir, "unknown"))
|
||||
_, err = l.Plugin("foo")
|
||||
if err == nil || err != ErrNotFound {
|
||||
t.Fatalf("Expected error for unknown directory")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalSocket(t *testing.T) {
|
||||
tmpdir, err := ioutil.TempDir("", "docker-test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpdir)
|
||||
l, err := net.Listen("unix", filepath.Join(tmpdir, "echo.sock"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
r := newLocalRegistry(tmpdir)
|
||||
p, err := r.Plugin("echo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
pp, err := r.Plugin("echo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(p, pp) {
|
||||
t.Fatalf("Expected %v, was %v\n", p, pp)
|
||||
}
|
||||
|
||||
if p.Name != "echo" {
|
||||
t.Fatalf("Expected plugin `echo`, got %s\n", p.Name)
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("unix://%s/echo.sock", tmpdir)
|
||||
if p.Addr != addr {
|
||||
t.Fatalf("Expected plugin addr `%s`, got %s\n", addr, p.Addr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileSpecPlugin(t *testing.T) {
|
||||
tmpdir, err := ioutil.TempDir("", "docker-test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
path string
|
||||
name string
|
||||
addr string
|
||||
fail bool
|
||||
}{
|
||||
{filepath.Join(tmpdir, "echo.spec"), "echo", "unix://var/lib/docker/plugins/echo.sock", false},
|
||||
{filepath.Join(tmpdir, "foo.spec"), "foo", "tcp://localhost:8080", false},
|
||||
{filepath.Join(tmpdir, "bar.spec"), "bar", "localhost:8080", true}, // unknown transport
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
if err = os.MkdirAll(path.Dir(c.path), 0755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = ioutil.WriteFile(c.path, []byte(c.addr), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := newLocalRegistry(tmpdir)
|
||||
p, err := r.Plugin(c.name)
|
||||
if c.fail && err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if p.Name != c.name {
|
||||
t.Fatalf("Expected plugin `%s`, got %s\n", c.name, p.Name)
|
||||
}
|
||||
|
||||
if p.Addr != c.addr {
|
||||
t.Fatalf("Expected plugin addr `%s`, got %s\n", c.addr, p.Addr)
|
||||
}
|
||||
os.Remove(c.path)
|
||||
}
|
||||
}
|
86
plugins/plugins.go
Normal file
86
plugins/plugins.go
Normal file
|
@ -0,0 +1,86 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNotImplements = errors.New("Plugin does not implement the requested driver")
|
||||
)
|
||||
|
||||
type plugins struct {
|
||||
sync.Mutex
|
||||
plugins map[string]*Plugin
|
||||
}
|
||||
|
||||
var storage = plugins{plugins: make(map[string]*Plugin)}
|
||||
|
||||
type Manifest struct {
|
||||
Implements []string
|
||||
}
|
||||
|
||||
type Plugin struct {
|
||||
Name string
|
||||
Addr string
|
||||
Client *Client
|
||||
Manifest *Manifest
|
||||
}
|
||||
|
||||
func (p *Plugin) activate() error {
|
||||
m := new(Manifest)
|
||||
p.Client = NewClient(p.Addr)
|
||||
err := p.Client.Call("Plugin.Activate", nil, m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logrus.Debugf("%s's manifest: %v", p.Name, m)
|
||||
p.Manifest = m
|
||||
return nil
|
||||
}
|
||||
|
||||
func load(name string) (*Plugin, error) {
|
||||
registry := newLocalRegistry("")
|
||||
pl, err := registry.Plugin(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := pl.activate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pl, nil
|
||||
}
|
||||
|
||||
func get(name string) (*Plugin, error) {
|
||||
storage.Lock()
|
||||
defer storage.Unlock()
|
||||
pl, ok := storage.plugins[name]
|
||||
if ok {
|
||||
return pl, nil
|
||||
}
|
||||
pl, err := load(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logrus.Debugf("Plugin: %v", pl)
|
||||
storage.plugins[name] = pl
|
||||
return pl, nil
|
||||
}
|
||||
|
||||
func Get(name, imp string) (*Plugin, error) {
|
||||
pl, err := get(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, driver := range pl.Manifest.Implements {
|
||||
logrus.Debugf("%s implements: %s", name, driver)
|
||||
if driver == imp {
|
||||
return pl, nil
|
||||
}
|
||||
}
|
||||
return nil, ErrNotImplements
|
||||
}
|
Loading…
Reference in a new issue