Avoid stat round-trips when fetching a blob
Without this commit, three round-trips are required to fetch a blob with a progress bar. The first is a call to Stat (HEAD request), to determine the size. Then Open is called, which also calls Stat, and finally performs a GET request. Only the GET request is actually needed. The size of the blob can be sniffed from Content-Length in the GET response. This commit changes HTTPReadSeeker to automatically detect the size from Content-Length instead of requiring it to be passed in. The Stat call is removed from Open because it is no longer necessary. HTTPReadSeeker now takes an additional errorHandler callback argument which translates an unsuccessful HTTP response into an appropriate API-level error. Using a callback for this makes it possible to avoid leaking the repsonse body to Read's caller, which would make lifecycle management problematic. Fixes #1223 Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
parent
c0d094a72a
commit
bf2cc0a9d6
2 changed files with 65 additions and 34 deletions
|
@ -391,17 +391,18 @@ func (bs *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
func (bs *blobs) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
||||||
stat, err := bs.statter.Stat(ctx, dgst)
|
blobURL, err := bs.ub.BuildBlobURL(bs.name, dgst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
blobURL, err := bs.ub.BuildBlobURL(bs.name, stat.Digest)
|
return transport.NewHTTPReadSeeker(bs.client, blobURL,
|
||||||
if err != nil {
|
func(resp *http.Response) error {
|
||||||
return nil, err
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
return distribution.ErrBlobUnknown
|
||||||
}
|
}
|
||||||
|
return handleErrorResponse(resp)
|
||||||
return transport.NewHTTPReadSeeker(bs.client, blobURL, stat.Size), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
func (bs *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
||||||
|
|
|
@ -2,11 +2,9 @@ package transport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
@ -21,11 +19,11 @@ type ReadSeekCloser interface {
|
||||||
// request. When seeking and starting a read from a non-zero offset
|
// request. When seeking and starting a read from a non-zero offset
|
||||||
// the a "Range" header will be added which sets the offset.
|
// the a "Range" header will be added which sets the offset.
|
||||||
// TODO(dmcgowan): Move this into a separate utility package
|
// TODO(dmcgowan): Move this into a separate utility package
|
||||||
func NewHTTPReadSeeker(client *http.Client, url string, size int64) ReadSeekCloser {
|
func NewHTTPReadSeeker(client *http.Client, url string, errorHandler func(*http.Response) error) ReadSeekCloser {
|
||||||
return &httpReadSeeker{
|
return &httpReadSeeker{
|
||||||
client: client,
|
client: client,
|
||||||
url: url,
|
url: url,
|
||||||
size: size,
|
errorHandler: errorHandler,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,11 +31,25 @@ type httpReadSeeker struct {
|
||||||
client *http.Client
|
client *http.Client
|
||||||
url string
|
url string
|
||||||
|
|
||||||
|
// errorHandler creates an error from an unsuccessful HTTP response.
|
||||||
|
// This allows the error to be created with the HTTP response body
|
||||||
|
// without leaking the body through a returned error.
|
||||||
|
errorHandler func(*http.Response) error
|
||||||
|
|
||||||
size int64
|
size int64
|
||||||
|
|
||||||
rc io.ReadCloser // remote read closer
|
// rc is the remote read closer.
|
||||||
brd *bufio.Reader // internal buffered io
|
rc io.ReadCloser
|
||||||
offset int64
|
// brd is a buffer for internal buffered io.
|
||||||
|
brd *bufio.Reader
|
||||||
|
// readerOffset tracks the offset as of the last read.
|
||||||
|
readerOffset int64
|
||||||
|
// seekOffset allows Seek to override the offset. Seek changes
|
||||||
|
// seekOffset instead of changing readOffset directly so that
|
||||||
|
// connection resets can be delayed and possibly avoided if the
|
||||||
|
// seek is undone (i.e. seeking to the end and then back to the
|
||||||
|
// beginning).
|
||||||
|
seekOffset int64
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,16 +58,29 @@ func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
|
||||||
return 0, hrs.err
|
return 0, hrs.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we seeked to a different position, we need to reset the
|
||||||
|
// connection. This logic is here instead of Seek so that if
|
||||||
|
// a seek is undone before the next read, the connection doesn't
|
||||||
|
// need to be closed and reopened. A common example of this is
|
||||||
|
// seeking to the end to determine the length, and then seeking
|
||||||
|
// back to the original position.
|
||||||
|
if hrs.readerOffset != hrs.seekOffset {
|
||||||
|
hrs.reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
hrs.readerOffset = hrs.seekOffset
|
||||||
|
|
||||||
rd, err := hrs.reader()
|
rd, err := hrs.reader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err = rd.Read(p)
|
n, err = rd.Read(p)
|
||||||
hrs.offset += int64(n)
|
hrs.seekOffset += int64(n)
|
||||||
|
hrs.readerOffset += int64(n)
|
||||||
|
|
||||||
// Simulate io.EOF error if we reach filesize.
|
// Simulate io.EOF error if we reach filesize.
|
||||||
if err == nil && hrs.offset >= hrs.size {
|
if err == nil && hrs.size >= 0 && hrs.readerOffset >= hrs.size {
|
||||||
err = io.EOF
|
err = io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,13 +92,20 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||||
return 0, hrs.err
|
return 0, hrs.err
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
_, err := hrs.reader()
|
||||||
newOffset := hrs.offset
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
newOffset := hrs.seekOffset
|
||||||
|
|
||||||
switch whence {
|
switch whence {
|
||||||
case os.SEEK_CUR:
|
case os.SEEK_CUR:
|
||||||
newOffset += int64(offset)
|
newOffset += int64(offset)
|
||||||
case os.SEEK_END:
|
case os.SEEK_END:
|
||||||
|
if hrs.size < 0 {
|
||||||
|
return 0, errors.New("content length not known")
|
||||||
|
}
|
||||||
newOffset = hrs.size + int64(offset)
|
newOffset = hrs.size + int64(offset)
|
||||||
case os.SEEK_SET:
|
case os.SEEK_SET:
|
||||||
newOffset = int64(offset)
|
newOffset = int64(offset)
|
||||||
|
@ -82,15 +114,10 @@ func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||||
if newOffset < 0 {
|
if newOffset < 0 {
|
||||||
err = errors.New("cannot seek to negative position")
|
err = errors.New("cannot seek to negative position")
|
||||||
} else {
|
} else {
|
||||||
if hrs.offset != newOffset {
|
hrs.seekOffset = newOffset
|
||||||
hrs.reset()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// No problems, set the offset.
|
return hrs.seekOffset, err
|
||||||
hrs.offset = newOffset
|
|
||||||
}
|
|
||||||
|
|
||||||
return hrs.offset, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hrs *httpReadSeeker) Close() error {
|
func (hrs *httpReadSeeker) Close() error {
|
||||||
|
@ -130,17 +157,12 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) {
|
||||||
return hrs.brd, nil
|
return hrs.brd, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the offset is great than or equal to size, return a empty, noop reader.
|
|
||||||
if hrs.offset >= hrs.size {
|
|
||||||
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", hrs.url, nil)
|
req, err := http.NewRequest("GET", hrs.url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if hrs.offset > 0 {
|
if hrs.readerOffset > 0 {
|
||||||
// TODO(stevvooe): Get this working correctly.
|
// TODO(stevvooe): Get this working correctly.
|
||||||
|
|
||||||
// If we are at different offset, issue a range request from there.
|
// If we are at different offset, issue a range request from there.
|
||||||
|
@ -158,8 +180,16 @@ func (hrs *httpReadSeeker) reader() (io.Reader, error) {
|
||||||
// import
|
// import
|
||||||
if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
|
if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
|
||||||
hrs.rc = resp.Body
|
hrs.rc = resp.Body
|
||||||
|
if resp.StatusCode == http.StatusOK {
|
||||||
|
hrs.size = resp.ContentLength
|
||||||
|
} else {
|
||||||
|
hrs.size = -1
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
if hrs.errorHandler != nil {
|
||||||
|
return nil, hrs.errorHandler(resp)
|
||||||
|
}
|
||||||
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
|
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue