Improved push and pull with upload manager and download manager

This commit adds a transfer manager which deduplicates and schedules
transfers, and also an upload manager and download manager that build on
top of the transfer manager to provide high-level interfaces for uploads
and downloads. The push and pull code is modified to use these building
blocks.

Some benefits of the changes:

- Simplification of push/pull code
- Pushes can upload layers concurrently
- Failed downloads and uploads are retried after backoff delays
- Cancellation is supported, but individual transfers will only be
  cancelled if all pushes or pulls using them are cancelled.
- The distribution code is decoupled from Docker Engine packages and API
  conventions (i.e. streamformatter), which will make it easier to split
  out.

This commit also includes unit tests for the new distribution/xfer
package. The tests cover 87.8% of the statements in the package.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2015-11-13 16:59:01 -08:00
parent 03778bd1d2
commit 00cca12e77

View file

@ -17,7 +17,6 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/distribution/reference" "github.com/docker/distribution/reference"
@ -270,7 +269,6 @@ func (r *Session) GetRemoteImageJSON(imgID, registry string) ([]byte, int64, err
// GetRemoteImageLayer retrieves an image layer from the registry // GetRemoteImageLayer retrieves an image layer from the registry
func (r *Session) GetRemoteImageLayer(imgID, registry string, imgSize int64) (io.ReadCloser, error) { func (r *Session) GetRemoteImageLayer(imgID, registry string, imgSize int64) (io.ReadCloser, error) {
var ( var (
retries = 5
statusCode = 0 statusCode = 0
res *http.Response res *http.Response
err error err error
@ -281,14 +279,9 @@ func (r *Session) GetRemoteImageLayer(imgID, registry string, imgSize int64) (io
if err != nil { if err != nil {
return nil, fmt.Errorf("Error while getting from the server: %v", err) return nil, fmt.Errorf("Error while getting from the server: %v", err)
} }
// TODO(tiborvass): why are we doing retries at this level?
// These retries should be generic to both v1 and v2
for i := 1; i <= retries; i++ {
statusCode = 0 statusCode = 0
res, err = r.client.Do(req) res, err = r.client.Do(req)
if err == nil { if err != nil {
break
}
logrus.Debugf("Error contacting registry %s: %v", registry, err) logrus.Debugf("Error contacting registry %s: %v", registry, err)
if res != nil { if res != nil {
if res.Body != nil { if res.Body != nil {
@ -296,12 +289,9 @@ func (r *Session) GetRemoteImageLayer(imgID, registry string, imgSize int64) (io
} }
statusCode = res.StatusCode statusCode = res.StatusCode
} }
if i == retries {
return nil, fmt.Errorf("Server error: Status %d while fetching image layer (%s)", return nil, fmt.Errorf("Server error: Status %d while fetching image layer (%s)",
statusCode, imgID) statusCode, imgID)
} }
time.Sleep(time.Duration(i) * 5 * time.Second)
}
if res.StatusCode != 200 { if res.StatusCode != 200 {
res.Body.Close() res.Body.Close()