Call plugins with custom transports.
Small refactor to be able to use custom transports to call remote plugins. Signed-off-by: David Calavera <david.calavera@gmail.com>
This commit is contained in:
parent
98943aafae
commit
44005e59d4
5 changed files with 123 additions and 23 deletions
|
@ -18,10 +18,11 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/docker/docker/pkg/plugins"
|
"github.com/docker/docker/pkg/plugins"
|
||||||
"github.com/docker/go-connections/tlsconfig"
|
"github.com/docker/go-connections/tlsconfig"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const pluginAddress = "authzplugin.sock"
|
const pluginAddress = "authzplugin.sock"
|
||||||
|
|
|
@ -6,17 +6,17 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/docker/docker/pkg/plugins/transport"
|
||||||
"github.com/docker/go-connections/sockets"
|
"github.com/docker/go-connections/sockets"
|
||||||
"github.com/docker/go-connections/tlsconfig"
|
"github.com/docker/go-connections/tlsconfig"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
versionMimetype = "application/vnd.docker.plugins.v1.2+json"
|
defaultTimeOut = 30
|
||||||
defaultTimeOut = 30
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewClient creates a new plugin client (http).
|
// NewClient creates a new plugin client (http).
|
||||||
|
@ -29,23 +29,38 @@ func NewClient(addr string, tlsConfig tlsconfig.Options) (*Client, error) {
|
||||||
}
|
}
|
||||||
tr.TLSClientConfig = c
|
tr.TLSClientConfig = c
|
||||||
|
|
||||||
protoAndAddr := strings.Split(addr, "://")
|
u, err := url.Parse(addr)
|
||||||
if err := sockets.ConfigureTransport(tr, protoAndAddr[0], protoAndAddr[1]); err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
socket := u.Host
|
||||||
scheme := protoAndAddr[0]
|
if socket == "" {
|
||||||
if scheme != "https" {
|
// valid local socket addresses have the host empty.
|
||||||
scheme = "http"
|
socket = u.Path
|
||||||
|
}
|
||||||
|
if err := sockets.ConfigureTransport(tr, u.Scheme, socket); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
scheme := httpScheme(u)
|
||||||
|
|
||||||
|
clientTransport := transport.NewHTTPTransport(tr, scheme, socket)
|
||||||
|
return NewClientWithTransport(clientTransport), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientWithTransport creates a new plugin client with a given transport.
|
||||||
|
func NewClientWithTransport(tr transport.Transport) *Client {
|
||||||
|
return &Client{
|
||||||
|
http: &http.Client{
|
||||||
|
Transport: tr,
|
||||||
|
},
|
||||||
|
requestFactory: tr,
|
||||||
}
|
}
|
||||||
return &Client{&http.Client{Transport: tr}, scheme, protoAndAddr[1]}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Client represents a plugin client.
|
// Client represents a plugin client.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
http *http.Client // http client to use
|
http *http.Client // http client to use
|
||||||
scheme string // scheme protocol of the plugin
|
requestFactory transport.RequestFactory
|
||||||
addr string // http address of the plugin
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call calls the specified method with the specified arguments for the plugin.
|
// Call calls the specified method with the specified arguments for the plugin.
|
||||||
|
@ -94,13 +109,10 @@ func (c *Client) SendFile(serviceMethod string, data io.Reader, ret interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) {
|
func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) {
|
||||||
req, err := http.NewRequest("POST", "/"+serviceMethod, data)
|
req, err := c.requestFactory.NewRequest(serviceMethod, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
req.Header.Add("Accept", versionMimetype)
|
|
||||||
req.URL.Scheme = c.scheme
|
|
||||||
req.URL.Host = c.addr
|
|
||||||
|
|
||||||
var retries int
|
var retries int
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
@ -117,7 +129,7 @@ func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
retries++
|
retries++
|
||||||
logrus.Warnf("Unable to connect to plugin: %s, retrying in %v", c.addr, timeOff)
|
logrus.Warnf("Unable to connect to plugin: %s, retrying in %v", req.URL, timeOff)
|
||||||
time.Sleep(timeOff)
|
time.Sleep(timeOff)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -163,3 +175,11 @@ func backoff(retries int) time.Duration {
|
||||||
func abort(start time.Time, timeOff time.Duration) bool {
|
func abort(start time.Time, timeOff time.Duration) bool {
|
||||||
return timeOff+time.Since(start) >= time.Duration(defaultTimeOut)*time.Second
|
return timeOff+time.Since(start) >= time.Duration(defaultTimeOut)*time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func httpScheme(u *url.URL) string {
|
||||||
|
scheme := u.Scheme
|
||||||
|
if scheme != "https" {
|
||||||
|
scheme = "http"
|
||||||
|
}
|
||||||
|
return scheme
|
||||||
|
}
|
||||||
|
|
|
@ -4,10 +4,12 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/docker/pkg/plugins/transport"
|
||||||
"github.com/docker/go-connections/tlsconfig"
|
"github.com/docker/go-connections/tlsconfig"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -48,7 +50,7 @@ func TestEchoInputOutput(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
header := w.Header()
|
header := w.Header()
|
||||||
header.Set("Content-Type", versionMimetype)
|
header.Set("Content-Type", transport.VersionMimetype)
|
||||||
|
|
||||||
io.Copy(w, r.Body)
|
io.Copy(w, r.Body)
|
||||||
})
|
})
|
||||||
|
@ -119,9 +121,14 @@ func TestClientScheme(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for addr, scheme := range cases {
|
for addr, scheme := range cases {
|
||||||
c, _ := NewClient(addr, tlsconfig.Options{InsecureSkipVerify: true})
|
u, err := url.Parse(addr)
|
||||||
if c.scheme != scheme {
|
if err != nil {
|
||||||
t.Fatalf("URL scheme mismatch, expected %s, got %s", scheme, c.scheme)
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
s := httpScheme(u)
|
||||||
|
|
||||||
|
if s != scheme {
|
||||||
|
t.Fatalf("URL scheme mismatch, expected %s, got %s", scheme, s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
36
plugins/transport/http.go
Normal file
36
plugins/transport/http.go
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// httpTransport holds an http.RoundTripper
|
||||||
|
// and information about the scheme and address the transport
|
||||||
|
// sends request to.
|
||||||
|
type httpTransport struct {
|
||||||
|
http.RoundTripper
|
||||||
|
scheme string
|
||||||
|
addr string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHTTPTransport creates a new httpTransport.
|
||||||
|
func NewHTTPTransport(r http.RoundTripper, scheme, addr string) Transport {
|
||||||
|
return httpTransport{
|
||||||
|
RoundTripper: r,
|
||||||
|
scheme: scheme,
|
||||||
|
addr: addr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRequest creates a new http.Request and sets the URL
|
||||||
|
// scheme and address with the transport's fields.
|
||||||
|
func (t httpTransport) NewRequest(path string, data io.Reader) (*http.Request, error) {
|
||||||
|
req, err := newHTTPRequest(path, data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.URL.Scheme = t.scheme
|
||||||
|
req.URL.Host = t.addr
|
||||||
|
return req, nil
|
||||||
|
}
|
36
plugins/transport/transport.go
Normal file
36
plugins/transport/transport.go
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// VersionMimetype is the Content-Type the engine sends to plugins.
|
||||||
|
const VersionMimetype = "application/vnd.docker.plugins.v1.2+json"
|
||||||
|
|
||||||
|
// RequestFactory defines an interface that
|
||||||
|
// transports can implement to create new requests.
|
||||||
|
type RequestFactory interface {
|
||||||
|
NewRequest(path string, data io.Reader) (*http.Request, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transport defines an interface that plugin transports
|
||||||
|
// must implement.
|
||||||
|
type Transport interface {
|
||||||
|
http.RoundTripper
|
||||||
|
RequestFactory
|
||||||
|
}
|
||||||
|
|
||||||
|
// newHTTPRequest creates a new request with a path and a body.
|
||||||
|
func newHTTPRequest(path string, data io.Reader) (*http.Request, error) {
|
||||||
|
if !strings.HasPrefix(path, "/") {
|
||||||
|
path = "/" + path
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest("POST", path, data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Add("Accept", VersionMimetype)
|
||||||
|
return req, nil
|
||||||
|
}
|
Loading…
Reference in a new issue