Merge pull request #121 from stevvooe/address-layer-upload-errors

Address server errors received during layer upload
This commit is contained in:
Olivier Gambier 2015-02-03 11:48:34 -08:00
commit 092dadde6d
13 changed files with 395 additions and 225 deletions

View file

@ -2,14 +2,23 @@ package storage
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"time"
"github.com/docker/distribution/storagedriver"
)
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
// understand the latency characteristics of the underlying network to
// set this correctly, so we may want to leave it to the driver. For
// out of process drivers, we'll have to optimize this buffer size for
// local communication.
const fileReaderBufferSize = 4 << 20
// remoteFileReader provides a read seeker interface to files stored in
// storagedriver. Used to implement part of layer interface and will be used
// to implement read side of LayerUpload.
@ -28,24 +37,40 @@ type fileReader struct {
err error // terminal error, if set, reader is closed
}
// newFileReader initializes a file reader for the remote file. The read takes
// on the offset and size at the time the reader is created. If the underlying
// file changes, one must create a new fileReader.
func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
rd := &fileReader{
driver: driver,
path: path,
}
// Grab the size of the layer file, ensuring existence.
fi, err := driver.Stat(path)
if fi, err := driver.Stat(path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// NOTE(stevvooe): We really don't care if the file is not
// actually present for the reader. If the caller needs to know
// whether or not the file exists, they should issue a stat call
// on the path. There is still no guarantee, since the file may be
// gone by the time the reader is created. The only correct
// behavior is to return a reader that immediately returns EOF.
default:
// Any other error we want propagated up the stack.
return nil, err
}
} else {
if fi.IsDir() {
return nil, fmt.Errorf("cannot read a directory")
}
if err != nil {
return nil, err
// Fill in file information
rd.size = fi.Size()
rd.modtime = fi.ModTime()
}
if fi.IsDir() {
return nil, fmt.Errorf("cannot read a directory")
}
return &fileReader{
driver: driver,
path: path,
size: fi.Size(),
modtime: fi.ModTime(),
}, nil
return rd, nil
}
func (fr *fileReader) Read(p []byte) (n int, err error) {
@ -88,8 +113,6 @@ func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else if newOffset > fr.size {
err = fmt.Errorf("cannot seek passed end of file")
} else {
if fr.offset != newOffset {
fr.reset()
@ -134,9 +157,17 @@ func (fr *fileReader) reader() (io.Reader, error) {
// If we don't have a reader, open one up.
rc, err := fr.driver.ReadStream(fr.path, fr.offset)
if err != nil {
return nil, err
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// NOTE(stevvooe): If the path is not found, we simply return a
// reader that returns io.EOF. However, we do not set fr.rc,
// allowing future attempts at getting a reader to possibly
// succeed if the file turns up later.
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
default:
return nil, err
}
}
fr.rc = rc
@ -147,7 +178,7 @@ func (fr *fileReader) reader() (io.Reader, error) {
// set this correctly, so we may want to leave it to the driver. For
// out of process drivers, we'll have to optimize this buffer size for
// local communication.
fr.brd = bufio.NewReader(fr.rc)
fr.brd = bufio.NewReaderSize(fr.rc, fileReaderBufferSize)
} else {
fr.brd.Reset(fr.rc)
}

View file

@ -124,7 +124,7 @@ func TestFileReaderSeek(t *testing.T) {
t.Fatalf("expected to seek to end: %v != %v", end, len(content))
}
// 4. Seek past end and before start, ensure error.
// 4. Seek before start, ensure error.
// seek before start
before, err := fr.Seek(-1, os.SEEK_SET)
@ -132,9 +132,44 @@ func TestFileReaderSeek(t *testing.T) {
t.Fatalf("error expected, returned offset=%v", before)
}
after, err := fr.Seek(int64(len(content)+1), os.SEEK_END)
if err == nil {
t.Fatalf("error expected, returned offset=%v", after)
// 5. Seek after end,
after, err := fr.Seek(1, os.SEEK_END)
if err != nil {
t.Fatalf("unexpected error expected, returned offset=%v", after)
}
p := make([]byte, 16)
n, err := fr.Read(p)
if n != 0 {
t.Fatalf("bytes reads %d != %d", n, 0)
}
if err != io.EOF {
t.Fatalf("expected io.EOF, got %v", err)
}
}
// TestFileReaderNonExistentFile ensures the reader behaves as expected with a
// missing or zero-length remote file. While the file may not exist, the
// reader should not error out on creation and should return 0-bytes from the
// read method, with an io.EOF error.
func TestFileReaderNonExistentFile(t *testing.T) {
driver := inmemory.New()
fr, err := newFileReader(driver, "/doesnotexist")
if err != nil {
t.Fatalf("unexpected error initializing reader: %v", err)
}
var buf [1024]byte
n, err := fr.Read(buf[:])
if n != 0 {
t.Fatalf("non-zero byte read reported: %d != 0", n)
}
if err != io.EOF {
t.Fatalf("read on missing file should return io.EOF, got %v", err)
}
}

View file

@ -99,9 +99,6 @@ func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else if newOffset > fw.size {
fw.offset = newOffset
fw.size = newOffset
} else {
// No problems, set the offset.
fw.offset = newOffset

View file

@ -80,17 +80,10 @@ func (err ErrUnknownLayer) Error() string {
// ErrLayerInvalidDigest returned when tarsum check fails.
type ErrLayerInvalidDigest struct {
Digest digest.Digest
Reason error
}
func (err ErrLayerInvalidDigest) Error() string {
return fmt.Sprintf("invalid digest for referenced layer: %v", err.Digest)
}
// ErrLayerInvalidSize returned when length check fails.
type ErrLayerInvalidSize struct {
Size int64
}
func (err ErrLayerInvalidSize) Error() string {
return fmt.Sprintf("invalid layer size: %d", err.Size)
return fmt.Sprintf("invalid digest for referenced layer: %v, %v",
err.Digest, err.Reason)
}

View file

@ -235,6 +235,40 @@ func TestSimpleLayerRead(t *testing.T) {
}
}
// TestLayerUploadZeroLength uploads zero-length
func TestLayerUploadZeroLength(t *testing.T) {
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
ls := registry.Repository(imageName).Layers()
upload, err := ls.Upload()
if err != nil {
t.Fatalf("unexpected error starting upload: %v", err)
}
io.Copy(upload, bytes.NewReader([]byte{}))
dgst, err := digest.FromTarArchive(bytes.NewReader([]byte{}))
if err != nil {
t.Fatalf("error getting zero digest: %v", err)
}
if dgst != digest.DigestTarSumV1EmptyTar {
// sanity check on zero digest
t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar)
}
layer, err := upload.Finish(dgst)
if err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
if layer.Digest() != dgst {
t.Fatalf("unexpected digest: %v != %v", layer.Digest(), dgst)
}
}
// writeRandomLayer creates a random layer under name and tarSum using driver
// and pathMapper. An io.ReadSeeker with the data is returned, along with the
// sha256 hex digest.

View file

@ -1,6 +1,7 @@
package storage
import (
"fmt"
"io"
"path"
"time"
@ -89,7 +90,10 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
case tarsum.Version1:
default:
// version 0 and dev, for now.
return "", ErrLayerTarSumVersionUnsupported
return "", ErrLayerInvalidDigest{
Digest: dgst,
Reason: ErrLayerTarSumVersionUnsupported,
}
}
digestVerifier := digest.NewDigestVerifier(dgst)
@ -117,7 +121,10 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
}
if !digestVerifier.Verified() {
return "", ErrLayerInvalidDigest{Digest: dgst}
return "", ErrLayerInvalidDigest{
Digest: dgst,
Reason: fmt.Errorf("content does not match digest"),
}
}
return canonical, nil
@ -136,7 +143,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
}
// Check for existence
if _, err := luc.layerStore.repository.registry.driver.Stat(blobPath); err != nil {
if _, err := luc.driver.Stat(blobPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist.
@ -151,6 +158,31 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
return nil
}
// If no data was received, we may not actually have a file on disk. Check
// the size here and write a zero-length file to blobPath if this is the
// case. For the most part, this should only ever happen with zero-length
// tars.
if _, err := luc.driver.Stat(luc.path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// HACK(stevvooe): This is slightly dangerous: if we verify above,
// get a hash, then the underlying file is deleted, we risk moving
// a zero-length blob into a nonzero-length blob location. To
// prevent this horrid thing, we employ the hack of only allowing
// to this happen for the zero tarsum.
if dgst == digest.DigestTarSumV1EmptyTar {
return luc.driver.PutContent(blobPath, []byte{})
}
// We let this fail during the move below.
logrus.
WithField("upload.uuid", luc.UUID()).
WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest")
default:
return err // unrelated error
}
}
return luc.driver.Move(luc.path, blobPath)
}

View file

@ -57,8 +57,6 @@ func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest,
return nil, err
}
logrus.Infof("retrieved signatures: %v", string(signatures[0]))
jsig, err := libtrust.NewJSONSignature(content, signatures...)
if err != nil {
return nil, err