registry/storage/layerupload.go

515 lines
13 KiB
Go
Raw Normal View History

Initial implementation of registry LayerService. This change contains the initial implementation of the LayerService to power layer pushes and pulls on the storagedriver. The interfaces presented in this package will be used by the http application to drive most features around efficient pulls and resumable pushes. The file storage/layer.go defines the interface interactions. LayerService is the root type and supports methods to access Layer and LayerUpload objects. Pull operations are supported with LayerService.Fetch and push operations are supported with LayerService.Upload and LayerService.Resume. Reads and writes of layers are split between Layer and LayerUpload, respectively. LayerService is implemented internally with the layerStore object, which takes a storagedriver.StorageDriver and a pathMapper instance. LayerUploadState is currently exported and will likely continue to be as the interaction between it and layerUploadStore is better understood. Likely, the layerUploadStore lifecycle and implementation will be deferred to the application. Image pushes and pulls will be implemented in a similar manner without the discrete, persistent upload. Much of this change is in place to get something running and working. Caveats of this change include the following: 1. Layer upload state storage is implemented on the local filesystem, separate from the storage driver. This must be replaced with using the proper backend and other state storage. This can be removed when we implement resumable hashing and tarsum calculations to avoid backend roundtrips. 2. Error handling is rather bespoke at this time. The http API implementation should really dictate the error return structure for the future, so we intend to refactor this heavily to support these errors. We'd also like to collect production data to understand how failures happen in the system as a whole before moving to a particular edict around error handling. 3.
The layerUploadStore, which manages layer upload storage and state, is not currently exported. This will likely end up being split, with the file management portion being pointed at the storagedriver and the state storage elsewhere. 4. Access Control provisions are nearly completely missing from this change. There are details around how layerindex lookup works that are related to access controls. As the auth portions of the new API take shape, these provisions will become more clear. Please see TODOs for details and individual recommendations.
2014-11-18 00:29:42 +00:00
package storage
import (
"crypto/sha256"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker/pkg/tarsum"
"io"
)
// LayerUploadState captures the serializable state of a layer upload. The
// local upload store in this file persists it as JSON (see SaveState and
// GetState), so the exported field names are part of the stored format.
type LayerUploadState struct {
// Name is the primary repository under which the layer will be linked.
Name string
// TarSum identifies the target layer. Provided by the client. If the
// resulting tarsum does not match this value, an error should be
// returned.
TarSum string
// UUID identifies the upload.
UUID string
// Offset contains the current progress of the upload, counted in bytes
// written so far.
Offset int64
}
// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
// LayerUploadState is the persistable part of the upload: name, tarsum,
// uuid and current offset.
LayerUploadState
// layerStore supplies the path mapper and storage driver used when the
// finished layer is written and linked.
layerStore *layerStore
// uploadStore holds the in-progress upload data and its saved state.
uploadStore layerUploadStore
// fp is the upload data file. It is opened lazily by file() and reused by
// later calls until reset() drops it.
fp layerFile
err error // terminal error, if set, controller is closed
}
// layerFile documents the interface used while writing layer files, similar
// to *os.File. This is separate from layerReader, for now, because we want to
// store uploads on the local file system until we have write-through hashing
// support. They should be combined once this is worked out.
type layerFile interface {
// Writing and seeking are used while the upload is in progress.
io.WriteSeeker
// Reading is used when the finished upload is validated and copied out.
io.Reader
io.Closer
// Sync commits the contents of the writer to storage.
Sync() (err error)
}
// layerUploadStore provides storage for temporary files and upload state of
// layers. This is used by the LayerService to manage the state of ongoing
// uploads. This interface will definitely change and will most likely end up
// being exported to the app layer. Move it to layer.go when it's ready to go.
type layerUploadStore interface {
// New starts tracking an upload of the layer tarSum under repository name
// and returns its initial state.
New(name, tarSum string) (LayerUploadState, error)
// Open returns the data file for the upload identified by uuid.
Open(uuid string) (layerFile, error)
// GetState loads the saved state of the upload identified by uuid.
GetState(uuid string) (LayerUploadState, error)
// SaveState persists lus, keyed by its UUID.
SaveState(lus LayerUploadState) error
// DeleteState discards everything stored for the upload identified by
// uuid.
DeleteState(uuid string) error
}
// Compile-time check that *layerUploadController satisfies LayerUpload.
var _ LayerUpload = (*layerUploadController)(nil)
// Name returns the repository under which the layer will be linked once the
// upload completes.
func (luc *layerUploadController) Name() string {
	state := &luc.LayerUploadState
	return state.Name
}
// TarSum returns the tarsum the client declared for the proposed layer. The
// uploaded data must hash to this value for the upload to be accepted.
func (luc *layerUploadController) TarSum() string {
	state := &luc.LayerUploadState
	return state.TarSum
}
// UUID returns the identifier of this upload, which is also the key used with
// the upload store.
func (luc *layerUploadController) UUID() string {
	state := &luc.LayerUploadState
	return state.UUID
}
// Offset returns the number of bytes accepted by this upload so far, i.e. the
// position at which the next write is expected to land.
func (luc *layerUploadController) Offset() int64 {
	state := &luc.LayerUploadState
	return state.Offset
}
// Finish completes the upload and returns a handle to the stored layer. size
// and digestStr (formatted as <algorithm>:<hex digest>) describe what the
// client believes it uploaded; both are checked against the uploaded data
// before anything is made visible.
//
// The upload file is currently read twice: once to compute the tarsum and
// checksum, and once more to copy it into the blob store. Resumable hashing
// during the upload would allow writing straight into the storage backend
// instead.
//
// No cleanup is attempted when a step fails; the upload state is only deleted
// after the layer has been written and linked.
func (luc *layerUploadController) Finish(size int64, digestStr string) (Layer, error) {
	upload, err := luc.file()
	if err != nil {
		return nil, err
	}

	digest, err := ParseDigest(digestStr)
	if err != nil {
		return nil, err
	}

	// The remaining work is strictly sequential; stop at the first failure.
	steps := []func() error{
		// Check length, checksum and tarsum of the uploaded data.
		func() error { return luc.validateLayer(upload, size, digest) },
		// Copy the validated data into the blob store.
		func() error { return luc.writeLayer(upload, size, digest) },
		// Make the blob visible by linking it into the repository.
		func() error { return luc.linkLayer(digest) },
		// The upload has completed; drop its temporary state.
		func() error { return luc.uploadStore.DeleteState(luc.UUID()) },
	}
	for _, step := range steps {
		if stepErr := step(); stepErr != nil {
			return nil, stepErr
		}
	}

	return luc.layerStore.Fetch(luc.TarSum())
}
// Cancel aborts the layer upload: the data file handle is released and the
// upload's stored state is deleted.
//
// The file is closed before the state is deleted so the handle is released
// even when deletion fails, and so the data file is no longer open while its
// directory is being removed. A deletion error takes precedence over a close
// error in the returned value.
func (luc *layerUploadController) Cancel() error {
	closeErr := luc.Close()

	// Use the controller's own upload store, like every other method here.
	if err := luc.uploadStore.DeleteState(luc.UUID()); err != nil {
		return err
	}

	return closeErr
}
// Write adds p to the upload data file and advances the recorded offset by
// the number of bytes written.
//
// The file is synced after every write so that the reported offset stays
// consistent with what is actually stored. If the sync fails, the write is
// treated as if it never happened: zero bytes are reported and the file is
// seeked back to the last recorded offset. If that seek fails too, the file
// handle is dropped.
//
// After a successful sync the new offset is persisted through the upload
// store. A failure to persist is returned together with the byte count.
func (luc *layerUploadController) Write(p []byte) (int, error) {
	file, err := luc.file()
	if err != nil {
		return 0, err
	}

	written, writeErr := file.Write(p)

	syncErr := file.Sync()
	if syncErr == nil {
		luc.LayerUploadState.Offset += int64(written)

		// TODO(stevvooe): This failure case may require more thought.
		if saveErr := luc.uploadStore.SaveState(luc.LayerUploadState); saveErr != nil {
			return written, saveErr
		}
		return written, writeErr
	}

	// The sync failed: disregard the write and rewind to the known offset.
	expected := luc.Offset()
	position, seekErr := file.Seek(expected, os.SEEK_SET)
	switch {
	case seekErr != nil:
		// Neither sync nor seek worked; give up on this handle.
		luc.reset()
		return 0, fmt.Errorf("multiple errors encounterd after Sync + Seek: %v then %v", syncErr, seekErr)
	case position != expected:
		return 0, fmt.Errorf("unexpected offset after seek")
	}

	return 0, syncErr
}
// Close releases the upload data file, if one has been opened. The result of
// closing is remembered as the controller's terminal error: once it is
// non-nil, later calls return it without touching the file again.
func (luc *layerUploadController) Close() error {
	switch {
	case luc.err != nil:
		// Already terminated; report the recorded error below.
	case luc.fp != nil:
		luc.err = luc.fp.Close()
	}
	return luc.err
}
// file returns the upload data file, opening it through the upload store on
// first use and positioning it at the current upload offset. The opened file
// is cached on the controller and returned as-is by later calls.
//
// If the file cannot be positioned it is closed again before the error is
// returned, so a failed attempt does not leak the handle.
func (luc *layerUploadController) file() (layerFile, error) {
	if luc.fp != nil {
		return luc.fp, nil
	}

	fp, err := luc.uploadStore.Open(luc.UUID())
	if err != nil {
		return nil, err
	}

	// TODO(stevvooe): We may need a more aggressive check here to ensure that
	// the file length is equal to the current offset. We may want to sync the
	// offset before return the layer upload to the client so it can be
	// validated before proceeding with any writes.

	// Seek to the current layer offset for good measure.
	if _, err := fp.Seek(luc.Offset(), os.SEEK_SET); err != nil {
		// The seek error is the one worth reporting; a close error on a
		// handle we are abandoning adds nothing.
		_ = fp.Close()
		return nil, err
	}

	luc.fp = fp
	return luc.fp, nil
}
// reset closes the current upload data file, if any, and forgets it so that
// the next call to file() opens a fresh handle.
func (luc *layerUploadController) reset() {
	if luc.fp == nil {
		return
	}

	// Best effort: the handle is being abandoned, so a close error is not
	// actionable here.
	_ = luc.fp.Close()
	luc.fp = nil
}
// validateLayer checks the uploaded file against what the client declared:
// its length must equal size, its sha256 checksum must equal digest, and its
// tarsum must equal the upload's TarSum. Only tarsum version 1 is supported.
//
// This is currently very expensive and relies on fast io and fast seek: the
// whole file is read once to compute both hashes.
func (luc *layerUploadController) validateLayer(fp layerFile, size int64, digest Digest) error {
	// Seeking to the end yields the file length.
	length, err := fp.Seek(0, os.SEEK_END)
	switch {
	case err != nil:
		return err
	case length != size:
		return ErrLayerInvalidLength
	}

	// Rewind so the hashes cover the file from the start.
	if _, err = fp.Seek(0, os.SEEK_SET); err != nil {
		return err
	}

	// We only support tarsum version 1 for now. An unparseable version is
	// reported the same way as an unsupported one.
	if version, versionErr := tarsum.GetVersionFromTarsum(luc.TarSum()); versionErr != nil || version != tarsum.Version1 {
		return ErrLayerTarSumVersionUnsupported
	}

	tarSumReader, err := tarsum.NewTarSum(fp, true, tarsum.Version1)
	if err != nil {
		return err
	}

	// Pull the layer file through the tarsum reader into the checksum, so
	// both are computed in a single pass.
	hasher := sha256.New()
	copied, copyErr := io.Copy(hasher, tarSumReader)
	if copied != size {
		return fmt.Errorf("bad read while finishing upload(%s) %v: %v != %v, err=%v", luc.UUID(), fp, copied, size, copyErr)
	}
	if copyErr != nil && copyErr != io.EOF {
		return copyErr
	}

	// Compare the digests.
	if NewDigest("sha256", hasher) != digest {
		return ErrLayerInvalidChecksum
	}

	// Compare the tarsums.
	if tarSumReader.Sum(nil) != luc.TarSum() {
		return ErrLayerInvalidTarsum
	}

	return nil
}
// writeLayer copies the uploaded file into the blob store at the path derived
// from digest. The layer should be validated before commencing the write.
//
// The blob path is probed first: a "path not found" answer or an existing
// blob both lead to the write; any other probe error aborts.
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, digest Digest) error {
	store := luc.layerStore

	spec := blobPathSpec{
		alg:    digest.Algorithm(),
		digest: digest.Hex(),
	}
	blobPath, err := store.pathMapper.path(spec)
	if err != nil {
		return err
	}

	// Check for existence.
	// TODO(stevvooe): This check is kind of problematic and very racy. The
	// blob store is content addressable, so an existing blob is not an
	// error; we should just use this to ensure we have it written, although
	// we do need to verify that the content that is there is the correct
	// length.
	if _, probeErr := store.driver.CurrentSize(blobPath); probeErr != nil {
		if _, notFound := probeErr.(storagedriver.PathNotFoundError); !notFound {
			return probeErr
		}
	}

	// Seek our local layer file back now.
	if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
		return err
	}

	// Okay: we can write the file to the blob store.
	return store.driver.WriteStream(blobPath, 0, uint64(size), fp)
}
// linkLayer links a valid, written layer blob into the registry. It first
// writes the repository-scoped layer link (whose content is the blob digest)
// and then adds the repository name to the layer index entry for the tarsum.
//
// Any storage error is returned to the caller. In particular, a failure to
// write the layer link aborts the operation, so a layer is never reported as
// linked when the link was not stored.
func (luc *layerUploadController) linkLayer(digest Digest) error {
	layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
		name:   luc.Name(),
		tarSum: luc.TarSum(),
	})
	if err != nil {
		return err
	}

	if err := luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)); err != nil {
		return err
	}

	// Link the layer into the name index.
	layerIndexLinkPath, err := luc.layerStore.pathMapper.path(layerIndexLinkPathSpec{
		tarSum: luc.TarSum(),
	})
	if err != nil {
		return err
	}

	// Read back the name index file. If it does not exist yet, start it with
	// this repository's name. If it does, add the repository to the existing
	// name list.
	// TODO(stevvooe): This is very racy, as well. Reconsider using list for
	// this operation?
	layerIndexLinkContent, err := luc.layerStore.driver.GetContent(layerIndexLinkPath)
	if err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			layerIndexLinkContent = []byte(luc.Name())
		default:
			return err
		}
	}
	layerIndexLinkContent = luc.maybeAddNameToLayerIndexLinkContent(layerIndexLinkContent)

	// Write the index content back to the index.
	return luc.layerStore.driver.PutContent(layerIndexLinkPath, layerIndexLinkContent)
}
// maybeAddNameToLayerIndexLinkContent returns the layer index content with
// this upload's repository name included. content is a newline-separated list
// of repository names; the result is the same list, sorted, with the name
// added if it was not already present.
//
// Blank lines in content (including the single empty entry produced by empty
// content or a trailing newline) are dropped rather than carried into the
// result, so the index never gains empty names.
func (luc *layerUploadController) maybeAddNameToLayerIndexLinkContent(content []byte) []byte {
	target := luc.Name()
	lines := strings.Split(string(content), "\n")

	names := make([]string, 0, len(lines)+1)
	found := false
	for _, line := range lines {
		if line == "" {
			continue
		}
		if line == target {
			found = true
		}
		names = append(names, line)
	}

	if !found {
		names = append(names, target)
	}

	sort.Strings(names)
	return []byte(strings.Join(names, "\n"))
}
// localFSLayerUploadStore implements a local layerUploadStore. There are some
// complexities around hashsums that make round tripping to the storage
// backend problematic, so we'll store and read locally for now. By GO-beta,
// this should be fully implemented on top of the backend storagedriver.
//
// For now, the directory layout is as follows:
//
//	/<temp dir>/registry-layer-upload/
//		<uuid>/
//			-> state.json
//			-> data
//
// Each upload, identified by uuid, has its own directory with a state file
// and a data file. The state file has a json representation of the current
// state. The data file is the in-progress upload data.
type localFSLayerUploadStore struct {
// root is the directory that contains one sub-directory per upload.
root string
}
// newTemporaryLocalFSLayerUploadStore creates a fresh directory under the
// system temporary directory (prefixed "registry-layer-upload") and returns
// an upload store rooted there.
func newTemporaryLocalFSLayerUploadStore() (layerUploadStore, error) {
	root, err := ioutil.TempDir("", "registry-layer-upload")
	if err != nil {
		return nil, err
	}

	store := new(localFSLayerUploadStore)
	store.root = root
	return store, nil
}
// New registers a new upload of layer tarSum under repository name. It
// assigns the upload a freshly generated UUID and creates the upload's
// directory inside the store root. The populated state is returned together
// with any error from creating the directory.
func (llufs *localFSLayerUploadStore) New(name, tarSum string) (LayerUploadState, error) {
	var state LayerUploadState
	state.Name = name
	state.TarSum = tarSum
	state.UUID = uuid.New()

	uploadDir := llufs.path(state.UUID, "")
	err := os.Mkdir(uploadDir, 0755)
	return state, err
}
// Open returns the data file of the upload identified by uuid, opened for
// reading and writing and created if it does not exist yet. Existing content
// is preserved. The upload's directory must already exist.
//
// The file is deliberately not opened in append mode. With O_APPEND every
// write lands at the end of the file regardless of the current position,
// which would defeat the controller's seeks (positioning at the upload offset
// and rewinding after a failed sync). Without it, writes go where the caller
// has seeked to.
func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
	fp, err := os.OpenFile(llufs.path(uuid, "data"), os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		// Return a literal nil so the interface value itself is nil.
		return nil, err
	}

	return fp, nil
}
// GetState loads the saved state of the upload identified by uuid from its
// state.json file. ErrLayerUploadUnknown is returned when the file does not
// exist; other open or decode errors are returned unchanged.
//
// TODO(stevvoe): Storing this state on the local file system is an
// intermediate stop gap. This technique is unlikely to handle any kind of
// concurrency very well.
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
	var state LayerUploadState

	stateFile, err := os.Open(llufs.path(uuid, "state.json"))
	switch {
	case err == nil:
		// Opened; decode below.
	case os.IsNotExist(err):
		return state, ErrLayerUploadUnknown
	default:
		return state, err
	}
	defer stateFile.Close()

	err = json.NewDecoder(stateFile).Decode(&state)
	return state, err
}
// SaveState writes lus as JSON to the state.json file of the upload named by
// lus.UUID, replacing any previous content. ErrLayerUploadUnknown is returned
// when the write fails because the path does not exist; other errors are
// returned unchanged.
func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
	encoded, err := json.Marshal(lus)
	if err != nil {
		return err
	}

	statePath := llufs.path(lus.UUID, "state.json")
	if writeErr := ioutil.WriteFile(statePath, encoded, 0644); writeErr != nil {
		if os.IsNotExist(writeErr) {
			return ErrLayerUploadUnknown
		}
		return writeErr
	}

	return nil
}
// DeleteState removes the directory of the upload identified by uuid,
// including its state and data files. ErrLayerUploadUnknown is returned when
// no such upload directory exists.
//
// The existence check is explicit because os.RemoveAll reports success for a
// path that does not exist, which would otherwise hide unknown uploads. An
// empty uuid is rejected up front: it would resolve to the store root and
// removing it would delete every upload.
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
	if uuid == "" {
		return ErrLayerUploadUnknown
	}

	uploadDir := llufs.path(uuid, "")
	if _, err := os.Lstat(uploadDir); err != nil {
		if os.IsNotExist(err) {
			return ErrLayerUploadUnknown
		}
		return err
	}

	return os.RemoveAll(uploadDir)
}
// path builds the location of file inside the directory of the upload
// identified by uuid, below the store root. With an empty file it yields the
// upload directory itself.
func (llufs *localFSLayerUploadStore) path(uuid, file string) string {
	elems := []string{llufs.root, uuid, file}
	return filepath.Join(elems...)
}