Merge pull request #13161 from calavera/plugin_discovery

Proposal: Volume refactor and external volume plugins
This commit is contained in:
Arnaud Porterie 2015-05-23 18:44:18 -07:00
commit f7309796db
2 changed files with 59 additions and 5 deletions

View file

@ -31,6 +31,10 @@ type Client struct {
} }
func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error { func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) error {
return c.callWithRetry(serviceMethod, args, ret, true)
}
func (c *Client) callWithRetry(serviceMethod string, args interface{}, ret interface{}, retry bool) error {
var buf bytes.Buffer var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(args); err != nil { if err := json.NewEncoder(&buf).Encode(args); err != nil {
return err return err
@ -50,12 +54,16 @@ func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) e
for { for {
resp, err := c.http.Do(req) resp, err := c.http.Do(req)
if err != nil { if err != nil {
if !retry {
return err
}
timeOff := backoff(retries) timeOff := backoff(retries)
if timeOff+time.Since(start) > defaultTimeOut { if abort(start, timeOff) {
return err return err
} }
retries++ retries++
logrus.Warn("Unable to connect to plugin: %s, retrying in %ds\n", c.addr, timeOff) logrus.Warnf("Unable to connect to plugin: %s, retrying in %v", c.addr, timeOff)
time.Sleep(timeOff) time.Sleep(timeOff)
continue continue
} }
@ -73,7 +81,7 @@ func (c *Client) Call(serviceMethod string, args interface{}, ret interface{}) e
} }
func backoff(retries int) time.Duration { func backoff(retries int) time.Duration {
b, max := float64(1), float64(defaultTimeOut) b, max := 1, defaultTimeOut
for b < max && retries > 0 { for b < max && retries > 0 {
b *= 2 b *= 2
retries-- retries--
@ -81,7 +89,11 @@ func backoff(retries int) time.Duration {
if b > max { if b > max {
b = max b = max
} }
return time.Duration(b) return time.Duration(b) * time.Second
}
func abort(start time.Time, timeOff time.Duration) bool {
return timeOff+time.Since(start) > time.Duration(defaultTimeOut)*time.Second
} }
func configureTCPTransport(tr *http.Transport, proto, addr string) { func configureTCPTransport(tr *http.Transport, proto, addr string) {

View file

@ -6,6 +6,7 @@ import (
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"testing" "testing"
"time"
) )
var ( var (
@ -27,7 +28,7 @@ func teardownRemotePluginServer() {
func TestFailedConnection(t *testing.T) { func TestFailedConnection(t *testing.T) {
c := NewClient("tcp://127.0.0.1:1") c := NewClient("tcp://127.0.0.1:1")
err := c.Call("Service.Method", nil, nil) err := c.callWithRetry("Service.Method", nil, nil, false)
if err == nil { if err == nil {
t.Fatal("Unexpected successful connection") t.Fatal("Unexpected successful connection")
} }
@ -61,3 +62,44 @@ func TestEchoInputOutput(t *testing.T) {
t.Fatalf("Expected %v, was %v\n", m, output) t.Fatalf("Expected %v, was %v\n", m, output)
} }
} }
func TestBackoff(t *testing.T) {
cases := []struct {
retries int
expTimeOff time.Duration
}{
{0, time.Duration(1)},
{1, time.Duration(2)},
{2, time.Duration(4)},
{4, time.Duration(16)},
{6, time.Duration(30)},
{10, time.Duration(30)},
}
for _, c := range cases {
s := c.expTimeOff * time.Second
if d := backoff(c.retries); d != s {
t.Fatalf("Retry %v, expected %v, was %v\n", c.retries, s, d)
}
}
}
func TestAbortRetry(t *testing.T) {
cases := []struct {
timeOff time.Duration
expAbort bool
}{
{time.Duration(1), false},
{time.Duration(2), false},
{time.Duration(10), false},
{time.Duration(30), true},
{time.Duration(40), true},
}
for _, c := range cases {
s := c.timeOff * time.Second
if a := abort(time.Now(), s); a != c.expAbort {
t.Fatalf("Duration %v, expected %v, was %v\n", c.timeOff, s, a)
}
}
}