diff --git a/plugins/client.go b/plugins/client.go index 54c386e..add1361 100644 --- a/plugins/client.go +++ b/plugins/client.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "strings" @@ -58,19 +59,41 @@ type Client struct { // Call calls the specified method with the specified arguments for the plugin. // It will retry for 30 seconds if a failure occurs when calling. func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error { - return c.callWithRetry(serviceMethod, args, ret, true) -} - -func (c *Client) callWithRetry(serviceMethod string, args interface{}, ret interface{}, retry bool) error { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(args); err != nil { return err } - - req, err := http.NewRequest("POST", "/"+serviceMethod, &buf) + body, err := c.callWithRetry(serviceMethod, &buf, true) if err != nil { return err } + defer body.Close() + return json.NewDecoder(body).Decode(&ret) +} + +// Stream calls the specified method with the specified arguments for the plugin and returns the response body +func (c *Client) Stream(serviceMethod string, args interface{}) (io.ReadCloser, error) { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(args); err != nil { + return nil, err + } + return c.callWithRetry(serviceMethod, &buf, true) +} + +// SendFile calls the specified method, and passes through the IO stream +func (c *Client) SendFile(serviceMethod string, data io.Reader, ret interface{}) error { + body, err := c.callWithRetry(serviceMethod, data, true) + if err != nil { + return err + } + return json.NewDecoder(body).Decode(&ret) +} + +func (c *Client) callWithRetry(serviceMethod string, data io.Reader, retry bool) (io.ReadCloser, error) { + req, err := http.NewRequest("POST", "/"+serviceMethod, data) + if err != nil { + return nil, err + } req.Header.Add("Accept", versionMimetype) req.URL.Scheme = c.scheme req.URL.Host = c.addr @@ -82,12 +105,12 @@ func (c *Client) callWithRetry(serviceMethod string, args interface{}, ret inter resp, err := c.http.Do(req) if err != nil { if !retry { - return err + return nil, err } timeOff := backoff(retries) if abort(start, timeOff) { - return err + return nil, err } retries++ logrus.Warnf("Unable to connect to plugin: %s, retrying in %v", c.addr, timeOff) @@ -95,16 +118,14 @@ func (c *Client) callWithRetry(serviceMethod string, args interface{}, ret inter continue } - defer resp.Body.Close() if resp.StatusCode != http.StatusOK { remoteErr, err := ioutil.ReadAll(resp.Body) if err != nil { - return &remoteError{err.Error(), serviceMethod} + return nil, &remoteError{err.Error(), serviceMethod} } - return &remoteError{string(remoteErr), serviceMethod} + return nil, &remoteError{string(remoteErr), serviceMethod} } - - return json.NewDecoder(resp.Body).Decode(&ret) + return resp.Body, nil } } diff --git a/plugins/client_test.go b/plugins/client_test.go index 60f1263..4fed491 100644 --- a/plugins/client_test.go +++ b/plugins/client_test.go @@ -30,7 +30,7 @@ func teardownRemotePluginServer() { func TestFailedConnection(t *testing.T) { c, _ := NewClient("tcp://127.0.0.1:1", tlsconfig.Options{InsecureSkipVerify: true}) - err := c.callWithRetry("Service.Method", nil, nil, false) + _, err := c.callWithRetry("Service.Method", nil, false) if err == nil { t.Fatal("Unexpected successful connection") }