Merge pull request #20843 from calavera/plugin_any_transport

Call plugins with custom transports.
This commit is contained in:
Brian Goff 2016-03-04 11:59:32 -05:00
commit f558904849
5 changed files with 123 additions and 23 deletions

View file

@ -18,10 +18,11 @@ import (
"testing" "testing"
"bytes" "bytes"
"strings"
"github.com/docker/docker/pkg/plugins" "github.com/docker/docker/pkg/plugins"
"github.com/docker/go-connections/tlsconfig" "github.com/docker/go-connections/tlsconfig"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"strings"
) )
const pluginAddress = "authzplugin.sock" const pluginAddress = "authzplugin.sock"

View file

@ -6,17 +6,17 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "net/url"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/plugins/transport"
"github.com/docker/go-connections/sockets" "github.com/docker/go-connections/sockets"
"github.com/docker/go-connections/tlsconfig" "github.com/docker/go-connections/tlsconfig"
) )
const ( const (
versionMimetype = "application/vnd.docker.plugins.v1.2+json" defaultTimeOut = 30
defaultTimeOut = 30
) )
// NewClient creates a new plugin client (http). // NewClient creates a new plugin client (http).
@ -29,23 +29,38 @@ func NewClient(addr string, tlsConfig tlsconfig.Options) (*Client, error) {
} }
tr.TLSClientConfig = c tr.TLSClientConfig = c
protoAndAddr := strings.Split(addr, "://") u, err := url.Parse(addr)
if err := sockets.ConfigureTransport(tr, protoAndAddr[0], protoAndAddr[1]); err != nil { if err != nil {
return nil, err return nil, err
} }
socket := u.Host
scheme := protoAndAddr[0] if socket == "" {
if scheme != "https" { // valid local socket addresses have the host empty.
scheme = "http" socket = u.Path
}
if err := sockets.ConfigureTransport(tr, u.Scheme, socket); err != nil {
return nil, err
}
scheme := httpScheme(u)
clientTransport := transport.NewHTTPTransport(tr, scheme, socket)
return NewClientWithTransport(clientTransport), nil
}
// NewClientWithTransport creates a new plugin client with a given transport.
func NewClientWithTransport(tr transport.Transport) *Client {
return &Client{
http: &http.Client{
Transport: tr,
},
requestFactory: tr,
} }
return &Client{&http.Client{Transport: tr}, scheme, protoAndAddr[1]}, nil
} }
// Client represents a plugin client. // Client represents a plugin client.
type Client struct { type Client struct {
http *http.Client // http client to use http *http.Client // http client to use
scheme string // scheme protocol of the plugin requestFactory transport.RequestFactory
addr string // http address of the plugin
} }
// Call calls the specified method with the specified arguments for the plugin. // Call calls the specified method with the specified arguments for the plugin.
@ -94,13 +109,10 @@ func (c *Client) SendFile(serviceMethod string, data io.Reader, ret interface{})
} }
func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) { func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) {
req, err := http.NewRequest("POST", "/"+serviceMethod, data) req, err := c.requestFactory.NewRequest(serviceMethod, data)
if err != nil { if err != nil {
return nil, err return nil, err
} }
req.Header.Add("Accept", versionMimetype)
req.URL.Scheme = c.scheme
req.URL.Host = c.addr
var retries int var retries int
start := time.Now() start := time.Now()
@ -117,7 +129,7 @@ func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool)
return nil, err return nil, err
} }
retries++ retries++
logrus.Warnf("Unable to connect to plugin: %s, retrying in %v", c.addr, timeOff) logrus.Warnf("Unable to connect to plugin: %s, retrying in %v", req.URL, timeOff)
time.Sleep(timeOff) time.Sleep(timeOff)
continue continue
} }
@ -163,3 +175,11 @@ func backoff(retries int) time.Duration {
func abort(start time.Time, timeOff time.Duration) bool { func abort(start time.Time, timeOff time.Duration) bool {
return timeOff+time.Since(start) >= time.Duration(defaultTimeOut)*time.Second return timeOff+time.Since(start) >= time.Duration(defaultTimeOut)*time.Second
} }
func httpScheme(u *url.URL) string {
scheme := u.Scheme
if scheme != "https" {
scheme = "http"
}
return scheme
}

View file

@ -4,10 +4,12 @@ import (
"io" "io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"reflect" "reflect"
"testing" "testing"
"time" "time"
"github.com/docker/docker/pkg/plugins/transport"
"github.com/docker/go-connections/tlsconfig" "github.com/docker/go-connections/tlsconfig"
) )
@ -48,7 +50,7 @@ func TestEchoInputOutput(t *testing.T) {
} }
header := w.Header() header := w.Header()
header.Set("Content-Type", versionMimetype) header.Set("Content-Type", transport.VersionMimetype)
io.Copy(w, r.Body) io.Copy(w, r.Body)
}) })
@ -119,9 +121,14 @@ func TestClientScheme(t *testing.T) {
} }
for addr, scheme := range cases { for addr, scheme := range cases {
c, _ := NewClient(addr, tlsconfig.Options{InsecureSkipVerify: true}) u, err := url.Parse(addr)
if c.scheme != scheme { if err != nil {
t.Fatalf("URL scheme mismatch, expected %s, got %s", scheme, c.scheme) t.Fatal(err)
}
s := httpScheme(u)
if s != scheme {
t.Fatalf("URL scheme mismatch, expected %s, got %s", scheme, s)
} }
} }
} }

36
plugins/transport/http.go Normal file
View file

@ -0,0 +1,36 @@
package transport
import (
"io"
"net/http"
)
// httpTransport holds an http.RoundTripper
// and information about the scheme and address the transport
// sends request to.
type httpTransport struct {
http.RoundTripper
scheme string
addr string
}
// NewHTTPTransport creates a new httpTransport.
func NewHTTPTransport(r http.RoundTripper, scheme, addr string) Transport {
return httpTransport{
RoundTripper: r,
scheme: scheme,
addr: addr,
}
}
// NewRequest creates a new http.Request and sets the URL
// scheme and address with the transport's fields.
func (t httpTransport) NewRequest(path string, data io.Reader) (*http.Request, error) {
req, err := newHTTPRequest(path, data)
if err != nil {
return nil, err
}
req.URL.Scheme = t.scheme
req.URL.Host = t.addr
return req, nil
}

View file

@ -0,0 +1,36 @@
package transport
import (
"io"
"net/http"
"strings"
)
// VersionMimetype is the Content-Type the engine sends to plugins.
const VersionMimetype = "application/vnd.docker.plugins.v1.2+json"
// RequestFactory defines an interface that
// transports can implement to create new requests.
type RequestFactory interface {
NewRequest(path string, data io.Reader) (*http.Request, error)
}
// Transport defines an interface that plugin transports
// must implement.
type Transport interface {
http.RoundTripper
RequestFactory
}
// newHTTPRequest creates a new request with a path and a body.
func newHTTPRequest(path string, data io.Reader) (*http.Request, error) {
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
req, err := http.NewRequest("POST", path, data)
if err != nil {
return nil, err
}
req.Header.Add("Accept", VersionMimetype)
return req, nil
}