Avoid stat round-trips when fetching a blob

Without this commit, three round-trips are required to fetch a blob with
a progress bar. The first is a call to Stat (HEAD request), to determine
the size. Then Open is called, which also calls Stat, and finally
performs a GET request.

Only the GET request is actually needed. The size of the blob can be
sniffed from Content-Length in the GET response.

This commit changes HTTPReadSeeker to automatically detect the size from
Content-Length instead of requiring it to be passed in. The Stat call is
removed from Open because it is no longer necessary.

HTTPReadSeeker now takes an additional errorHandler callback argument which
translates an unsuccessful HTTP response into an appropriate API-level
error. Using a callback for this makes it possible to avoid leaking the
repsonse body to Read's caller, which would make lifecycle management
problematic.

Fixes #1223

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2015-11-30 18:35:19 -08:00
parent 261ed7fa98
commit 6beeb935cd
2 changed files with 65 additions and 34 deletions

View file

@ -391,17 +391,18 @@ func (bs *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
} }
func (bs *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { func (bs *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
stat, err := bs.statter.Stat(ctx, dgst) blobURL, err := bs.ub.BuildBlobURL(bs.name, dgst)
if err != nil { if err != nil {
return nil, err return nil, err
} }
blobURL, err := bs.ub.BuildBlobURL(bs.name, stat.Digest) return transport.NewHTTPReadSeeker(bs.client, blobURL,
if err != nil { func(resp *http.Response) error {
return nil, err if resp.StatusCode == http.StatusNotFound {
} return distribution.ErrBlobUnknown
}
return transport.NewHTTPReadSeeker(bs.client, blobURL, stat.Size), nil return handleErrorResponse(resp)
}), nil
} }
func (bs *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { func (bs *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {

View file

@ -2,11 +2,9 @@ package transport
import ( import (
"bufio" "bufio"
"bytes"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"os" "os"
) )
@ -21,11 +19,11 @@ type ReadSeekCloser interface {
// request. When seeking and starting a read from a non-zero offset // request. When seeking and starting a read from a non-zero offset
// the a "Range" header will be added which sets the offset. // the a "Range" header will be added which sets the offset.
// TODO(dmcgowan): Move this into a separate utility package // TODO(dmcgowan): Move this into a separate utility package
func NewHTTPReadSeeker(client *http.Client, url string, size int64) ReadSeekCloser { func NewHTTPReadSeeker(client *http.Client, url string, errorHandler func(*http.Response) error) ReadSeekCloser {
return &httpReadSeeker{ return &httpReadSeeker{
client: client, client: client,
url: url, url: url,
size: size, errorHandler: errorHandler,
} }
} }
@ -33,12 +31,26 @@ type httpReadSeeker struct {
client *http.Client client *http.Client
url string url string
// errorHandler creates an error from an unsuccessful HTTP response.
// This allows the error to be created with the HTTP response body
// without leaking the body through a returned error.
errorHandler func(*http.Response) error
size int64 size int64
rc io.ReadCloser // remote read closer // rc is the remote read closer.
brd *bufio.Reader // internal buffered io rc io.ReadCloser
offset int64 // brd is a buffer for internal buffered io.
err error brd *bufio.Reader
// readerOffset tracks the offset as of the last read.
readerOffset int64
// seekOffset allows Seek to override the offset. Seek changes
// seekOffset instead of changing readOffset directly so that
// connection resets can be delayed and possibly avoided if the
// seek is undone (i.e. seeking to the end and then back to the
// beginning).
seekOffset int64
err error
} }
func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
@ -46,16 +58,29 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
return 0, hrs.err return 0, hrs.err
} }
// If we seeked to a different position, we need to reset the
// connection. This logic is here instead of Seek so that if
// a seek is undone before the next read, the connection doesn't
// need to be closed and reopened. A common example of this is
// seeking to the end to determine the length, and then seeking
// back to the original position.
if hrs.readerOffset != hrs.seekOffset {
hrs.reset()
}
hrs.readerOffset = hrs.seekOffset
rd, err := hrs.reader() rd, err := hrs.reader()
if err != nil { if err != nil {
return 0, err return 0, err
} }
n, err = rd.Read(p) n, err = rd.Read(p)
hrs.offset += int64(n) hrs.seekOffset += int64(n)
hrs.readerOffset += int64(n)
// Simulate io.EOF error if we reach filesize. // Simulate io.EOF error if we reach filesize.
if err == nil && hrs.offset >= hrs.size { if err == nil && hrs.size >= 0 && hrs.readerOffset >= hrs.size {
err = io.EOF err = io.EOF
} }
@ -67,13 +92,20 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
return 0, hrs.err return 0, hrs.err
} }
var err error _, err := hrs.reader()
newOffset := hrs.offset if err != nil {
return 0, err
}
newOffset := hrs.seekOffset
switch whence { switch whence {
case os.SEEK_CUR: case os.SEEK_CUR:
newOffset += int64(offset) newOffset += int64(offset)
case os.SEEK_END: case os.SEEK_END:
if hrs.size < 0 {
return 0, errors.New("content length not known")
}
newOffset = hrs.size + int64(offset) newOffset = hrs.size + int64(offset)
case os.SEEK_SET: case os.SEEK_SET:
newOffset = int64(offset) newOffset = int64(offset)
@ -82,15 +114,10 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
if newOffset < 0 { if newOffset < 0 {
err = errors.New("cannot seek to negative position") err = errors.New("cannot seek to negative position")
} else { } else {
if hrs.offset != newOffset { hrs.seekOffset = newOffset
hrs.reset()
}
// No problems, set the offset.
hrs.offset = newOffset
} }
return hrs.offset, err return hrs.seekOffset, err
} }
func (hrs *httpReadSeeker) Close() error { func (hrs *httpReadSeeker) Close() error {
@ -130,17 +157,12 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) {
return hrs.brd, nil return hrs.brd, nil
} }
// If the offset is great than or equal to size, return a empty, noop reader.
if hrs.offset >= hrs.size {
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
}
req, err := http.NewRequest("GET", hrs.url, nil) req, err := http.NewRequest("GET", hrs.url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if hrs.offset > 0 { if hrs.readerOffset > 0 {
// TODO(stevvooe): Get this working correctly. // TODO(stevvooe): Get this working correctly.
// If we are at different offset, issue a range request from there. // If we are at different offset, issue a range request from there.
@ -158,8 +180,16 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) {
// import // import
if resp.StatusCode >= 200 && resp.StatusCode <= 399 { if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
hrs.rc = resp.Body hrs.rc = resp.Body
if resp.StatusCode == http.StatusOK {
hrs.size = resp.ContentLength
} else {
hrs.size = -1
}
} else { } else {
defer resp.Body.Close() defer resp.Body.Close()
if hrs.errorHandler != nil {
return nil, hrs.errorHandler(resp)
}
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status) return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
} }