b290cff825
When a plugin is first found, it is loaded into the available plugins even though it's not activated yet. If activation fails it is taken out of the list. While it is in the list, other callers may see it and try to check its manifest. If it is not fully activated yet, the manifest will be nil and cause a panic. This is especially problematic for drivers that are down and have not been activated yet. We could just not load the plugin into the available list until it's fully active, however that would just cause multiple attempts to load the same plugin. We could check if the manifest is nil and return early (instead of panicking on a nil manifest), but this would cause a 2nd caller to receive a response while the first caller is still waiting, which can be awkward. This change uses a condition variable to handle activation (instead of sync.Once). If the plugin is not activated, callers will all wait until it is activated and receive a broadcast from the condition variable signaling that it's ok to proceed, in which case we'll check if there was an error in activation and proceed accordingly. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
186 lines
4.5 KiB
Go
186 lines
4.5 KiB
Go
package plugins
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/docker/pkg/plugins/transport"
|
|
"github.com/docker/go-connections/sockets"
|
|
"github.com/docker/go-connections/tlsconfig"
|
|
)
|
|
|
|
// defaultTimeOut is the retry budget, expressed in seconds. It caps a single
// backoff delay and bounds the total time spent retrying a plugin call.
const defaultTimeOut = 30
|
|
|
|
// NewClient creates a new plugin client (http).
|
|
func NewClient(addr string, tlsConfig tlsconfig.Options) (*Client, error) {
|
|
tr := &http.Transport{}
|
|
|
|
c, err := tlsconfig.Client(tlsConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tr.TLSClientConfig = c
|
|
|
|
u, err := url.Parse(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
socket := u.Host
|
|
if socket == "" {
|
|
// valid local socket addresses have the host empty.
|
|
socket = u.Path
|
|
}
|
|
if err := sockets.ConfigureTransport(tr, u.Scheme, socket); err != nil {
|
|
return nil, err
|
|
}
|
|
scheme := httpScheme(u)
|
|
|
|
clientTransport := transport.NewHTTPTransport(tr, scheme, socket)
|
|
return NewClientWithTransport(clientTransport), nil
|
|
}
|
|
|
|
// NewClientWithTransport creates a new plugin client with a given transport.
|
|
func NewClientWithTransport(tr transport.Transport) *Client {
|
|
return &Client{
|
|
http: &http.Client{
|
|
Transport: tr,
|
|
},
|
|
requestFactory: tr,
|
|
}
|
|
}
|
|
|
|
// Client represents a plugin client.
|
|
type Client struct {
|
|
http *http.Client // http client to use
|
|
requestFactory transport.RequestFactory
|
|
}
|
|
|
|
// Call calls the specified method with the specified arguments for the plugin.
|
|
// It will retry for 30 seconds if a failure occurs when calling.
|
|
func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error {
|
|
var buf bytes.Buffer
|
|
if args != nil {
|
|
if err := json.NewEncoder(&buf).Encode(args); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
body, err := c.callWithRetry(serviceMethod, &buf, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer body.Close()
|
|
if ret != nil {
|
|
if err := json.NewDecoder(body).Decode(&ret); err != nil {
|
|
logrus.Errorf("%s: error reading plugin resp: %v", serviceMethod, err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Stream calls the specified method with the specified arguments for the plugin and returns the response body
|
|
func (c *Client) Stream(serviceMethod string, args interface{}) (io.ReadCloser, error) {
|
|
var buf bytes.Buffer
|
|
if err := json.NewEncoder(&buf).Encode(args); err != nil {
|
|
return nil, err
|
|
}
|
|
return c.callWithRetry(serviceMethod, &buf, true)
|
|
}
|
|
|
|
// SendFile calls the specified method, and passes through the IO stream
|
|
func (c *Client) SendFile(serviceMethod string, data io.Reader, ret interface{}) error {
|
|
body, err := c.callWithRetry(serviceMethod, data, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer body.Close()
|
|
if err := json.NewDecoder(body).Decode(&ret); err != nil {
|
|
logrus.Errorf("%s: error reading plugin resp: %v", serviceMethod, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) {
|
|
req, err := c.requestFactory.NewRequest(serviceMethod, data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var retries int
|
|
start := time.Now()
|
|
|
|
for {
|
|
resp, err := c.http.Do(req)
|
|
if err != nil {
|
|
if !retry {
|
|
return nil, err
|
|
}
|
|
|
|
timeOff := backoff(retries)
|
|
if abort(start, timeOff) {
|
|
return nil, err
|
|
}
|
|
retries++
|
|
logrus.Warnf("Unable to connect to plugin: %s:%s, retrying in %v", req.URL.Host, req.URL.Path, timeOff)
|
|
time.Sleep(timeOff)
|
|
continue
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
b, err := ioutil.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
if err != nil {
|
|
return nil, &statusError{resp.StatusCode, serviceMethod, err.Error()}
|
|
}
|
|
|
|
// Plugins' Response(s) should have an Err field indicating what went
|
|
// wrong. Try to unmarshal into ResponseErr. Otherwise fallback to just
|
|
// return the string(body)
|
|
type responseErr struct {
|
|
Err string
|
|
}
|
|
remoteErr := responseErr{}
|
|
if err := json.Unmarshal(b, &remoteErr); err == nil {
|
|
if remoteErr.Err != "" {
|
|
return nil, &statusError{resp.StatusCode, serviceMethod, remoteErr.Err}
|
|
}
|
|
}
|
|
// old way...
|
|
return nil, &statusError{resp.StatusCode, serviceMethod, string(b)}
|
|
}
|
|
return resp.Body, nil
|
|
}
|
|
}
|
|
|
|
func backoff(retries int) time.Duration {
|
|
b, max := 1, defaultTimeOut
|
|
for b < max && retries > 0 {
|
|
b *= 2
|
|
retries--
|
|
}
|
|
if b > max {
|
|
b = max
|
|
}
|
|
return time.Duration(b) * time.Second
|
|
}
|
|
|
|
func abort(start time.Time, timeOff time.Duration) bool {
|
|
return timeOff+time.Since(start) >= time.Duration(defaultTimeOut)*time.Second
|
|
}
|
|
|
|
// httpScheme returns the HTTP scheme to use for u: "https" when u.Scheme is
// exactly "https", and "http" for every other scheme (including an empty one).
func httpScheme(u *url.URL) string {
	if u.Scheme == "https" {
		return "https"
	}
	return "http"
}
|