Merge pull request #741 from stevvooe/layer-service

Initial implementation of registry LayerService

commit 9dc6fa3765
7 changed files with 1538 additions and 0 deletions
storage/digest.go (new file, 59 lines)
@@ -0,0 +1,59 @@
package storage

import (
    "fmt"
    "hash"
    "strings"
)

// Digest allows simple protection of hex-formatted digest strings, prefixed
// by their algorithm. Strings of type Digest have some guarantee of being in
// the correct format, and the type provides quick access to the components of
// a digest string.
//
// The following is an example of the contents of a Digest:
//
//    sha256:7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc
//
type Digest string

// NewDigest returns a Digest from alg and a hash.Hash object.
func NewDigest(alg string, h hash.Hash) Digest {
    return Digest(fmt.Sprintf("%s:%x", alg, h.Sum(nil)))
}

var (
    // ErrDigestInvalidFormat is returned when the digest format is invalid.
    ErrDigestInvalidFormat = fmt.Errorf("invalid checksum digest format")

    // ErrDigestUnsupported is returned when the digest algorithm is unsupported by the registry.
    ErrDigestUnsupported = fmt.Errorf("unsupported digest algorithm")
)

// ParseDigest parses s and returns the validated digest object. An error will
// be returned if the format is invalid.
func ParseDigest(s string) (Digest, error) {
    parts := strings.SplitN(s, ":", 2)
    if len(parts) != 2 {
        return "", ErrDigestInvalidFormat
    }

    switch parts[0] {
    case "sha256":
        break
    default:
        return "", ErrDigestUnsupported
    }

    return Digest(s), nil
}

// Algorithm returns the algorithm portion of the digest.
func (d Digest) Algorithm() string {
    return strings.SplitN(string(d), ":", 2)[0]
}

// Hex returns the hex digest portion of the digest.
func (d Digest) Hex() string {
    return strings.SplitN(string(d), ":", 2)[1]
}
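Usage sketch (illustrative, not part of this commit): hash some content, wrap the hash in a Digest with NewDigest, then round-trip it through ParseDigest and the accessors. The import path and sample content are assumptions.

package main

import (
    "crypto/sha256"
    "fmt"

    "github.com/docker/docker-registry/storage"
)

func main() {
    // Hash some layer content and wrap the result in a Digest.
    h := sha256.New()
    h.Write([]byte("example layer content"))
    d := storage.NewDigest("sha256", h)

    // Round-trip through ParseDigest to validate the format.
    parsed, err := storage.ParseDigest(string(d))
    if err != nil {
        panic(err)
    }

    fmt.Println(parsed.Algorithm()) // sha256
    fmt.Println(parsed.Hex())       // 64 hex characters
}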
storage/layer.go (new file, 96 lines)
@@ -0,0 +1,96 @@
package storage

import (
    "fmt"
    "io"
    "time"
)

// LayerService provides operations on layer files in a backend storage.
type LayerService interface {
    // Exists returns true if the layer exists.
    Exists(tarSum string) (bool, error)

    // Fetch the layer identified by TarSum.
    Fetch(tarSum string) (Layer, error)

    // Upload begins a layer upload, returning a handle. If the layer upload
    // is already in progress or the layer has already been uploaded, this
    // will return an error.
    Upload(name, tarSum string) (LayerUpload, error)

    // Resume continues an in-progress layer upload, returning the current
    // state of the upload.
    Resume(name, tarSum, uuid string) (LayerUpload, error)
}

// Layer provides a readable and seekable layer object. Typically,
// implementations are *not* goroutine safe.
type Layer interface {
    // http.ServeContent requires an efficient implementation of
    // ReadSeeker.Seek(0, os.SEEK_END).
    io.ReadSeeker
    io.Closer

    // Name returns the repository under which this layer is linked.
    Name() string // TODO(stevvooe): struggling with nomenclature: should this be "repo" or "name"?

    // TarSum returns the unique tarsum of the layer.
    TarSum() string

    // CreatedAt returns the time this layer was created. Until we implement
    // a Stat call on storagedriver, this just returns the zero time.
    CreatedAt() time.Time
}

// LayerUpload provides a handle for working with in-progress uploads.
// Instances can be obtained from LayerService.Upload and
// LayerService.Resume.
type LayerUpload interface {
    io.WriteCloser

    // UUID returns the identifier for this upload.
    UUID() string

    // Name of the repository under which the layer will be linked.
    Name() string

    // TarSum identifier of the proposed layer. Resulting data must match this
    // tarsum.
    TarSum() string

    // Offset returns the position of the last byte written to this layer.
    Offset() int64

    // Finish marks the upload as completed, returning a valid handle to the
    // uploaded layer. The final size and checksum are validated against the
    // contents of the uploaded layer. The checksum should be provided in the
    // format <algorithm>:<hex digest>.
    Finish(size int64, digest string) (Layer, error)

    // Cancel the layer upload process.
    Cancel() error
}

var (
    // ErrLayerUnknown is returned when a layer cannot be found.
    ErrLayerUnknown = fmt.Errorf("unknown layer")

    // ErrLayerExists is returned when a layer already exists.
    ErrLayerExists = fmt.Errorf("layer exists")

    // ErrLayerTarSumVersionUnsupported is returned when the tarsum version is unsupported.
    ErrLayerTarSumVersionUnsupported = fmt.Errorf("unsupported tarsum version")

    // ErrLayerUploadUnknown is returned when an upload is not found.
    ErrLayerUploadUnknown = fmt.Errorf("layer upload unknown")

    // ErrLayerInvalidChecksum is returned when a checksum/digest check fails.
    ErrLayerInvalidChecksum = fmt.Errorf("invalid layer checksum")

    // ErrLayerInvalidTarsum is returned when a tarsum check fails.
    ErrLayerInvalidTarsum = fmt.Errorf("invalid layer tarsum")

    // ErrLayerInvalidLength is returned when a length check fails.
    ErrLayerInvalidLength = fmt.Errorf("invalid layer length")
)
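Caller sketch (hypothetical, not part of this commit) showing the intended call sequence against these interfaces: start an upload, stream data through a sha256 hash, then Finish with the size and digest. The helper name and its parameters are assumptions.

package example

import (
    "crypto/sha256"
    "io"

    "github.com/docker/docker-registry/storage"
)

// uploadLayer drives LayerService end to end. If the layer is already
// stored, it falls back to fetching the existing copy.
func uploadLayer(ls storage.LayerService, name, tarSum string, content io.Reader, size int64) (storage.Layer, error) {
    upload, err := ls.Upload(name, tarSum)
    if err == storage.ErrLayerExists {
        return ls.Fetch(tarSum)
    } else if err != nil {
        return nil, err
    }

    // Tee the stream through a sha256 hash while writing the upload.
    h := sha256.New()
    if _, err := io.Copy(upload, io.TeeReader(content, h)); err != nil {
        upload.Cancel() // best effort; error intentionally ignored
        return nil, err
    }

    // Validate size and checksum server-side and link the layer.
    return upload.Finish(size, string(storage.NewDigest("sha256", h)))
}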
storage/layer_test.go (new file, 450 lines)
@@ -0,0 +1,450 @@
package storage

import (
    "archive/tar"
    "bytes"
    "crypto/rand"
    "crypto/sha256"
    "fmt"
    "io"
    "io/ioutil"
    mrand "math/rand"
    "os"
    "testing"
    "time"

    "github.com/docker/docker/pkg/tarsum"

    "github.com/docker/docker-registry/storagedriver"
    "github.com/docker/docker-registry/storagedriver/inmemory"
)

// TestSimpleLayerUpload covers the layer upload process, exercising common
// error paths that might be seen during an upload.
func TestSimpleLayerUpload(t *testing.T) {
    randomDataReader, tarSum, err := createRandomReader()
    if err != nil {
        t.Fatalf("error creating random reader: %v", err)
    }

    uploadStore, err := newTemporaryLocalFSLayerUploadStore()
    if err != nil {
        t.Fatalf("error allocating upload store: %v", err)
    }

    imageName := "foo/bar"
    driver := inmemory.New()

    ls := &layerStore{
        driver: driver,
        pathMapper: &pathMapper{
            root:    "/storage/testing",
            version: storagePathVersion,
        },
        uploadStore: uploadStore,
    }

    h := sha256.New()
    rd := io.TeeReader(randomDataReader, h)

    layerUpload, err := ls.Upload(imageName, tarSum)
    if err != nil {
        t.Fatalf("unexpected error starting layer upload: %s", err)
    }

    // Cancel the upload, then restart it.
    if err := layerUpload.Cancel(); err != nil {
        t.Fatalf("unexpected error during upload cancellation: %v", err)
    }

    // Do a resume; we should get an unknown upload error.
    layerUpload, err = ls.Resume(imageName, tarSum, layerUpload.UUID())
    if err != ErrLayerUploadUnknown {
        t.Fatalf("unexpected error resuming upload, should be unknown: %v", err)
    }

    // Restart!
    layerUpload, err = ls.Upload(imageName, tarSum)
    if err != nil {
        t.Fatalf("unexpected error starting layer upload: %s", err)
    }

    // Get the size of our random tarfile.
    randomDataSize, err := seekerSize(randomDataReader)
    if err != nil {
        t.Fatalf("error getting seeker size of random data: %v", err)
    }

    nn, err := io.Copy(layerUpload, rd)
    if err != nil {
        t.Fatalf("unexpected error uploading layer data: %v", err)
    }

    if nn != randomDataSize {
        t.Fatalf("layer data write incomplete")
    }

    if layerUpload.Offset() != nn {
        t.Fatalf("layerUpload not updated with correct offset: %v != %v", layerUpload.Offset(), nn)
    }
    layerUpload.Close()

    // Do a resume, for good fun.
    layerUpload, err = ls.Resume(imageName, tarSum, layerUpload.UUID())
    if err != nil {
        t.Fatalf("unexpected error resuming upload: %v", err)
    }

    digest := NewDigest("sha256", h)
    layer, err := layerUpload.Finish(randomDataSize, string(digest))
    if err != nil {
        t.Fatalf("unexpected error finishing layer upload: %v", err)
    }

    // After finishing an upload, it should no longer exist.
    if _, err := ls.Resume(imageName, tarSum, layerUpload.UUID()); err != ErrLayerUploadUnknown {
        t.Fatalf("expected layer upload to be unknown, got %v", err)
    }

    // Test for existence.
    exists, err := ls.Exists(layer.TarSum())
    if err != nil {
        t.Fatalf("unexpected error checking for existence: %v", err)
    }

    if !exists {
        t.Fatalf("layer should now exist")
    }

    h.Reset()
    nn, err = io.Copy(h, layer)
    if err != nil {
        t.Fatalf("error reading layer: %v", err)
    }

    if nn != randomDataSize {
        t.Fatalf("incorrect read length")
    }

    if NewDigest("sha256", h) != digest {
        t.Fatalf("unexpected digest from uploaded layer: %q != %q", NewDigest("sha256", h), digest)
    }
}

// TestSimpleLayerRead just creates a simple layer file and ensures that basic
// open, read, seek, read works. More specific edge cases should be covered in
// other tests.
func TestSimpleLayerRead(t *testing.T) {
    imageName := "foo/bar"
    driver := inmemory.New()
    ls := &layerStore{
        driver: driver,
        pathMapper: &pathMapper{
            root:    "/storage/testing",
            version: storagePathVersion,
        },
    }

    randomLayerReader, tarSum, err := createRandomReader()
    if err != nil {
        t.Fatalf("error creating random data: %v", err)
    }

    // Test for existence.
    exists, err := ls.Exists(tarSum)
    if err != nil {
        t.Fatalf("unexpected error checking for existence: %v", err)
    }

    if exists {
        t.Fatalf("layer should not exist")
    }

    // Try to get the layer and make sure we get a not found error.
    layer, err := ls.Fetch(tarSum)
    if err == nil {
        t.Fatalf("error expected fetching unknown layer")
    }

    if err != ErrLayerUnknown {
        t.Fatalf("unexpected error fetching non-existent layer: %v", err)
    }

    randomLayerDigest, err := writeTestLayer(driver, ls.pathMapper, imageName, tarSum, randomLayerReader)
    if err != nil {
        t.Fatalf("unexpected error writing test layer: %v", err)
    }

    randomLayerSize, err := seekerSize(randomLayerReader)
    if err != nil {
        t.Fatalf("error getting seeker size for random layer: %v", err)
    }

    layer, err = ls.Fetch(tarSum)
    if err != nil {
        t.Fatal(err)
    }
    defer layer.Close()

    // Now check the sha digest and ensure it's the same.
    h := sha256.New()
    nn, err := io.Copy(h, layer)
    if err != nil && err != io.EOF {
        t.Fatalf("unexpected error copying to hash: %v", err)
    }

    if nn != randomLayerSize {
        t.Fatalf("stored incorrect number of bytes in layer: %d != %d", nn, randomLayerSize)
    }

    digest := NewDigest("sha256", h)
    if digest != randomLayerDigest {
        t.Fatalf("fetched digest does not match: %q != %q", digest, randomLayerDigest)
    }

    // Now seek back to the start of the layer, read the whole thing and check
    // against randomLayerData.
    offset, err := layer.Seek(0, os.SEEK_SET)
    if err != nil {
        t.Fatalf("error seeking layer: %v", err)
    }

    if offset != 0 {
        t.Fatalf("seek failed: expected 0 offset, got %d", offset)
    }

    p, err := ioutil.ReadAll(layer)
    if err != nil {
        t.Fatalf("error reading all of layer: %v", err)
    }

    if len(p) != int(randomLayerSize) {
        t.Fatalf("layer data read has different length: %v != %v", len(p), randomLayerSize)
    }

    // Reset the randomLayerReader and read back the buffer.
    _, err = randomLayerReader.Seek(0, os.SEEK_SET)
    if err != nil {
        t.Fatalf("error resetting layer reader: %v", err)
    }

    randomLayerData, err := ioutil.ReadAll(randomLayerReader)
    if err != nil {
        t.Fatalf("random layer read failed: %v", err)
    }

    if !bytes.Equal(p, randomLayerData) {
        t.Fatalf("layer data not equal")
    }
}

func TestLayerReaderSeek(t *testing.T) {
    // TODO(stevvooe): Ensure that all relative seeks work as advertised.
    // Readers must close and re-open on command. This is important to support
    // resumable and concurrent downloads via HTTP range requests.
}

// TestLayerReadErrors covers the various error return types for different
// conditions that can arise when reading a layer.
func TestLayerReadErrors(t *testing.T) {
    // TODO(stevvooe): We need to cover error return types, driven by the
    // errors returned via the HTTP API. For now, here is an incomplete list:
    //
    //  1. Layer Not Found: returned when layer is not found or access is
    //     denied.
    //  2. Layer Unavailable: returned when link references are unresolved,
    //     but layer is known to the registry.
    //  3. Layer Invalid: This may be split into more errors, but should be
    //     returned when the name or tarsum does not reference a valid layer.
    //     We may also need something to communicate layer verification errors
    //     for the inline tarsum check.
    //  4. Timeout: timeouts to backend. Need to better understand these
    //     failure cases and how the storage driver propagates these errors
    //     up the stack.
}

// writeRandomLayer creates a random layer under name and tarSum using driver
// and pathMapper. An io.ReadSeeker with the data is returned, along with the
// sha256 hex digest.
func writeRandomLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string) (rs io.ReadSeeker, tarSum string, digest Digest, err error) {
    reader, tarSum, err := createRandomReader()
    if err != nil {
        return nil, "", "", err
    }

    // Now, actually create the layer.
    randomLayerDigest, err := writeTestLayer(driver, pathMapper, name, tarSum, ioutil.NopCloser(reader))
    if err != nil {
        return nil, "", "", err
    }

    if _, err := reader.Seek(0, os.SEEK_SET); err != nil {
        return nil, "", "", err
    }

    return reader, tarSum, randomLayerDigest, err
}

// seekerSize seeks to the end of seeker, checks the size and returns it to
// the original state, returning the size. The state of the seeker should be
// treated as unknown if an error is returned.
func seekerSize(seeker io.ReadSeeker) (int64, error) {
    current, err := seeker.Seek(0, os.SEEK_CUR)
    if err != nil {
        return 0, err
    }

    end, err := seeker.Seek(0, os.SEEK_END)
    if err != nil {
        return 0, err
    }

    resumed, err := seeker.Seek(current, os.SEEK_SET)
    if err != nil {
        return 0, err
    }

    if resumed != current {
        return 0, fmt.Errorf("error returning seeker to original state, could not seek back to original location")
    }

    return end, nil
}

// createRandomReader returns a random read seeker and its tarsum. The
// returned content will be a valid tar file with a random number of files and
// content.
func createRandomReader() (rs io.ReadSeeker, tarSum string, err error) {
    nFiles := mrand.Intn(10) + 10
    target := &bytes.Buffer{}
    wr := tar.NewWriter(target)

    // Perturb this on each iteration of the loop below.
    header := &tar.Header{
        Mode:       0644,
        ModTime:    time.Now(),
        Typeflag:   tar.TypeReg,
        Uname:      "randocalrissian",
        Gname:      "cloudcity",
        AccessTime: time.Now(),
        ChangeTime: time.Now(),
    }

    for fileNumber := 0; fileNumber < nFiles; fileNumber++ {
        fileSize := mrand.Int63n(1<<20) + 1<<20

        header.Name = fmt.Sprint(fileNumber)
        header.Size = fileSize

        if err := wr.WriteHeader(header); err != nil {
            return nil, "", err
        }

        randomData := make([]byte, fileSize)

        // Fill up the buffer with some random data.
        n, err := rand.Read(randomData)

        if n != len(randomData) {
            return nil, "", fmt.Errorf("short read creating random reader: %v bytes != %v bytes", n, len(randomData))
        }

        if err != nil {
            return nil, "", err
        }

        nn, err := io.Copy(wr, bytes.NewReader(randomData))
        if nn != fileSize {
            return nil, "", fmt.Errorf("short copy writing random file to tar")
        }

        if err != nil {
            return nil, "", err
        }

        if err := wr.Flush(); err != nil {
            return nil, "", err
        }
    }

    if err := wr.Close(); err != nil {
        return nil, "", err
    }

    reader := bytes.NewReader(target.Bytes())

    // A tar builder that supports tarsum inline calculation would be awesome
    // here.
    ts, err := tarsum.NewTarSum(reader, true, tarsum.Version1)
    if err != nil {
        return nil, "", err
    }

    nn, err := io.Copy(ioutil.Discard, ts)
    if nn != int64(len(target.Bytes())) {
        return nil, "", fmt.Errorf("short copy when getting tarsum of random layer: %v != %v", nn, len(target.Bytes()))
    }

    if err != nil {
        return nil, "", err
    }

    return bytes.NewReader(target.Bytes()), ts.Sum(nil), nil
}

// writeTestLayer creates a simple test layer in the provided driver under
// tarsum, returning the string digest. This is implemented piecemeal and
// should probably be replaced by the uploader when it's ready.
func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name, tarSum string, content io.Reader) (Digest, error) {
    h := sha256.New()
    rd := io.TeeReader(content, h)

    p, err := ioutil.ReadAll(rd)
    if err != nil {
        return "", err
    }

    digest := NewDigest("sha256", h)

    blobPath, err := pathMapper.path(blobPathSpec{
        alg:    digest.Algorithm(),
        digest: digest.Hex(),
    })
    if err != nil {
        return "", err
    }

    if err := driver.PutContent(blobPath, p); err != nil {
        return "", err
    }

    layerIndexLinkPath, err := pathMapper.path(layerIndexLinkPathSpec{
        tarSum: tarSum,
    })
    if err != nil {
        return "", err
    }

    layerLinkPath, err := pathMapper.path(layerLinkPathSpec{
        name:   name,
        tarSum: tarSum,
    })
    if err != nil {
        return "", err
    }

    if err := driver.PutContent(layerLinkPath, []byte(string(NewDigest("sha256", h)))); err != nil {
        return "", err
    }

    if err = driver.PutContent(layerIndexLinkPath, []byte(name)); err != nil {
        return "", err
    }

    return NewDigest("sha256", h), err
}
storage/layerreader.go (new file, 172 lines)
@@ -0,0 +1,172 @@
package storage

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "time"
)

// layerReader implements Layer and provides facilities for reading and
// seeking.
type layerReader struct {
    layerStore *layerStore
    rc         io.ReadCloser
    brd        *bufio.Reader

    name      string // repo name of this layer
    tarSum    string
    path      string
    createdAt time.Time

    // offset is the current read offset
    offset int64

    // size is the total layer size, if available.
    size int64

    closedErr error // terminal error, if set, reader is closed
}

var _ Layer = &layerReader{}

func (lrs *layerReader) Name() string {
    return lrs.name
}

func (lrs *layerReader) TarSum() string {
    return lrs.tarSum
}

func (lrs *layerReader) CreatedAt() time.Time {
    return lrs.createdAt
}

func (lrs *layerReader) Read(p []byte) (n int, err error) {
    if err := lrs.closed(); err != nil {
        return 0, err
    }

    rd, err := lrs.reader()
    if err != nil {
        return 0, err
    }

    n, err = rd.Read(p)
    lrs.offset += int64(n)

    // Simulate an io.EOF error if we reach the file size.
    if err == nil && lrs.offset >= lrs.size {
        err = io.EOF
    }

    // TODO(stevvooe): More error checking is required here. If the reader
    // times out for some reason, we should reset the reader so we re-open the
    // connection.

    return n, err
}

func (lrs *layerReader) Seek(offset int64, whence int) (int64, error) {
    if err := lrs.closed(); err != nil {
        return 0, err
    }

    var err error
    newOffset := lrs.offset

    switch whence {
    case os.SEEK_CUR:
        newOffset += offset
    case os.SEEK_END:
        newOffset = lrs.size + offset
    case os.SEEK_SET:
        newOffset = offset
    }

    if newOffset < 0 {
        err = fmt.Errorf("cannot seek to negative position")
    } else if newOffset > lrs.size {
        err = fmt.Errorf("cannot seek past end of layer")
    } else {
        if lrs.offset != newOffset {
            lrs.resetReader()
        }

        // No problems, set the offset.
        lrs.offset = newOffset
    }

    return lrs.offset, err
}

// Close the layer. Should be called when the resource is no longer needed.
func (lrs *layerReader) Close() error {
    if lrs.closedErr != nil {
        return lrs.closedErr
    }
    // TODO(sday): Must export this error.
    lrs.closedErr = fmt.Errorf("layer closed")

    // close and release reader chain
    if lrs.rc != nil {
        lrs.rc.Close()
        lrs.rc = nil
    }
    lrs.brd = nil

    return lrs.closedErr
}

// reader prepares the current reader at the lrs offset, ensuring it's
// buffered and ready to go.
func (lrs *layerReader) reader() (io.Reader, error) {
    if err := lrs.closed(); err != nil {
        return nil, err
    }

    if lrs.rc != nil {
        return lrs.brd, nil
    }

    // If we don't have a reader, open one up.
    rc, err := lrs.layerStore.driver.ReadStream(lrs.path, uint64(lrs.offset))
    if err != nil {
        return nil, err
    }

    lrs.rc = rc

    if lrs.brd == nil {
        // TODO(stevvooe): Set an optimal buffer size here. We'll have to
        // understand the latency characteristics of the underlying network to
        // set this correctly, so we may want to leave it to the driver. For
        // out of process drivers, we'll have to optimize this buffer size for
        // local communication.
        lrs.brd = bufio.NewReader(lrs.rc)
    } else {
        lrs.brd.Reset(lrs.rc)
    }

    return lrs.brd, nil
}

// resetReader resets the reader, forcing the read method to open up a new
// connection and rebuild the buffered reader. This should be called when the
// offset and the reader will become out of sync, such as during a seek
// operation.
func (lrs *layerReader) resetReader() {
    if err := lrs.closed(); err != nil {
        return
    }
    if lrs.rc != nil {
        lrs.rc.Close()
        lrs.rc = nil
    }
}

func (lrs *layerReader) closed() error {
    return lrs.closedErr
}
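Since Layer satisfies io.ReadSeeker, a fetched layer can be handed straight to http.ServeContent, which is what the Seek(0, os.SEEK_END) note in layer.go anticipates. A hedged handler sketch (hypothetical, not part of this commit; the request shape is an assumption):

package example

import (
    "net/http"

    "github.com/docker/docker-registry/storage"
)

// layerHandler streams a layer over HTTP, relying on Layer's ReadSeeker so
// ServeContent can discover the size and honor Range requests.
func layerHandler(layers storage.LayerService) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        tarSum := r.URL.Query().Get("tarsum") // assumed request shape
        layer, err := layers.Fetch(tarSum)
        if err != nil {
            http.Error(w, err.Error(), http.StatusNotFound)
            return
        }
        defer layer.Close()

        http.ServeContent(w, r, layer.TarSum(), layer.CreatedAt(), layer)
    }
}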
storage/layerstore.go (new file, 203 lines)
@@ -0,0 +1,203 @@
package storage

import (
    "fmt"
    "strings"
    "time"

    "github.com/Sirupsen/logrus"
    "github.com/docker/docker-registry/storagedriver"
)

type layerStore struct {
    driver      storagedriver.StorageDriver
    pathMapper  *pathMapper
    uploadStore layerUploadStore
}

func (ls *layerStore) Exists(tarSum string) (bool, error) {
    // Because this implementation just follows blob links, an existence check
    // is pretty cheap by starting and closing a fetch.
    _, err := ls.Fetch(tarSum)
    if err != nil {
        if err == ErrLayerUnknown {
            return false, nil
        }

        return false, err
    }

    return true, nil
}

func (ls *layerStore) Fetch(tarSum string) (Layer, error) {
    repos, err := ls.resolveContainingRepositories(tarSum)
    if err != nil {
        // TODO(stevvooe): Unknown tarsum error: need to wrap.
        return nil, err
    }

    // TODO(stevvooe): Access control for layer pulls needs to happen here: we
    // have a list of repos that "own" the tarsum that need to be checked
    // against the list of repos to which we have pull access. The argument
    // repos needs to be filtered against that access list.

    name, blobPath, err := ls.resolveBlobPath(repos, tarSum)
    if err != nil {
        // TODO(stevvooe): Map this error correctly, perhaps in the callee.
        return nil, err
    }

    p, err := ls.pathMapper.path(blobPath)
    if err != nil {
        return nil, err
    }

    // Grab the size of the layer file, ensuring that it exists, among other
    // things.
    size, err := ls.driver.CurrentSize(p)
    if err != nil {
        // TODO(stevvooe): Handle blob/path does not exist here.
        // TODO(stevvooe): Get a better understanding of the error cases here
        // that don't stem from unknown path.
        return nil, err
    }

    // Build the layer reader and return to the client.
    layer := &layerReader{
        layerStore: ls,
        path:       p,
        name:       name,
        tarSum:     tarSum,

        // TODO(stevvooe): Storage backend does not support modification time
        // queries yet. Layers "never" change, so just return the zero value.
        createdAt: time.Time{},

        offset: 0,
        size:   int64(size),
    }

    return layer, nil
}

// Upload begins a layer upload, returning a handle. If the layer upload
// is already in progress or the layer has already been uploaded, this
// will return an error.
func (ls *layerStore) Upload(name, tarSum string) (LayerUpload, error) {
    exists, err := ls.Exists(tarSum)
    if err != nil {
        return nil, err
    }

    if exists {
        // TODO(stevvooe): This looks simple now, but we really should only
        // return the layer exists error when the layer exists AND the current
        // client has access to the layer. If the client doesn't have access
        // to the layer, the upload should proceed.
        return nil, ErrLayerExists
    }

    // NOTE(stevvooe): Consider the issues with allowing concurrent upload of
    // the same two layers. Should it be disallowed? For now, we allow both
    // parties to proceed and the first one to finish uploads the layer.

    lus, err := ls.uploadStore.New(name, tarSum)
    if err != nil {
        return nil, err
    }

    return ls.newLayerUpload(lus), nil
}

// Resume continues an in-progress layer upload, returning the current
// state of the upload.
func (ls *layerStore) Resume(name, tarSum, uuid string) (LayerUpload, error) {
    lus, err := ls.uploadStore.GetState(uuid)
    if err != nil {
        return nil, err
    }

    return ls.newLayerUpload(lus), nil
}

// newLayerUpload allocates a new upload controller with the given state.
func (ls *layerStore) newLayerUpload(lus LayerUploadState) LayerUpload {
    return &layerUploadController{
        LayerUploadState: lus,
        layerStore:       ls,
        uploadStore:      ls.uploadStore,
    }
}

func (ls *layerStore) resolveContainingRepositories(tarSum string) ([]string, error) {
    // Lookup the layer link in the index by tarsum id.
    layerIndexLinkPath, err := ls.pathMapper.path(layerIndexLinkPathSpec{tarSum: tarSum})
    if err != nil {
        return nil, err
    }

    layerIndexLinkContent, err := ls.driver.GetContent(layerIndexLinkPath)
    if err != nil {
        switch err := err.(type) {
        case storagedriver.PathNotFoundError:
            return nil, ErrLayerUnknown
        default:
            return nil, err
        }
    }

    results := strings.Split(string(layerIndexLinkContent), "\n")

    // Clean these up.
    for i, result := range results {
        results[i] = strings.TrimSpace(result)
    }

    return results, nil
}

// resolveBlobPath looks up the tarSum in the various repos to find the blob
// link, returning the repo name and blob path spec or an error on failure.
func (ls *layerStore) resolveBlobPath(repos []string, tarSum string) (name string, bps blobPathSpec, err error) {
    for _, repo := range repos {
        pathSpec := layerLinkPathSpec{name: repo, tarSum: tarSum}
        layerLinkPath, err := ls.pathMapper.path(pathSpec)
        if err != nil {
            // TODO(stevvooe): This looks very lazy, may want to collect these
            // errors and report them if we exit this for loop without
            // resolving the blob id.
            logrus.Debugf("error building layerLinkPath (%v): %v", pathSpec, err)
            continue
        }

        layerLinkContent, err := ls.driver.GetContent(layerLinkPath)
        if err != nil {
            logrus.Debugf("error getting layerLink content (%v): %v", pathSpec, err)
            continue
        }

        // Yay! We've resolved our blob id and we're ready to go.
        parts := strings.SplitN(strings.TrimSpace(string(layerLinkContent)), ":", 2)
        if len(parts) != 2 {
            return "", bps, fmt.Errorf("invalid blob reference: %q", string(layerLinkContent))
        }

        name = repo
        bp := blobPathSpec{alg: parts[0], digest: parts[1]}

        return repo, bp, nil
    }

    // TODO(stevvooe): Map this error to repo not found, but it basically
    // means we exited the loop above without finding a blob link.
    return "", bps, fmt.Errorf("unable to resolve blob id for repos=%v and tarSum=%q", repos, tarSum)
}
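For orientation, both link files resolved above are plain text: the layer index link holds a newline-separated list of repository names owning a tarsum, and the per-repository layer link holds a single <algorithm>:<hex digest> blob reference. A standalone sketch of that parsing (sample contents are assumptions):

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Assumed contents of a layer index link file: one repo name per line.
    indexContent := "baz/qux\nfoo/bar"
    repos := strings.Split(indexContent, "\n")
    fmt.Println(repos) // [baz/qux foo/bar]

    // Assumed contents of a layer link file: a single blob reference.
    linkContent := "sha256:7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc"
    parts := strings.SplitN(strings.TrimSpace(linkContent), ":", 2)
    fmt.Println(parts[0]) // algorithm
    fmt.Println(parts[1]) // hex digest
}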
storage/layerupload.go (new file, 514 lines)
@@ -0,0 +1,514 @@
package storage

import (
    "crypto/sha256"
    "encoding/json"
    "fmt"
    "io"
    "io/ioutil"
    "os"
    "path/filepath"
    "sort"
    "strings"

    "code.google.com/p/go-uuid/uuid"

    "github.com/docker/docker-registry/storagedriver"
    "github.com/docker/docker/pkg/tarsum"
)

// LayerUploadState captures the serializable state of the layer upload.
type LayerUploadState struct {
    // Name is the primary repository under which the layer will be linked.
    Name string

    // TarSum identifies the target layer. Provided by the client. If the
    // resulting tarSum does not match this value, an error should be
    // returned.
    TarSum string

    // UUID identifies the upload.
    UUID string

    // Offset contains the current progress of the upload.
    Offset int64
}

// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
    LayerUploadState

    layerStore  *layerStore
    uploadStore layerUploadStore
    fp          layerFile
    err         error // terminal error, if set, controller is closed
}

// layerFile documents the interface used while writing layer files, similar
// to *os.File. This is separate from layerReader, for now, because we want to
// store uploads on the local file system until we have write-through hashing
// support. They should be combined once this is worked out.
type layerFile interface {
    io.WriteSeeker
    io.Reader
    io.Closer

    // Sync commits the contents of the writer to storage.
    Sync() (err error)
}

// layerUploadStore provides storage for temporary files and upload state of
// layers. This is used by the LayerService to manage the state of ongoing
// uploads. This interface will definitely change and will most likely end up
// being exported to the app layer. Move to layer.go when it's ready to go.
type layerUploadStore interface {
    New(name, tarSum string) (LayerUploadState, error)
    Open(uuid string) (layerFile, error)
    GetState(uuid string) (LayerUploadState, error)
    SaveState(lus LayerUploadState) error
    DeleteState(uuid string) error
}

var _ LayerUpload = &layerUploadController{}

// Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string {
    return luc.LayerUploadState.Name
}

// TarSum identifier of the proposed layer. Resulting data must match this
// tarsum.
func (luc *layerUploadController) TarSum() string {
    return luc.LayerUploadState.TarSum
}

// UUID returns the identifier for this upload.
func (luc *layerUploadController) UUID() string {
    return luc.LayerUploadState.UUID
}

// Offset returns the position of the last byte written to this layer.
func (luc *layerUploadController) Offset() int64 {
    return luc.LayerUploadState.Offset
}

// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(size int64, digestStr string) (Layer, error) {
    // This section is going to be pretty ugly now. We will have to read the
    // file twice. First, to get the tarsum and checksum. When those are
    // available, and validated, we will upload it to the blob store and link
    // it into the repository. In the future, we need to use resumable hash
    // calculations for tarsum and checksum that can be calculated during the
    // upload. This will allow us to cut the data directly into a temporary
    // directory in the storage backend.

    fp, err := luc.file()
    if err != nil {
        // Cleanup?
        return nil, err
    }

    digest, err := ParseDigest(digestStr)
    if err != nil {
        return nil, err
    }

    if err := luc.validateLayer(fp, size, digest); err != nil {
        // Cleanup?
        return nil, err
    }

    if err := luc.writeLayer(fp, size, digest); err != nil {
        // Cleanup?
        return nil, err
    }

    // Yes! We have written some layer data. Let's make it visible. Link the
    // layer blob into the repository.
    if err := luc.linkLayer(digest); err != nil {
        return nil, err
    }

    // Ok, the upload has completed and finished. Delete the state.
    if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
        // Can we ignore this error?
        return nil, err
    }

    return luc.layerStore.Fetch(luc.TarSum())
}

// Cancel the layer upload process.
func (luc *layerUploadController) Cancel() error {
    if err := luc.layerStore.uploadStore.DeleteState(luc.UUID()); err != nil {
        return err
    }

    return luc.Close()
}

func (luc *layerUploadController) Write(p []byte) (int, error) {
    wr, err := luc.file()
    if err != nil {
        return 0, err
    }

    n, err := wr.Write(p)

    // Because we expect the reported offset to be consistent with the storage
    // state, unfortunately, we need to Sync on every call to write.
    if err := wr.Sync(); err != nil {
        // Effectively, ignore the write state if the Sync fails. Report that
        // no bytes were written and seek back to the starting offset.
        offset, seekErr := wr.Seek(luc.Offset(), os.SEEK_SET)
        if seekErr != nil {
            // What do we do here? Quite disastrous.
            luc.reset()

            return 0, fmt.Errorf("multiple errors encountered after Sync + Seek: %v then %v", err, seekErr)
        }

        if offset != luc.Offset() {
            return 0, fmt.Errorf("unexpected offset after seek")
        }

        return 0, err
    }

    luc.LayerUploadState.Offset += int64(n)

    if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil {
        // TODO(stevvooe): This failure case may require more thought.
        return n, err
    }

    return n, err
}

func (luc *layerUploadController) Close() error {
    if luc.err != nil {
        return luc.err
    }

    if luc.fp != nil {
        luc.err = luc.fp.Close()
    }

    return luc.err
}

func (luc *layerUploadController) file() (layerFile, error) {
    if luc.fp != nil {
        return luc.fp, nil
    }

    fp, err := luc.uploadStore.Open(luc.UUID())
    if err != nil {
        return nil, err
    }

    // TODO(stevvooe): We may need a more aggressive check here to ensure that
    // the file length is equal to the current offset. We may want to sync the
    // offset before returning the layer upload to the client so it can be
    // validated before proceeding with any writes.

    // Seek to the current layer offset for good measure.
    if _, err = fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
        return nil, err
    }

    luc.fp = fp

    return luc.fp, nil
}

// reset closes and drops the current writer.
func (luc *layerUploadController) reset() {
    if luc.fp != nil {
        luc.fp.Close()
        luc.fp = nil
    }
}

// validateLayer runs several checks on the layer file to ensure its validity.
// This is currently very expensive and relies on fast io and fast seek.
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, digest Digest) error {
    // First, seek to the end of the file, checking the size is as expected.
    end, err := fp.Seek(0, os.SEEK_END)
    if err != nil {
        return err
    }

    if end != size {
        return ErrLayerInvalidLength
    }

    // Now seek back to start and take care of tarsum and checksum.
    if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
        return err
    }

    version, err := tarsum.GetVersionFromTarsum(luc.TarSum())
    if err != nil {
        return ErrLayerTarSumVersionUnsupported
    }

    // We only support tarsum version 1 for now.
    if version != tarsum.Version1 {
        return ErrLayerTarSumVersionUnsupported
    }

    ts, err := tarsum.NewTarSum(fp, true, tarsum.Version1)
    if err != nil {
        return err
    }

    h := sha256.New()

    // Pull the layer file through by writing it to a checksum.
    nn, err := io.Copy(h, ts)

    if nn != size {
        return fmt.Errorf("bad read while finishing upload(%s) %v: %v != %v, err=%v", luc.UUID(), fp, nn, size, err)
    }

    if err != nil && err != io.EOF {
        return err
    }

    calculatedDigest := NewDigest("sha256", h)

    // Compare the digests!
    if digest != calculatedDigest {
        return ErrLayerInvalidChecksum
    }

    // Compare the tarsums!
    if ts.Sum(nil) != luc.TarSum() {
        return ErrLayerInvalidTarsum
    }

    return nil
}

// writeLayer actually writes the layer file into its final destination.
// The layer should be validated before commencing the write.
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, digest Digest) error {
    blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
        alg:    digest.Algorithm(),
        digest: digest.Hex(),
    })
    if err != nil {
        return err
    }

    // Check for existence
    if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
        // TODO(stevvooe): This check is kind of problematic and very racy.
        switch err := err.(type) {
        case storagedriver.PathNotFoundError:
            break // ensure that it doesn't exist.
        default:
            // TODO(stevvooe): This isn't actually an error: the blob store is
            // content addressable and we should just use this to ensure we
            // have it written. Although, we do need to verify that the
            // content that is there is the correct length.
            return err
        }
    }

    // Seek our local layer file back now.
    if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
        // Cleanup?
        return err
    }

    // Okay: we can write the file to the blob store.
    if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
        return err
    }

    return nil
}

// linkLayer links a valid, written layer blob into the registry, first
// linking it under the repository namespace, then adding it to the layerindex.
func (luc *layerUploadController) linkLayer(digest Digest) error {
    layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
        name:   luc.Name(),
        tarSum: luc.TarSum(),
    })
    if err != nil {
        return err
    }

    if err := luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)); err != nil {
        return err
    }

    // Link the layer into the name index.
    layerIndexLinkPath, err := luc.layerStore.pathMapper.path(layerIndexLinkPathSpec{
        tarSum: luc.TarSum(),
    })
    if err != nil {
        return err
    }

    // Read back the name index file. If it doesn't exist, create it. If it
    // does, add the new repo to the name list.

    // TODO(stevvooe): This is very racy, as well. Reconsider using list for
    // this operation?
    layerIndexLinkContent, err := luc.layerStore.driver.GetContent(layerIndexLinkPath)
    if err != nil {
        switch err := err.(type) {
        case storagedriver.PathNotFoundError:
            layerIndexLinkContent = []byte(luc.Name())
        default:
            return err
        }
    }
    layerIndexLinkContent = luc.maybeAddNameToLayerIndexLinkContent(layerIndexLinkContent)

    // Write the index content back to the index.
    return luc.layerStore.driver.PutContent(layerIndexLinkPath, layerIndexLinkContent)
}

func (luc *layerUploadController) maybeAddNameToLayerIndexLinkContent(content []byte) []byte {
    names := strings.Split(string(content), "\n")
    var found bool
    // Search the names and find ours.
    for _, name := range names {
        if name == luc.Name() {
            found = true
        }
    }

    if !found {
        names = append(names, luc.Name())
    }

    sort.Strings(names)

    return []byte(strings.Join(names, "\n"))
}

// localFSLayerUploadStore implements a local layerUploadStore. There are some
// complexities around hashsums that make round tripping to the storage
// backend problematic, so we'll store and read locally for now. By GO-beta,
// this should be fully implemented on top of the backend storagedriver.
//
// For now, the directory layout is as follows:
//
//    /<temp dir>/registry-layer-upload/
//        <uuid>/
//            -> state.json
//            -> data
//
// Each upload, identified by uuid, has its own directory with a state file
// and a data file. The state file has a json representation of the current
// state. The data file is the in-progress upload data.
type localFSLayerUploadStore struct {
    root string
}

func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
    path, err := ioutil.TempDir("", "registry-layer-upload")
    if err != nil {
        return nil, err
    }

    return &localFSLayerUploadStore{
        root: path,
    }, nil
}

func (llufs *localFSLayerUploadStore) New(name, tarSum string) (LayerUploadState, error) {
    lus := LayerUploadState{
        Name:   name,
        TarSum: tarSum,
        UUID:   uuid.New(),
    }

    if err := os.Mkdir(llufs.path(lus.UUID, ""), 0755); err != nil {
        return lus, err
    }

    return lus, nil
}

func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
    fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_APPEND|os.O_RDWR, 0644)
    if err != nil {
        return nil, err
    }

    return fp, nil
}

func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
    // TODO(stevvooe): Storing this state on the local file system is an
    // intermediate stop gap. This technique is unlikely to handle any kind of
    // concurrency very well.

    var lus LayerUploadState
    fp, err := os.Open(llufs.path(uuid, "state.json"))
    if err != nil {
        if os.IsNotExist(err) {
            return lus, ErrLayerUploadUnknown
        }

        return lus, err
    }
    defer fp.Close()

    dec := json.NewDecoder(fp)
    if err := dec.Decode(&lus); err != nil {
        return lus, err
    }

    return lus, nil
}

func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
    p, err := json.Marshal(lus)
    if err != nil {
        return err
    }

    err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644)
    if os.IsNotExist(err) {
        return ErrLayerUploadUnknown
    }

    return err
}

func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
    if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
        if os.IsNotExist(err) {
            return ErrLayerUploadUnknown
        }

        return err
    }

    return nil
}

func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
    return filepath.Join(llufs.root, uuid, file)
}
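As a concrete reference for the on-disk layout above, this sketch (not part of the commit; all values are assumptions) shows what SaveState serializes into state.json:

package main

import (
    "encoding/json"
    "fmt"

    "github.com/docker/docker-registry/storage"
)

func main() {
    // Hypothetical in-progress upload state.
    lus := storage.LayerUploadState{
        Name:   "foo/bar",
        TarSum: "tarsum.v1+sha256:0123456789abcdef", // illustrative value only
        UUID:   "f81d4fae-7dec-11d0-a765-00a0c91e6bf6",
        Offset: 1048576,
    }

    p, err := json.Marshal(lus)
    if err != nil {
        panic(err)
    }

    fmt.Println(string(p))
    // {"Name":"foo/bar","TarSum":"tarsum.v1+sha256:0123456789abcdef","UUID":"f81d4fae-7dec-11d0-a765-00a0c91e6bf6","Offset":1048576}
}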
storage/services.go (new file, 44 lines)
@@ -0,0 +1,44 @@
package storage

import (
    "github.com/docker/docker-registry/storagedriver"
)

// Services provides various services with application-level operations for
// use across backend storage drivers.
type Services struct {
    driver           storagedriver.StorageDriver
    pathMapper       *pathMapper
    layerUploadStore layerUploadStore
}

// NewServices creates a new Services object to access docker objects stored
// in the underlying driver.
func NewServices(driver storagedriver.StorageDriver) *Services {
    layerUploadStore, err := newTemporaryLocalFSLayerUploadStore()
    if err != nil {
        // TODO(stevvooe): This failure needs to be understood in the context
        // of the lifecycle of the services object, which is uncertain at this
        // point.
        panic("unable to allocate layerUploadStore: " + err.Error())
    }

    return &Services{
        driver: driver,
        pathMapper: &pathMapper{
            // TODO(sday): This should be configurable.
            root:    "/docker/registry/",
            version: storagePathVersion,
        },
        layerUploadStore: layerUploadStore,
    }
}

// Layers returns an instance of the LayerService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (ss *Services) Layers() LayerService {
    return &layerStore{driver: ss.driver, pathMapper: ss.pathMapper, uploadStore: ss.layerUploadStore}
}
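Wiring it together, a minimal end-to-end sketch (not part of the commit; assumes the in-memory driver used by the tests and that the path mapper accepts the sample tarsum string):

package main

import (
    "fmt"

    "github.com/docker/docker-registry/storage"
    "github.com/docker/docker-registry/storagedriver/inmemory"
)

func main() {
    // Build the services facade over the in-memory storage driver.
    services := storage.NewServices(inmemory.New())
    layers := services.Layers()

    // Check for a layer by tarsum; the value here is an assumption.
    exists, err := layers.Exists("tarsum.v1+sha256:0123456789abcdef")
    if err != nil {
        panic(err)
    }

    fmt.Println("layer exists:", exists)
}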