Initial implementation of registry LayerService

This change contains the initial implementation of the LayerService to power
layer push and pulls on the storagedriver. The interfaces presented in this
package will be used by the http application to drive most features around
efficient pulls and resumable pushes.

The file storage/layer.go defines the interface interactions. LayerService is
the root type and supports methods to access Layer and LayerUpload objects.
Pull operations are supported with LayerService.Fetch and push operations are
supported with LayerService.Upload and LayerService.Resume. Reads and writes of
layers are split between Layer and LayerUpload, respectively.

LayerService is implemented internally with the layerStore object, which takes
a storagedriver.StorageDriver and a pathMapper instance.

LayerUploadState is currently exported and will likely continue to be as the
interaction between it and layerUploadStore are better understood. Likely, the
layerUploadStore lifecycle and implementation will be deferred to the
application.

Image pushes pulls will be implemented in a similar manner without the
discrete, persistent upload.

Much of this change is in place to get something running and working. Caveats
of this change include the following:

1. Layer upload state storage is implemented on the local filesystem, separate
   from the storage driver. This must be replaced with using the proper backend
   and other state storage. This can be removed when we implement resumable
   hashing and tarsum calculations to avoid backend roundtrips.
2. Error handling is rather bespoke at this time. The http API implementation
   should really dictate the error return structure for the future, so we
   intend to refactor this heavily to support these errors. We'd also like to
   collect production data to understand how failures happen in the system as
   a while before moving to a particular edict around error handling.
3. The layerUploadStore, which manages layer upload storage and state is not
   currently exported. This will likely end up being split, with the file
   management portion being pointed at the storagedriver and the state storage
   elsewhere.
4. Access Control provisions are nearly completely missing from this change.
   There are details around how layerindex lookup works that are related with
   access controls. As the auth portions of the new API take shape, these
   provisions will become more clear.

Please see TODOs for details and individual recommendations.
This commit is contained in:
Stephen J Day 2014-11-17 16:29:42 -08:00
parent de4e976ef2
commit 2637e29e18
7 changed files with 1538 additions and 0 deletions

59
storage/digest.go Normal file
View File

@ -0,0 +1,59 @@
package storage
import (
"fmt"
"hash"
"strings"
)
// Digest allows simple protection of hex formatted digest strings, prefixed
// by their algorithm. Strings of type Digest have some guarantee of being in
// the correct format and it provides quick access to the components of a
// digest string.
//
// The following is an example of the contents of Digest types:
//
// sha256:7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc
//
type Digest string
// NewDigest returns a Digest from alg and a hash.Hash object.
func NewDigest(alg string, h hash.Hash) Digest {
return Digest(fmt.Sprintf("%s:%x", alg, h.Sum(nil)))
}
var (
// ErrDigestInvalidFormat returned when digest format invalid.
ErrDigestInvalidFormat = fmt.Errorf("invalid checksum digest format")
// ErrDigestUnsupported returned when the digest algorithm is unsupported by registry.
ErrDigestUnsupported = fmt.Errorf("unsupported digest algorithm")
)
// ParseDigest parses s and returns the validated digest object. An error will
// be returned if the format is invalid.
func ParseDigest(s string) (Digest, error) {
parts := strings.SplitN(s, ":", 2)
if len(parts) != 2 {
return "", ErrDigestInvalidFormat
}
switch parts[0] {
case "sha256":
break
default:
return "", ErrDigestUnsupported
}
return Digest(s), nil
}
// Algorithm returns the algorithm portion of the digest.
func (d Digest) Algorithm() string {
return strings.SplitN(string(d), ":", 2)[0]
}
// Hex returns the hex digest portion of the digest.
func (d Digest) Hex() string {
return strings.SplitN(string(d), ":", 2)[1]
}

96
storage/layer.go Normal file
View File

@ -0,0 +1,96 @@
package storage
import (
"fmt"
"io"
"time"
)
// LayerService provides operations on layer files in a backend storage.
type LayerService interface {
// Exists returns true if the layer exists.
Exists(tarSum string) (bool, error)
// Fetch the layer identifed by TarSum.
Fetch(tarSum string) (Layer, error)
// Upload begins a layer upload, returning a handle. If the layer upload
// is already in progress or the layer has already been uploaded, this
// will return an error.
Upload(name, tarSum string) (LayerUpload, error)
// Resume continues an in progress layer upload, returning the current
// state of the upload.
Resume(name, tarSum, uuid string) (LayerUpload, error)
}
// Layer provides a readable and seekable layer object. Typically,
// implementations are *not* goroutine safe.
type Layer interface {
// http.ServeContent requires an efficient implementation of
// ReadSeeker.Seek(0, os.SEEK_END).
io.ReadSeeker
io.Closer
// Name returns the repository under which this layer is linked.
Name() string // TODO(stevvooe): struggling with nomenclature: should this be "repo" or "name"?
// TarSum returns the unique tarsum of the layer.
TarSum() string
// CreatedAt returns the time this layer was created. Until we implement
// Stat call on storagedriver, this just returns the zero time.
CreatedAt() time.Time
}
// LayerUpload provides a handle for working with in-progress uploads.
// Instances can be obtained from the LayerService.Upload and
// LayerService.Resume.
type LayerUpload interface {
io.WriteCloser
// UUID returns the identifier for this upload.
UUID() string
// Name of the repository under which the layer will be linked.
Name() string
// TarSum identifier of the proposed layer. Resulting data must match this
// tarsum.
TarSum() string
// Offset returns the position of the last byte written to this layer.
Offset() int64
// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
Finish(size int64, digest string) (Layer, error)
// Cancel the layer upload process.
Cancel() error
}
var (
// ErrLayerUnknown returned when layer cannot be found.
ErrLayerUnknown = fmt.Errorf("unknown layer")
// ErrLayerExists returned when layer already exists
ErrLayerExists = fmt.Errorf("layer exists")
// ErrLayerTarSumVersionUnsupported when tarsum is unsupported version.
ErrLayerTarSumVersionUnsupported = fmt.Errorf("unsupported tarsum version")
// ErrLayerUploadUnknown returned when upload is not found.
ErrLayerUploadUnknown = fmt.Errorf("layer upload unknown")
// ErrLayerInvalidChecksum returned when checksum/digest check fails.
ErrLayerInvalidChecksum = fmt.Errorf("invalid layer checksum")
// ErrLayerInvalidTarsum returned when tarsum check fails.
ErrLayerInvalidTarsum = fmt.Errorf("invalid layer tarsum")
// ErrLayerInvalidLength returned when length check fails.
ErrLayerInvalidLength = fmt.Errorf("invalid layer length")
)

450
storage/layer_test.go Normal file
View File

@ -0,0 +1,450 @@
package storage
import (
"archive/tar"
"bytes"
"crypto/rand"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
mrand "math/rand"
"os"
"testing"
"time"
"github.com/docker/docker/pkg/tarsum"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/inmemory"
)
// TestSimpleLayerUpload covers the layer upload process, exercising common
// error paths that might be seen during an upload.
func TestSimpleLayerUpload(t *testing.T) {
randomDataReader, tarSum, err := createRandomReader()
if err != nil {
t.Fatalf("error creating random reader: %v", err)
}
uploadStore, err := newTemporaryLocalFSLayerUploadStore()
if err != nil {
t.Fatalf("error allocating upload store: %v", err)
}
imageName := "foo/bar"
driver := inmemory.New()
ls := &layerStore{
driver: driver,
pathMapper: &pathMapper{
root: "/storage/testing",
version: storagePathVersion,
},
uploadStore: uploadStore,
}
h := sha256.New()
rd := io.TeeReader(randomDataReader, h)
layerUpload, err := ls.Upload(imageName, tarSum)
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Cancel the upload then restart it
if err := layerUpload.Cancel(); err != nil {
t.Fatalf("unexpected error during upload cancellation: %v", err)
}
// Do a resume, get unknown upload
layerUpload, err = ls.Resume(imageName, tarSum, layerUpload.UUID())
if err != ErrLayerUploadUnknown {
t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
}
// Restart!
layerUpload, err = ls.Upload(imageName, tarSum)
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Get the size of our random tarfile
randomDataSize, err := seekerSize(randomDataReader)
if err != nil {
t.Fatalf("error getting seeker size of random data: %v", err)
}
nn, err := io.Copy(layerUpload, rd)
if err != nil {
t.Fatalf("unexpected error uploading layer data: %v", err)
}
if nn != randomDataSize {
t.Fatalf("layer data write incomplete")
}
if layerUpload.Offset() != nn {
t.Fatalf("layerUpload not updated with correct offset: %v != %v", layerUpload.Offset(), nn)
}
layerUpload.Close()
// Do a resume, for good fun
layerUpload, err = ls.Resume(imageName, tarSum, layerUpload.UUID())
if err != nil {
t.Fatalf("unexpected error resuming upload: %v", err)
}
digest := NewDigest("sha256", h)
layer, err := layerUpload.Finish(randomDataSize, string(digest))
if err != nil {
t.Fatalf("unexpected error finishing layer upload: %v", err)
}
// After finishing an upload, it should no longer exist.
if _, err := ls.Resume(imageName, tarSum, layerUpload.UUID()); err != ErrLayerUploadUnknown {
t.Fatalf("expected layer upload to be unknown, got %v", err)
}
// Test for existence.
exists, err := ls.Exists(layer.TarSum())
if err != nil {
t.Fatalf("unexpected error checking for existence: %v", err)
}
if !exists {
t.Fatalf("layer should now exist")
}
h.Reset()
nn, err = io.Copy(h, layer)
if err != nil {
t.Fatalf("error reading layer: %v", err)
}
if nn != randomDataSize {
t.Fatalf("incorrect read length")
}
if NewDigest("sha256", h) != digest {
t.Fatalf("unexpected digest from uploaded layer: %q != %q", NewDigest("sha256", h), digest)
}
}
// TestSimpleLayerRead just creates a simple layer file and ensures that basic
// open, read, seek, read works. More specific edge cases should be covered in
// other tests.
func TestSimpleLayerRead(t *testing.T) {
imageName := "foo/bar"
driver := inmemory.New()
ls := &layerStore{
driver: driver,
pathMapper: &pathMapper{
root: "/storage/testing",
version: storagePathVersion,
},
}
randomLayerReader, tarSum, err := createRandomReader()
if err != nil {
t.Fatalf("error creating random data: %v", err)
}
// Test for existence.
exists, err := ls.Exists(tarSum)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v", err)
}
if exists {
t.Fatalf("layer should not exist")
}
// Try to get the layer and make sure we get a not found error
layer, err := ls.Fetch(tarSum)
if err == nil {
t.Fatalf("error expected fetching unknown layer")
}
if err != ErrLayerUnknown {
t.Fatalf("unexpected error fetching non-existent layer: %v", err)
} else {
err = nil
}
randomLayerDigest, err := writeTestLayer(driver, ls.pathMapper, imageName, tarSum, randomLayerReader)
if err != nil {
t.Fatalf("unexpected error writing test layer: %v", err)
}
randomLayerSize, err := seekerSize(randomLayerReader)
if err != nil {
t.Fatalf("error getting seeker size for random layer: %v", err)
}
layer, err = ls.Fetch(tarSum)
if err != nil {
t.Fatal(err)
}
defer layer.Close()
// Now check the sha digest and ensure its the same
h := sha256.New()
nn, err := io.Copy(h, layer)
if err != nil && err != io.EOF {
t.Fatalf("unexpected error copying to hash: %v", err)
}
if nn != randomLayerSize {
t.Fatalf("stored incorrect number of bytes in layer: %d != %d", nn, randomLayerSize)
}
digest := NewDigest("sha256", h)
if digest != randomLayerDigest {
t.Fatalf("fetched digest does not match: %q != %q", digest, randomLayerDigest)
}
// Now seek back the layer, read the whole thing and check against randomLayerData
offset, err := layer.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatalf("error seeking layer: %v", err)
}
if offset != 0 {
t.Fatalf("seek failed: expected 0 offset, got %d", offset)
}
p, err := ioutil.ReadAll(layer)
if err != nil {
t.Fatalf("error reading all of layer: %v", err)
}
if len(p) != int(randomLayerSize) {
t.Fatalf("layer data read has different length: %v != %v", len(p), randomLayerSize)
}
// Reset the randomLayerReader and read back the buffer
_, err = randomLayerReader.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatalf("error resetting layer reader: %v", err)
}
randomLayerData, err := ioutil.ReadAll(randomLayerReader)
if err != nil {
t.Fatalf("random layer read failed: %v", err)
}
if !bytes.Equal(p, randomLayerData) {
t.Fatalf("layer data not equal")
}
}
func TestLayerReaderSeek(t *testing.T) {
// TODO(stevvooe): Ensure that all relative seeks work as advertised.
// Readers must close and re-open on command. This is important to support
// resumable and concurrent downloads via HTTP range requests.
}
// TestLayerReadErrors covers the various error return type for different
// conditions that can arise when reading a layer.
func TestLayerReadErrors(t *testing.T) {
// TODO(stevvooe): We need to cover error return types, driven by the
// errors returned via the HTTP API. For now, here is a incomplete list:
//
// 1. Layer Not Found: returned when layer is not found or access is
// denied.
// 2. Layer Unavailable: returned when link references are unresolved,
// but layer is known to the registry.
// 3. Layer Invalid: This may more split into more errors, but should be
// returned when name or tarsum does not reference a valid error. We
// may also need something to communication layer verification errors
// for the inline tarsum check.
// 4. Timeout: timeouts to backend. Need to better understand these
// failure cases and how the storage driver propagates these errors
// up the stack.
}
// writeRandomLayer creates a random layer under name and tarSum using driver
// and pathMapper. An io.ReadSeeker with the data is returned, along with the
// sha256 hex digest.
func writeRandomLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string) (rs io.ReadSeeker, tarSum string, digest Digest, err error) {
reader, tarSum, err := createRandomReader()
if err != nil {
return nil, "", "", err
}
// Now, actually create the layer.
randomLayerDigest, err := writeTestLayer(driver, pathMapper, name, tarSum, ioutil.NopCloser(reader))
if _, err := reader.Seek(0, os.SEEK_SET); err != nil {
return nil, "", "", err
}
return reader, tarSum, randomLayerDigest, err
}
// seekerSize seeks to the end of seeker, checks the size and returns it to
// the original state, returning the size. The state of the seeker should be
// treated as unknown if an error is returned.
func seekerSize(seeker io.ReadSeeker) (int64, error) {
current, err := seeker.Seek(0, os.SEEK_CUR)
if err != nil {
return 0, err
}
end, err := seeker.Seek(0, os.SEEK_END)
if err != nil {
return 0, err
}
resumed, err := seeker.Seek(current, os.SEEK_SET)
if err != nil {
return 0, err
}
if resumed != current {
return 0, fmt.Errorf("error returning seeker to original state, could not seek back to original location")
}
return end, nil
}
// createRandomReader returns a random read seeker and its tarsum. The
// returned content will be a valid tar file with a random number of files and
// content.
func createRandomReader() (rs io.ReadSeeker, tarSum string, err error) {
nFiles := mrand.Intn(10) + 10
target := &bytes.Buffer{}
wr := tar.NewWriter(target)
// Perturb this on each iteration of the loop below.
header := &tar.Header{
Mode: 0644,
ModTime: time.Now(),
Typeflag: tar.TypeReg,
Uname: "randocalrissian",
Gname: "cloudcity",
AccessTime: time.Now(),
ChangeTime: time.Now(),
}
for fileNumber := 0; fileNumber < nFiles; fileNumber++ {
fileSize := mrand.Int63n(1<<20) + 1<<20
header.Name = fmt.Sprint(fileNumber)
header.Size = fileSize
if err := wr.WriteHeader(header); err != nil {
return nil, "", err
}
randomData := make([]byte, fileSize)
// Fill up the buffer with some random data.
n, err := rand.Read(randomData)
if n != len(randomData) {
return nil, "", fmt.Errorf("short read creating random reader: %v bytes != %v bytes", n, len(randomData))
}
if err != nil {
return nil, "", err
}
nn, err := io.Copy(wr, bytes.NewReader(randomData))
if nn != fileSize {
return nil, "", fmt.Errorf("short copy writing random file to tar")
}
if err != nil {
return nil, "", err
}
if err := wr.Flush(); err != nil {
return nil, "", err
}
}
if err := wr.Close(); err != nil {
return nil, "", err
}
reader := bytes.NewReader(target.Bytes())
// A tar builder that supports tarsum inline calculation would be awesome
// here.
ts, err := tarsum.NewTarSum(reader, true, tarsum.Version1)
if err != nil {
return nil, "", err
}
nn, err := io.Copy(ioutil.Discard, ts)
if nn != int64(len(target.Bytes())) {
return nil, "", fmt.Errorf("short copy when getting tarsum of random layer: %v != %v", nn, len(target.Bytes()))
}
if err != nil {
return nil, "", err
}
return bytes.NewReader(target.Bytes()), ts.Sum(nil), nil
}
// createTestLayer creates a simple test layer in the provided driver under
// tarsum, returning the string digest. This is implemented peicemeal and
// should probably be replaced by the uploader when it's ready.
func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name, tarSum string, content io.Reader) (Digest, error) {
h := sha256.New()
rd := io.TeeReader(content, h)
p, err := ioutil.ReadAll(rd)
if err != nil {
return "", nil
}
digest := NewDigest("sha256", h)
blobPath, err := pathMapper.path(blobPathSpec{
alg: digest.Algorithm(),
digest: digest.Hex(),
})
if err := driver.PutContent(blobPath, p); err != nil {
return "", err
}
layerIndexLinkPath, err := pathMapper.path(layerIndexLinkPathSpec{
tarSum: tarSum,
})
if err != nil {
return "", err
}
layerLinkPath, err := pathMapper.path(layerLinkPathSpec{
name: name,
tarSum: tarSum,
})
if err != nil {
return "", err
}
if err != nil {
return "", err
}
if err := driver.PutContent(layerLinkPath, []byte(string(NewDigest("sha256", h)))); err != nil {
return "", nil
}
if err = driver.PutContent(layerIndexLinkPath, []byte(name)); err != nil {
return "", nil
}
return NewDigest("sha256", h), err
}

172
storage/layerreader.go Normal file
View File

@ -0,0 +1,172 @@
package storage
import (
"bufio"
"fmt"
"io"
"os"
"time"
)
// layerReadSeeker implements Layer and provides facilities for reading and
// seeking.
type layerReader struct {
layerStore *layerStore
rc io.ReadCloser
brd *bufio.Reader
name string // repo name of this layer
tarSum string
path string
createdAt time.Time
// offset is the current read offset
offset int64
// size is the total layer size, if available.
size int64
closedErr error // terminal error, if set, reader is closed
}
var _ Layer = &layerReader{}
func (lrs *layerReader) Name() string {
return lrs.name
}
func (lrs *layerReader) TarSum() string {
return lrs.tarSum
}
func (lrs *layerReader) CreatedAt() time.Time {
return lrs.createdAt
}
func (lrs *layerReader) Read(p []byte) (n int, err error) {
if err := lrs.closed(); err != nil {
return 0, err
}
rd, err := lrs.reader()
if err != nil {
return 0, err
}
n, err = rd.Read(p)
lrs.offset += int64(n)
// Simulate io.EOR error if we reach filesize.
if err == nil && lrs.offset >= lrs.size {
err = io.EOF
}
// TODO(stevvooe): More error checking is required here. If the reader
// times out for some reason, we should reset the reader so we re-open the
// connection.
return n, err
}
func (lrs *layerReader) Seek(offset int64, whence int) (int64, error) {
if err := lrs.closed(); err != nil {
return 0, err
}
var err error
newOffset := lrs.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(whence)
case os.SEEK_END:
newOffset = lrs.size + int64(whence)
case os.SEEK_SET:
newOffset = int64(whence)
}
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else if newOffset >= lrs.size {
err = fmt.Errorf("cannot seek passed end of layer")
} else {
if lrs.offset != newOffset {
lrs.resetReader()
}
// No problems, set the offset.
lrs.offset = newOffset
}
return lrs.offset, err
}
// Close the layer. Should be called when the resource is no longer needed.
func (lrs *layerReader) Close() error {
if lrs.closedErr != nil {
return lrs.closedErr
}
// TODO(sday): Must export this error.
lrs.closedErr = fmt.Errorf("layer closed")
// close and release reader chain
if lrs.rc != nil {
lrs.rc.Close()
lrs.rc = nil
}
lrs.brd = nil
return lrs.closedErr
}
// reader prepares the current reader at the lrs offset, ensuring its buffered
// and ready to go.
func (lrs *layerReader) reader() (io.Reader, error) {
if err := lrs.closed(); err != nil {
return nil, err
}
if lrs.rc != nil {
return lrs.brd, nil
}
// If we don't have a reader, open one up.
rc, err := lrs.layerStore.driver.ReadStream(lrs.path, uint64(lrs.offset))
if err != nil {
return nil, err
}
lrs.rc = rc
if lrs.brd == nil {
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
// understand the latency characteristics of the underlying network to
// set this correctly, so we may want to leave it to the driver. For
// out of process drivers, we'll have to optimize this buffer size for
// local communication.
lrs.brd = bufio.NewReader(lrs.rc)
} else {
lrs.brd.Reset(lrs.rc)
}
return lrs.brd, nil
}
// resetReader resets the reader, forcing the read method to open up a new
// connection and rebuild the buffered reader. This should be called when the
// offset and the reader will become out of sync, such as during a seek
// operation.
func (lrs *layerReader) resetReader() {
if err := lrs.closed(); err != nil {
return
}
if lrs.rc != nil {
lrs.rc.Close()
lrs.rc = nil
}
}
func (lrs *layerReader) closed() error {
return lrs.closedErr
}

203
storage/layerstore.go Normal file
View File

@ -0,0 +1,203 @@
package storage
import (
"fmt"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker-registry/storagedriver"
)
type layerStore struct {
driver storagedriver.StorageDriver
pathMapper *pathMapper
uploadStore layerUploadStore
}
func (ls *layerStore) Exists(tarSum string) (bool, error) {
// Because this implementation just follows blob links, an existence check
// is pretty cheap by starting and closing a fetch.
_, err := ls.Fetch(tarSum)
if err != nil {
if err == ErrLayerUnknown {
return false, nil
}
return false, err
}
return true, nil
}
func (ls *layerStore) Fetch(tarSum string) (Layer, error) {
repos, err := ls.resolveContainingRepositories(tarSum)
if err != nil {
// TODO(stevvooe): Unknown tarsum error: need to wrap.
return nil, err
}
// TODO(stevvooe): Access control for layer pulls need to happen here: we
// have a list of repos that "own" the tarsum that need to be checked
// against the list of repos to which we have pull access. The argument
// repos needs to be filtered against that access list.
name, blobPath, err := ls.resolveBlobPath(repos, tarSum)
if err != nil {
// TODO(stevvooe): Map this error correctly, perhaps in the callee.
return nil, err
}
p, err := ls.pathMapper.path(blobPath)
if err != nil {
return nil, err
}
// Grab the size of the layer file, ensuring that it exists, among other
// things.
size, err := ls.driver.CurrentSize(p)
if err != nil {
// TODO(stevvooe): Handle blob/path does not exist here.
// TODO(stevvooe): Get a better understanding of the error cases here
// that don't stem from unknown path.
return nil, err
}
// Build the layer reader and return to the client.
layer := &layerReader{
layerStore: ls,
path: p,
name: name,
tarSum: tarSum,
// TODO(stevvooe): Storage backend does not support modification time
// queries yet. Layers "never" change, so just return the zero value.
createdAt: time.Time{},
offset: 0,
size: int64(size),
}
return layer, nil
}
// Upload begins a layer upload, returning a handle. If the layer upload
// is already in progress or the layer has already been uploaded, this
// will return an error.
func (ls *layerStore) Upload(name, tarSum string) (LayerUpload, error) {
exists, err := ls.Exists(tarSum)
if err != nil {
return nil, err
}
if exists {
// TODO(stevvoe): This looks simple now, but we really should only
// return the layer exists error when the layer exists AND the current
// client has access to the layer. If the client doesn't have access
// to the layer, the upload should proceed.
return nil, ErrLayerExists
}
// NOTE(stevvooe): Consider the issues with allowing concurrent upload of
// the same two layers. Should it be disallowed? For now, we allow both
// parties to proceed and the the first one uploads the layer.
lus, err := ls.uploadStore.New(name, tarSum)
if err != nil {
return nil, err
}
return ls.newLayerUpload(lus), nil
}
// Resume continues an in progress layer upload, returning the current
// state of the upload.
func (ls *layerStore) Resume(name, tarSum, uuid string) (LayerUpload, error) {
lus, err := ls.uploadStore.GetState(uuid)
if err != nil {
return nil, err
}
return ls.newLayerUpload(lus), nil
}
// newLayerUpload allocates a new upload controller with the given state.
func (ls *layerStore) newLayerUpload(lus LayerUploadState) LayerUpload {
return &layerUploadController{
LayerUploadState: lus,
layerStore: ls,
uploadStore: ls.uploadStore,
}
}
func (ls *layerStore) resolveContainingRepositories(tarSum string) ([]string, error) {
// Lookup the layer link in the index by tarsum id.
layerIndexLinkPath, err := ls.pathMapper.path(layerIndexLinkPathSpec{tarSum: tarSum})
if err != nil {
return nil, err
}
layerIndexLinkContent, err := ls.driver.GetContent(layerIndexLinkPath)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
return nil, ErrLayerUnknown
default:
return nil, err
}
}
results := strings.Split(string(layerIndexLinkContent), "\n")
// clean these up
for i, result := range results {
results[i] = strings.TrimSpace(result)
}
return results, nil
}
// resolveBlobId lookups up the tarSum in the various repos to find the blob
// link, returning the repo name and blob path spec or an error on failure.
func (ls *layerStore) resolveBlobPath(repos []string, tarSum string) (name string, bps blobPathSpec, err error) {
for _, repo := range repos {
pathSpec := layerLinkPathSpec{name: repo, tarSum: tarSum}
layerLinkPath, err := ls.pathMapper.path(pathSpec)
if err != nil {
// TODO(stevvooe): This looks very lazy, may want to collect these
// errors and report them if we exit this for loop without
// resolving the blob id.
logrus.Debugf("error building linkLayerPath (%V): %v", pathSpec, err)
continue
}
layerLinkContent, err := ls.driver.GetContent(layerLinkPath)
if err != nil {
logrus.Debugf("error getting layerLink content (%V): %v", pathSpec, err)
continue
}
// Yay! We've resolved our blob id and we're ready to go.
parts := strings.SplitN(strings.TrimSpace(string(layerLinkContent)), ":", 2)
if len(parts) != 2 {
return "", bps, fmt.Errorf("invalid blob reference: %q", string(layerLinkContent))
}
name = repo
bp := blobPathSpec{alg: parts[0], digest: parts[1]}
return repo, bp, nil
}
// TODO(stevvooe): Map this error to repo not found, but it basically
// means we exited the loop above without finding a blob link.
return "", bps, fmt.Errorf("unable to resolve blog id for repos=%v and tarSum=%q", repos, tarSum)
}

514
storage/layerupload.go Normal file
View File

@ -0,0 +1,514 @@
package storage
import (
"crypto/sha256"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker/pkg/tarsum"
"io"
)
// LayerUploadState captures the state serializable state of the layer upload.
type LayerUploadState struct {
// name is the primary repository under which the layer will be linked.
Name string
// tarSum identifies the target layer. Provided by the client. If the
// resulting tarSum does not match this value, an error should be
// returned.
TarSum string
// UUID identifies the upload.
UUID string
// offset contains the current progress of the upload.
Offset int64
}
// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
LayerUploadState
layerStore *layerStore
uploadStore layerUploadStore
fp layerFile
err error // terminal error, if set, controller is closed
}
// layerFile documents the interface used while writing layer files, similar
// to *os.File. This is separate from layerReader, for now, because we want to
// store uploads on the local file system until we have write-through hashing
// support. They should be combined once this is worked out.
type layerFile interface {
io.WriteSeeker
io.Reader
io.Closer
// Sync commits the contents of the writer to storage.
Sync() (err error)
}
// layerUploadStore provides storage for temporary files and upload state of
// layers. This is be used by the LayerService to manage the state of ongoing
// uploads. This interface will definitely change and will most likely end up
// being exported to the app layer. Move the layer.go when it's ready to go.
type layerUploadStore interface {
New(name, tarSum string) (LayerUploadState, error)
Open(uuid string) (layerFile, error)
GetState(uuid string) (LayerUploadState, error)
SaveState(lus LayerUploadState) error
DeleteState(uuid string) error
}
var _ LayerUpload = &layerUploadController{}
// Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string {
return luc.LayerUploadState.Name
}
// TarSum identifier of the proposed layer. Resulting data must match this
// tarsum.
func (luc *layerUploadController) TarSum() string {
return luc.LayerUploadState.TarSum
}
// UUID returns the identifier for this upload.
func (luc *layerUploadController) UUID() string {
return luc.LayerUploadState.UUID
}
// Offset returns the position of the last byte written to this layer.
func (luc *layerUploadController) Offset() int64 {
return luc.LayerUploadState.Offset
}
// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(size int64, digestStr string) (Layer, error) {
// This section is going to be pretty ugly now. We will have to read the
// file twice. First, to get the tarsum and checksum. When those are
// available, and validated, we will upload it to the blob store and link
// it into the repository. In the future, we need to use resumable hash
// calculations for tarsum and checksum that can be calculated during the
// upload. This will allow us to cut the data directly into a temporary
// directory in the storage backend.
fp, err := luc.file()
if err != nil {
// Cleanup?
return nil, err
}
digest, err := ParseDigest(digestStr)
if err != nil {
return nil, err
}
if err := luc.validateLayer(fp, size, digest); err != nil {
// Cleanup?
return nil, err
}
if err := luc.writeLayer(fp, size, digest); err != nil {
// Cleanup?
return nil, err
}
// Yes! We have written some layer data. Let's make it visible. Link the
// layer blob into the repository.
if err := luc.linkLayer(digest); err != nil {
return nil, err
}
// Ok, the upload has completed and finished. Delete the state.
if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
// Can we ignore this error?
return nil, err
}
return luc.layerStore.Fetch(luc.TarSum())
}
// Cancel the layer upload process.
func (luc *layerUploadController) Cancel() error {
if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
return err
}
return luc.Close()
}
func (luc *layerUploadController) Write(p []byte) (int, error) {
wr, err := luc.file()
if err != nil {
return 0, err
}
n, err := wr.Write(p)
// Because we expect the reported offset to be consistent with the storage
// state, unfortunately, we need to Sync on every call to write.
if err := wr.Sync(); err != nil {
// Effectively, ignore the write state if the Sync fails. Report that
// no bytes were written and seek back to the starting offset.
offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
if seekErr != nil {
// What do we do here? Quite disasterous.
luc.reset()
return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", err, seekErr)
}
if offset != luc.Offset() {
return 0, fmt.Errorf("unexpected offset after seek")
}
return 0, err
}
luc.LayerUploadState.Offset += int64(n)
if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil {
// TODO(stevvooe): This failure case may require more thought.
return n, err
}
return n, err
}
func (luc *layerUploadController) Close() error {
if luc.err != nil {
return luc.err
}
if luc.fp != nil {
luc.err = luc.fp.Close()
}
return luc.err
}
func (luc *layerUploadController) file() (layerFile, error) {
if luc.fp != nil {
return luc.fp, nil
}
fp, err := luc.uploadStore.Open(luc.UUID())
if err != nil {
return nil, err
}
// TODO(stevvooe): We may need a more aggressive check here to ensure that
// the file length is equal to the current offset. We may want to sync the
// offset before return the layer upload to the client so it can be
// validated before proceeding with any writes.
// Seek to the current layer offset for good measure.
if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
return nil, err
}
luc.fp = fp
return luc.fp, nil
}
// reset closes and drops the current writer.
func (luc *layerUploadController) reset() {
if luc.fp != nil {
luc.fp.Close()
luc.fp = nil
}
}
// validateLayer runs several checks on the layer file to ensure its validity.
// This is currently very expensive and relies on fast io and fast seek.
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, digest Digest) error {
// First, seek to the end of the file, checking the size is as expected.
end, err := fp.Seek(0, os.SEEK_END)
if err != nil {
return err
}
if end != size {
return ErrLayerInvalidLength
}
// Now seek back to start and take care of tarsum and checksum.
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
return err
}
version, err := tarsum.GetVersionFromTarsum(luc.TarSum())
if err != nil {
return ErrLayerTarSumVersionUnsupported
}
// // We only support tarsum version 1 for now.
if version != tarsum.Version1 {
return ErrLayerTarSumVersionUnsupported
}
ts, err := tarsum.NewTarSum(fp, true, tarsum.Version1)
if err != nil {
return err
}
h := sha256.New()
// Pull the layer file through by writing it to a checksum.
nn, err := io.Copy(h, ts)
if nn != int64(size) {
return fmt.Errorf("bad read while finishing upload(%s) %v: %v != %v, err=%v", luc.UUID(), fp, nn, size, err)
}
if err != nil && err != io.EOF {
return err
}
calculatedDigest := NewDigest("sha256", h)
// Compare the digests!
if digest != calculatedDigest {
return ErrLayerInvalidChecksum
}
// Compare the tarsums!
if ts.Sum(nil) != luc.TarSum() {
return ErrLayerInvalidTarsum
}
return nil
}
// writeLayer actually writes the the layer file into its final destination.
// The layer should be validated before commencing the write.
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, digest Digest) error {
blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
alg: digest.Algorithm(),
digest: digest.Hex(),
})
if err != nil {
return err
}
// Check for existence
if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
// TODO(stevvooe): This check is kind of problematic and very racy.
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist.
default:
// TODO(stevvooe): This isn't actually an error: the blob store is
// content addressable and we should just use this to ensure we
// have it written. Although, we do need to verify that the
// content that is there is the correct length.
return err
}
}
// Seek our local layer file back now.
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
// Cleanup?
return err
}
// Okay: we can write the file to the blob store.
if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
return err
}
return nil
}
// linkLayer links a valid, written layer blog into the registry, first
// linking the repository namespace, then adding it to the layerindex.
func (luc *layerUploadController) linkLayer(digest Digest) error {
layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
name: luc.Name(),
tarSum: luc.TarSum(),
})
if err != nil {
return err
}
if err := luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)); err != nil {
return nil
}
// Link the layer into the name index.
layerIndexLinkPath, err := luc.layerStore.pathMapper.path(layerIndexLinkPathSpec{
tarSum: luc.TarSum(),
})
if err != nil {
return err
}
// Read back the name index file. If it exists, create it. If not, add the
// new repo to the name list.
// TODO(stevvooe): This is very racy, as well. Reconsider using list for
// this operation?
layerIndexLinkContent, err := luc.layerStore.driver.GetContent(layerIndexLinkPath)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
layerIndexLinkContent = []byte(luc.Name())
default:
return err
}
}
layerIndexLinkContent = luc.maybeAddNameToLayerIndexLinkContent(layerIndexLinkContent)
// Write the index content back to the index.
return luc.layerStore.driver.PutContent(layerIndexLinkPath, layerIndexLinkContent)
}
func (luc *layerUploadController) maybeAddNameToLayerIndexLinkContent(content []byte) []byte {
names := strings.Split(string(content), "\n")
var found bool
// Search the names and find ours
for _, name := range names {
if name == luc.Name() {
found = true
}
}
if !found {
names = append(names, luc.Name())
}
sort.Strings(names)
return []byte(strings.Join(names, "\n"))
}
// localFSLayerUploadStore implements a local layerUploadStore. There are some
// complexities around hashsums that make round tripping to the storage
// backend problematic, so we'll store and read locally for now. By GO-beta,
// this should be fully implemented on top of the backend storagedriver.
//
// For now, the directory layout is as follows:
//
// /<temp dir>/registry-layer-upload/
// <uuid>/
// -> state.json
// -> data
//
// Each upload, identified by uuid, has its own directory with a state file
// and a data file. The state file has a json representation of the current
// state. The data file is the in-progress upload data.
type localFSLayerUploadStore struct {
root string
}
func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
path, err := ioutil.TempDir("", "registry-layer-upload")
if err != nil {
return nil, err
}
return &localFSLayerUploadStore{
root: path,
}, nil
}
func (llufs *localFSLayerUploadStore) New(name, tarSum string) (LayerUploadState, error) {
lus := LayerUploadState{
Name: name,
TarSum: tarSum,
UUID: uuid.New(),
}
if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
return lus, err
}
return lus, nil
}
func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
if err != nil {
return nil, err
}
return fp, nil
}
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
// TODO(stevvoe): Storing this state on the local file system is an
// intermediate stop gap. This technique is unlikely to handle any kind of
// concurrency very well.
var lus LayerUploadState
fp, err := os.Open(llufs.path(uuid, "state.json"))
if err != nil {
if os.IsNotExist(err) {
return lus, ErrLayerUploadUnknown
}
return lus, err
}
defer fp.Close()
dec := json.NewDecoder(fp)
if err := dec.Decode(&lus); err != nil {
return lus, err
}
return lus, nil
}
func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
p, err := json.Marshal(lus)
if err != nil {
return err
}
err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644)
if os.IsNotExist(err) {
return ErrLayerUploadUnknown
}
return err
}
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
if os.IsNotExist(err) {
return ErrLayerUploadUnknown
}
return err
}
return nil
}
func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
return filepath.Join(llufs.root, uuid, file)
}

44
storage/services.go Normal file
View File

@ -0,0 +1,44 @@
package storage
import (
"github.com/docker/docker-registry/storagedriver"
)
// Services provides various services with application-level operations for
// use across backend storage drivers.
type Services struct {
driver storagedriver.StorageDriver
pathMapper *pathMapper
layerUploadStore layerUploadStore
}
// NewServices creates a new Services object to access docker objects stored
// in the underlying driver.
func NewServices(driver storagedriver.StorageDriver) *Services {
layerUploadStore, err := newTemporaryLocalFSLayerUploadStore()
if err != nil {
// TODO(stevvooe): This failure needs to be understood in the context
// of the lifecycle of the services object, which is uncertain at this
// point.
panic("unable to allocate layerUploadStore: " + err.Error())
}
return &Services{
driver: driver,
pathMapper: &pathMapper{
// TODO(sday): This should be configurable.
root: "/docker/registry/",
version: storagePathVersion,
},
layerUploadStore: layerUploadStore,
}
}
// Layers returns an instance of the LayerService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (ss *Services) Layers() LayerService {
return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper}
}