Move storage package under registry package

Signed-off-by: Stephen J Day <stephen.day@docker.com>
Stephen J Day 2015-02-10 17:41:09 -08:00
parent d6308bc62b
commit 71e7ac33ca
40 changed files with 5411 additions and 8 deletions


@@ -11,8 +11,8 @@ import (
"github.com/docker/distribution/registry/auth"
"github.com/docker/distribution/configuration"
ctxu "github.com/docker/distribution/context"
-"github.com/docker/distribution/storage"
-"github.com/docker/distribution/storage/notifications"
+"github.com/docker/distribution/registry/storage"
+"github.com/docker/distribution/registry/storage/notifications"
"github.com/docker/distribution/storagedriver"
"github.com/docker/distribution/storagedriver/factory"
"github.com/gorilla/mux"


@@ -10,7 +10,7 @@ import (
"github.com/docker/distribution/registry/api/v2"
_ "github.com/docker/distribution/registry/auth/silly"
"github.com/docker/distribution/configuration"
-"github.com/docker/distribution/storage"
+"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/storagedriver/inmemory"
"golang.org/x/net/context"
)


@@ -7,7 +7,7 @@ import (
"github.com/docker/distribution/registry/api/v2"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
-"github.com/docker/distribution/storage"
+"github.com/docker/distribution/registry/storage"
"golang.org/x/net/context"
)


@@ -9,7 +9,7 @@ import (
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
-"github.com/docker/distribution/storage"
+"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers"
)


@@ -6,7 +6,7 @@ import (
"github.com/docker/distribution/registry/api/v2"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
-"github.com/docker/distribution/storage"
+"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers"
)


@@ -10,7 +10,7 @@ import (
"github.com/docker/distribution/registry/api/v2"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
-"github.com/docker/distribution/storage"
+"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers"
)


@@ -5,7 +5,7 @@ import (
"net/http"
"github.com/docker/distribution/registry/api/v2"
-"github.com/docker/distribution/storage"
+"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers"
)

159
docs/storage/blobstore.go Normal file

@@ -0,0 +1,159 @@
package storage
import (
"fmt"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/storagedriver"
"golang.org/x/net/context"
)
// TODO(stevvooe): Currently, the blobStore implementation is used by the
// manifest store. The layer store should be refactored to better leverage the
// blobStore, reducing duplicated code.
// blobStore implements a generalized blob store over a driver, supporting the
// read side and link management. This object is intentionally a leaky
// abstraction, providing utility methods that support creating and traversing
// backend links.
type blobStore struct {
*registry
ctx context.Context
}
// exists reports whether or not the path exists. If the driver returns an
// error other than storagedriver.PathNotFoundError, an error may be returned.
func (bs *blobStore) exists(dgst digest.Digest) (bool, error) {
path, err := bs.path(dgst)
if err != nil {
return false, err
}
ok, err := exists(bs.driver, path)
if err != nil {
return false, err
}
return ok, nil
}
// get retrieves the blob by digest, returning it as a byte slice. This should
// only be used for small objects.
func (bs *blobStore) get(dgst digest.Digest) ([]byte, error) {
bp, err := bs.path(dgst)
if err != nil {
return nil, err
}
return bs.driver.GetContent(bp)
}
// link links the path to the provided digest by writing the digest into the
// target file.
func (bs *blobStore) link(path string, dgst digest.Digest) error {
if exists, err := bs.exists(dgst); err != nil {
return err
} else if !exists {
return fmt.Errorf("cannot link non-existent blob")
}
// The contents of the "link" file are the exact string contents of the
// digest, which is specified in that package.
return bs.driver.PutContent(path, []byte(dgst))
}
// linked reads the link at path and returns the content.
func (bs *blobStore) linked(path string) ([]byte, error) {
linked, err := bs.readlink(path)
if err != nil {
return nil, err
}
return bs.get(linked)
}
// readlink returns the linked digest at path.
func (bs *blobStore) readlink(path string) (digest.Digest, error) {
content, err := bs.driver.GetContent(path)
if err != nil {
return "", err
}
linked, err := digest.ParseDigest(string(content))
if err != nil {
return "", err
}
if exists, err := bs.exists(linked); err != nil {
return "", err
} else if !exists {
return "", fmt.Errorf("link %q invalid: blob %s does not exist", path, linked)
}
return linked, nil
}
// resolve reads the digest link at path and returns the blob store path.
func (bs *blobStore) resolve(path string) (string, error) {
dgst, err := bs.readlink(path)
if err != nil {
return "", err
}
return bs.path(dgst)
}
// put stores the content p in the blob store, calculating the digest. If the
// content is already present, only the digest will be returned. This should
// only be used for small objects, such as manifests.
func (bs *blobStore) put(p []byte) (digest.Digest, error) {
dgst, err := digest.FromBytes(p)
if err != nil {
ctxu.GetLogger(bs.ctx).Errorf("error digesting content: %v, %s", err, string(p))
return "", err
}
bp, err := bs.path(dgst)
if err != nil {
return "", err
}
// If the content already exists, just return the digest.
if exists, err := bs.exists(dgst); err != nil {
return "", err
} else if exists {
return dgst, nil
}
return dgst, bs.driver.PutContent(bp, p)
}
// path returns the canonical path for the blob identified by digest. The blob
// may or may not exist.
func (bs *blobStore) path(dgst digest.Digest) (string, error) {
bp, err := bs.pm.path(blobDataPathSpec{
digest: dgst,
})
if err != nil {
return "", err
}
return bp, nil
}
// exists provides a utility method to test whether or not a path exists under
// the given driver.
func exists(driver storagedriver.StorageDriver, path string) (bool, error) {
if _, err := driver.Stat(path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
return false, nil
default:
return false, err
}
}
return true, nil
}
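For readers skimming the diff, the link convention above is mechanical enough to demonstrate directly against a storage driver. Below is a minimal sketch, assuming the inmemory driver and the digest package from this repository; the /blobs and /repo/links paths are invented for illustration and do not reflect the real pathMapper layout.

package main

import (
	"fmt"

	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/storagedriver/inmemory"
)

func main() {
	driver := inmemory.New()
	content := []byte("hello, blob store")

	// Compute the content digest and store the blob under an illustrative path.
	dgst, err := digest.FromBytes(content)
	if err != nil {
		panic(err)
	}
	blobPath := "/blobs/" + dgst.String() // illustrative; the real layout comes from pathMapper
	if err := driver.PutContent(blobPath, content); err != nil {
		panic(err)
	}

	// A link file contains the exact string contents of the digest.
	linkPath := "/repo/links/example"
	if err := driver.PutContent(linkPath, []byte(dgst)); err != nil {
		panic(err)
	}

	// Reading the link back recovers the digest, which resolves the blob.
	raw, err := driver.GetContent(linkPath)
	if err != nil {
		panic(err)
	}
	linked, err := digest.ParseDigest(string(raw))
	if err != nil {
		panic(err)
	}
	fmt.Println("linked digest:", linked)
}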


@@ -0,0 +1,122 @@
package storage
import (
"crypto/x509"
"encoding/pem"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/AdRoll/goamz/cloudfront"
"github.com/docker/distribution/storagedriver"
)
// cloudFrontLayerHandler provides a simple implementation of LayerHandler that
// constructs temporary signed CloudFront URLs from the storagedriver layer URL,
// then issues HTTP Temporary Redirects to this CloudFront content URL.
type cloudFrontLayerHandler struct {
cloudfront *cloudfront.CloudFront
delegateLayerHandler *delegateLayerHandler
duration time.Duration
}
var _ LayerHandler = &cloudFrontLayerHandler{}
// newCloudFrontLayerHandler constructs and returns a new CloudFront
// LayerHandler implementation.
// Required options: baseurl, privatekey, keypairid
func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) {
base, ok := options["baseurl"]
if !ok {
return nil, fmt.Errorf("No baseurl provided")
}
baseURL, ok := base.(string)
if !ok {
return nil, fmt.Errorf("baseurl must be a string")
}
pk, ok := options["privatekey"]
if !ok {
return nil, fmt.Errorf("No privatekey provided")
}
pkPath, ok := pk.(string)
if !ok {
return nil, fmt.Errorf("privatekey must be a string")
}
kpid, ok := options["keypairid"]
if !ok {
return nil, fmt.Errorf("No keypairid provided")
}
keypairID, ok := kpid.(string)
if !ok {
return nil, fmt.Errorf("keypairid must be a string")
}
pkBytes, err := ioutil.ReadFile(pkPath)
if err != nil {
return nil, fmt.Errorf("Failed to read privatekey file: %s", err)
}
block, _ := pem.Decode([]byte(pkBytes))
if block == nil {
return nil, fmt.Errorf("Failed to decode private key as an rsa private key")
}
privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
return nil, err
}
lh, err := newDelegateLayerHandler(storageDriver, options)
if err != nil {
return nil, err
}
dlh := lh.(*delegateLayerHandler)
cf := cloudfront.New(baseURL, privateKey, keypairID)
duration := 20 * time.Minute
d, ok := options["duration"]
if ok {
switch d := d.(type) {
case time.Duration:
duration = d
case string:
dur, err := time.ParseDuration(d)
if err != nil {
return nil, fmt.Errorf("Invalid duration: %s", err)
}
duration = dur
}
}
return &cloudFrontLayerHandler{cloudfront: cf, delegateLayerHandler: dlh, duration: duration}, nil
}
// Resolve returns an http.Handler which can serve the contents of the given
// Layer, or an error if not supported by the storagedriver.
func (lh *cloudFrontLayerHandler) Resolve(layer Layer) (http.Handler, error) {
layerURLStr, err := lh.delegateLayerHandler.urlFor(layer, nil)
if err != nil {
return nil, err
}
layerURL, err := url.Parse(layerURLStr)
if err != nil {
return nil, err
}
cfURL, err := lh.cloudfront.CannedSignedURL(layerURL.Path, "", time.Now().Add(lh.duration))
if err != nil {
return nil, err
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, cfURL, http.StatusTemporaryRedirect)
}), nil
}
// init registers the cloudfront layerHandler backend.
func init() {
RegisterLayerHandler("cloudfront", LayerHandlerInitFunc(newCloudFrontLayerHandler))
}
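As a usage sketch, the options map consumed above would normally come from registry configuration. The values below are placeholders, not working credentials (construction will fail unless a real PEM key exists at the given path), and GetLayerHandler is the registry lookup defined later in this commit in layerhandler.go:

package main

import (
	"log"

	"github.com/docker/distribution/registry/storage"
	"github.com/docker/distribution/storagedriver/inmemory"
)

func main() {
	driver := inmemory.New()
	options := map[string]interface{}{
		"baseurl":    "https://d111111abcdef8.cloudfront.net", // placeholder distribution URL
		"privatekey": "/etc/docker/cloudfront/pk.pem",          // placeholder PEM key path
		"keypairid":  "APKAEXAMPLE",                            // placeholder key pair ID
		"duration":   "20m",                                    // parsed with time.ParseDuration
	}
	lh, err := storage.GetLayerHandler("cloudfront", options, driver)
	if err != nil {
		log.Fatalf("constructing cloudfront layer handler: %v", err)
	}
	_ = lh // lh.Resolve(layer) yields an http.Handler that redirects to a signed URL
}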


@@ -0,0 +1,94 @@
package storage
import (
"fmt"
"net/http"
"time"
"github.com/docker/distribution/storagedriver"
)
// delegateLayerHandler provides a simple implementation of LayerHandler that
// issues HTTP Temporary Redirects to the URL provided by the storagedriver
// for a given Layer.
type delegateLayerHandler struct {
storageDriver storagedriver.StorageDriver
pathMapper *pathMapper
duration time.Duration
}
var _ LayerHandler = &delegateLayerHandler{}
func newDelegateLayerHandler(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) {
duration := 20 * time.Minute
d, ok := options["duration"]
if ok {
switch d := d.(type) {
case time.Duration:
duration = d
case string:
dur, err := time.ParseDuration(d)
if err != nil {
return nil, fmt.Errorf("Invalid duration: %s", err)
}
duration = dur
}
}
return &delegateLayerHandler{storageDriver: storageDriver, pathMapper: defaultPathMapper, duration: duration}, nil
}
// Resolve returns an http.Handler which can serve the contents of the given
// Layer, or an error if not supported by the storagedriver.
func (lh *delegateLayerHandler) Resolve(layer Layer) (http.Handler, error) {
// TODO(bbland): This is just a sanity check to ensure that the
// storagedriver supports url generation. It would be nice if we didn't have
// to do this twice for non-GET requests.
layerURL, err := lh.urlFor(layer, map[string]interface{}{"method": "GET"})
if err != nil {
return nil, err
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
layerURL, err = lh.urlFor(layer, map[string]interface{}{"method": r.Method})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
http.Redirect(w, r, layerURL, http.StatusTemporaryRedirect)
}), nil
}
// urlFor returns a download URL for the given layer, or the empty string if
// unsupported.
func (lh *delegateLayerHandler) urlFor(layer Layer, options map[string]interface{}) (string, error) {
// Crack open the layer to get at the layerStore
layerRd, ok := layer.(*layerReader)
if !ok {
// TODO(stevvooe): We probably want to find a better way to get at the
// underlying filesystem path for a given layer. Perhaps, the layer
// handler should have its own layer store but right now, it is not
// request scoped.
return "", fmt.Errorf("unsupported layer type: cannot resolve blob path: %v", layer)
}
if options == nil {
options = make(map[string]interface{})
}
options["expiry"] = time.Now().Add(lh.duration)
layerURL, err := lh.storageDriver.URLFor(layerRd.path, options)
if err != nil {
return "", err
}
return layerURL, nil
}
// init registers the delegate layerHandler backend.
func init() {
RegisterLayerHandler("delegate", LayerHandlerInitFunc(newDelegateLayerHandler))
}
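The delegate handler admits the same construction path. A sketch, assuming a storage driver whose URLFor is implemented (the inmemory driver's is not, so a backend such as s3 would be needed in practice), with the duration supplied directly as a time.Duration; resolveViaDelegate is an invented helper:

package storage

import (
	"net/http"
	"time"

	"github.com/docker/distribution/storagedriver"
)

// resolveViaDelegate shows the wiring; the returned handler issues an HTTP
// 307 redirect to the driver-provided URL for the layer.
func resolveViaDelegate(driver storagedriver.StorageDriver, layer Layer) (http.Handler, error) {
	options := map[string]interface{}{
		"duration": 20 * time.Minute, // also accepted as a string like "20m"
	}
	lh, err := GetLayerHandler("delegate", options, driver)
	if err != nil {
		return nil, err
	}
	return lh.Resolve(layer)
}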

3
docs/storage/doc.go Normal file

@@ -0,0 +1,3 @@
// Package storage contains storage services for use in the registry
// application. It should be considered an internal package, as of Go 1.4.
package storage

201
docs/storage/filereader.go Normal file

@@ -0,0 +1,201 @@
package storage
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"time"
"github.com/docker/distribution/storagedriver"
)
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
// understand the latency characteristics of the underlying network to
// set this correctly, so we may want to leave it to the driver. For
// out of process drivers, we'll have to optimize this buffer size for
// local communication.
const fileReaderBufferSize = 4 << 20
// fileReader provides a read seeker interface to files stored in
// storagedriver. Used to implement part of the Layer interface and will be
// used to implement the read side of LayerUpload.
type fileReader struct {
driver storagedriver.StorageDriver
// identifying fields
path string
size int64 // size is the total layer size, must be set.
modtime time.Time
// mutable fields
rc io.ReadCloser // remote read closer
brd *bufio.Reader // internal buffered io
offset int64 // offset is the current read offset
err error // terminal error, if set, reader is closed
}
// newFileReader initializes a file reader for the remote file. The reader
// takes on the offset and size at the time it is created. If the underlying
// file changes, one must create a new fileReader.
func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
rd := &fileReader{
driver: driver,
path: path,
}
// Grab the size of the layer file, ensuring existence.
if fi, err := driver.Stat(path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// NOTE(stevvooe): We really don't care if the file is not
// actually present for the reader. If the caller needs to know
// whether or not the file exists, they should issue a stat call
// on the path. There is still no guarantee, since the file may be
// gone by the time the reader is created. The only correct
// behavior is to return a reader that immediately returns EOF.
default:
// Any other error we want propagated up the stack.
return nil, err
}
} else {
if fi.IsDir() {
return nil, fmt.Errorf("cannot read a directory")
}
// Fill in file information
rd.size = fi.Size()
rd.modtime = fi.ModTime()
}
return rd, nil
}
func (fr *fileReader) Read(p []byte) (n int, err error) {
if fr.err != nil {
return 0, fr.err
}
rd, err := fr.reader()
if err != nil {
return 0, err
}
n, err = rd.Read(p)
fr.offset += int64(n)
// Simulate io.EOF error if we reach filesize.
if err == nil && fr.offset >= fr.size {
err = io.EOF
}
return n, err
}
func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
if fr.err != nil {
return 0, fr.err
}
var err error
newOffset := fr.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset = fr.size + int64(offset)
case os.SEEK_SET:
newOffset = int64(offset)
}
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else {
if fr.offset != newOffset {
fr.reset()
}
// No problems, set the offset.
fr.offset = newOffset
}
return fr.offset, err
}
// Close the layer. Should be called when the resource is no longer needed.
func (fr *fileReader) Close() error {
if fr.err != nil {
return fr.err
}
fr.err = ErrLayerClosed
// close and release reader chain
if fr.rc != nil {
fr.rc.Close()
}
fr.rc = nil
fr.brd = nil
return fr.err
}
// reader prepares the current reader at the current offset, ensuring it's
// buffered and ready to go.
func (fr *fileReader) reader() (io.Reader, error) {
if fr.err != nil {
return nil, fr.err
}
if fr.rc != nil {
return fr.brd, nil
}
// If we don't have a reader, open one up.
rc, err := fr.driver.ReadStream(fr.path, fr.offset)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// NOTE(stevvooe): If the path is not found, we simply return a
// reader that returns io.EOF. However, we do not set fr.rc,
// allowing future attempts at getting a reader to possibly
// succeed if the file turns up later.
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
default:
return nil, err
}
}
fr.rc = rc
if fr.brd == nil {
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
// understand the latency characteristics of the underlying network to
// set this correctly, so we may want to leave it to the driver. For
// out of process drivers, we'll have to optimize this buffer size for
// local communication.
fr.brd = bufio.NewReaderSize(fr.rc, fileReaderBufferSize)
} else {
fr.brd.Reset(fr.rc)
}
return fr.brd, nil
}
// reset resets the reader, forcing the read method to open up a new
// connection and rebuild the buffered reader. This should be called when the
// offset and the reader will become out of sync, such as during a seek
// operation.
func (fr *fileReader) reset() {
if fr.err != nil {
return
}
if fr.rc != nil {
fr.rc.Close()
fr.rc = nil
}
}
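The NOTE above has a consequence that is easy to miss in the diff: a fileReader over a missing path is created successfully and simply reads as empty. A minimal in-package sketch (it must live in package storage, since newFileReader is unexported):

package storage

import (
	"fmt"
	"io/ioutil"

	"github.com/docker/distribution/storagedriver/inmemory"
)

func exampleMissingPath() {
	driver := inmemory.New()

	// Nothing was ever written to /missing; creation still succeeds.
	fr, err := newFileReader(driver, "/missing")
	if err != nil {
		panic(err)
	}
	defer fr.Close()

	// Reads immediately return io.EOF, which ReadAll treats as success.
	p, err := ioutil.ReadAll(fr)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(p)) // 0
}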


@@ -0,0 +1,193 @@
package storage
import (
"bytes"
"crypto/rand"
"io"
mrand "math/rand"
"os"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/storagedriver/inmemory"
)
func TestSimpleRead(t *testing.T) {
content := make([]byte, 1<<20)
n, err := rand.Read(content)
if err != nil {
t.Fatalf("unexpected error building random data: %v", err)
}
if n != len(content) {
t.Fatalf("random read did't fill buffer")
}
dgst, err := digest.FromReader(bytes.NewReader(content))
if err != nil {
t.Fatalf("unexpected error digesting random content: %v", err)
}
driver := inmemory.New()
path := "/random"
if err := driver.PutContent(path, content); err != nil {
t.Fatalf("error putting patterned content: %v", err)
}
fr, err := newFileReader(driver, path)
if err != nil {
t.Fatalf("error allocating file reader: %v", err)
}
verifier := digest.NewDigestVerifier(dgst)
io.Copy(verifier, fr)
if !verifier.Verified() {
t.Fatalf("unable to verify read data")
}
}
func TestFileReaderSeek(t *testing.T) {
driver := inmemory.New()
pattern := "01234567890ab" // prime length block
repetitions := 1024
path := "/patterned"
content := bytes.Repeat([]byte(pattern), repetitions)
if err := driver.PutContent(path, content); err != nil {
t.Fatalf("error putting patterned content: %v", err)
}
fr, err := newFileReader(driver, path)
if err != nil {
t.Fatalf("unexpected error creating file reader: %v", err)
}
// Seek all over the place, in blocks of pattern size and make sure we get
// the right data.
for _, repetition := range mrand.Perm(repetitions - 1) {
targetOffset := int64(len(pattern) * repetition)
// Seek to a multiple of pattern size and read pattern size bytes
offset, err := fr.Seek(targetOffset, os.SEEK_SET)
if err != nil {
t.Fatalf("unexpected error seeking: %v", err)
}
if offset != targetOffset {
t.Fatalf("did not seek to correct offset: %d != %d", offset, targetOffset)
}
p := make([]byte, len(pattern))
n, err := fr.Read(p)
if err != nil {
t.Fatalf("error reading pattern: %v", err)
}
if n != len(pattern) {
t.Fatalf("incorrect read length: %d != %d", n, len(pattern))
}
if string(p) != pattern {
t.Fatalf("incorrect read content: %q != %q", p, pattern)
}
// Check offset
current, err := fr.Seek(0, os.SEEK_CUR)
if err != nil {
t.Fatalf("error checking current offset: %v", err)
}
if current != targetOffset+int64(len(pattern)) {
t.Fatalf("unexpected offset after read: %v", err)
}
}
start, err := fr.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatalf("error seeking to start: %v", err)
}
if start != 0 {
t.Fatalf("expected to seek to start: %v != 0", start)
}
end, err := fr.Seek(0, os.SEEK_END)
if err != nil {
t.Fatalf("error checking current offset: %v", err)
}
if end != int64(len(content)) {
t.Fatalf("expected to seek to end: %v != %v", end, len(content))
}
// 4. Seek before start, ensure error.
before, err := fr.Seek(-1, os.SEEK_SET)
if err == nil {
t.Fatalf("error expected, returned offset=%v", before)
}
// 5. Seek after end, ensure no error and io.EOF on the next read.
after, err := fr.Seek(1, os.SEEK_END)
if err != nil {
t.Fatalf("unexpected error expected, returned offset=%v", after)
}
p := make([]byte, 16)
n, err := fr.Read(p)
if n != 0 {
t.Fatalf("bytes reads %d != %d", n, 0)
}
if err != io.EOF {
t.Fatalf("expected io.EOF, got %v", err)
}
}
// TestFileReaderNonExistentFile ensures the reader behaves as expected with a
// missing or zero-length remote file. While the file may not exist, the
// reader should not error out on creation and should return 0-bytes from the
// read method, with an io.EOF error.
func TestFileReaderNonExistentFile(t *testing.T) {
driver := inmemory.New()
fr, err := newFileReader(driver, "/doesnotexist")
if err != nil {
t.Fatalf("unexpected error initializing reader: %v", err)
}
var buf [1024]byte
n, err := fr.Read(buf[:])
if n != 0 {
t.Fatalf("non-zero byte read reported: %d != 0", n)
}
if err != io.EOF {
t.Fatalf("read on missing file should return io.EOF, got %v", err)
}
}
// TestFileReaderErrors covers the various error return types for different
// conditions that can arise when reading a layer.
func TestFileReaderErrors(t *testing.T) {
// TODO(stevvooe): We need to cover error return types, driven by the
// errors returned via the HTTP API. For now, here is an incomplete list:
//
// 1. Layer Not Found: returned when layer is not found or access is
// denied.
// 2. Layer Unavailable: returned when link references are unresolved,
// but layer is known to the registry.
// 3. Layer Invalid: This may be split into more errors, but should be
// returned when the name or tarsum does not reference valid content. We
// may also need something to communicate layer verification errors for
// the inline tarsum check.
// 4. Timeout: timeouts to backend. Need to better understand these
// failure cases and how the storage driver propagates these errors
// up the stack.
}

150
docs/storage/filewriter.go Normal file

@@ -0,0 +1,150 @@
package storage
import (
"bytes"
"fmt"
"io"
"os"
"github.com/docker/distribution/storagedriver"
)
// fileWriter implements a remote file writer backed by a storage driver.
type fileWriter struct {
driver storagedriver.StorageDriver
// identifying fields
path string
// mutable fields
size int64 // size of the file, aka the current end
offset int64 // offset is the current write offset
err error // terminal error, if set, writer is closed
}
// fileWriterInterface declares the io-compliant interface that the
// fileWriter should implement.
type fileWriterInterface interface {
io.WriteSeeker
io.WriterAt
io.ReaderFrom
io.Closer
}
var _ fileWriterInterface = &fileWriter{}
// newFileWriter returns a prepared fileWriter for the driver and path. This
// could be considered similar to an "open" call on a regular filesystem.
func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) {
fw := fileWriter{
driver: driver,
path: path,
}
if fi, err := driver.Stat(path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// ignore, offset is zero
default:
return nil, err
}
} else {
if fi.IsDir() {
return nil, fmt.Errorf("cannot write to a directory")
}
fw.size = fi.Size()
}
return &fw, nil
}
// Write writes the buffer p at the current write offset.
func (fw *fileWriter) Write(p []byte) (n int, err error) {
nn, err := fw.readFromAt(bytes.NewReader(p), -1)
return int(nn), err
}
// WriteAt writes p at the specified offset. The underlying offset does not
// change.
func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) {
nn, err := fw.readFromAt(bytes.NewReader(p), offset)
return int(nn), err
}
// ReadFrom reads reader r until io.EOF writing the contents at the current
// offset.
func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) {
return fw.readFromAt(r, -1)
}
// Seek moves the write position to the requested offset based on the whence
// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET.
func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
if fw.err != nil {
return 0, fw.err
}
var err error
newOffset := fw.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset = fw.size + int64(offset)
case os.SEEK_SET:
newOffset = int64(offset)
}
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else {
// No problems, set the offset.
fw.offset = newOffset
}
return fw.offset, err
}
// Close closes the fileWriter for writing.
func (fw *fileWriter) Close() error {
if fw.err != nil {
return fw.err
}
fw.err = fmt.Errorf("filewriter@%v: closed", fw.path)
return fw.err
}
// readFromAt writes to fw from r at the specified offset. If offset is less
// than zero, the value of fw.offset is used and updated after the operation.
func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) {
if fw.err != nil {
return 0, fw.err
}
var updateOffset bool
if offset < 0 {
offset = fw.offset
updateOffset = true
}
nn, err := fw.driver.WriteStream(fw.path, offset, r)
if updateOffset {
// We should forward the offset, whether or not there was an error.
// Basically, we keep the filewriter in sync with the reader's head. If an
// error is encountered, the whole thing should be retried but we proceed
// from an expected offset, even if the data didn't make it to the
// backend.
fw.offset += nn
if fw.offset > fw.size {
fw.size = fw.offset
}
}
return nn, err
}
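The offset bookkeeping in readFromAt is the subtle part of this file: Write and ReadFrom advance the internal offset, while WriteAt targets an absolute position and leaves the write head alone. A small in-package sketch of the distinction, assuming the inmemory driver:

package storage

import (
	"fmt"
	"os"

	"github.com/docker/distribution/storagedriver/inmemory"
)

func exampleWriterOffsets() {
	driver := inmemory.New()
	fw, err := newFileWriter(driver, "/example")
	if err != nil {
		panic(err)
	}
	defer fw.Close()

	// Write advances the write head to 4.
	if _, err := fw.Write([]byte("abcd")); err != nil {
		panic(err)
	}

	// WriteAt overwrites bytes 0-1 without moving the write head.
	if _, err := fw.WriteAt([]byte("ZZ"), 0); err != nil {
		panic(err)
	}

	pos, _ := fw.Seek(0, os.SEEK_CUR)
	fmt.Println(pos) // still 4
}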


@@ -0,0 +1,148 @@
package storage
import (
"bytes"
"crypto/rand"
"io"
"os"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/storagedriver/inmemory"
)
// TestSimpleWrite takes the fileWriter through common write operations
// ensuring data integrity.
func TestSimpleWrite(t *testing.T) {
content := make([]byte, 1<<20)
n, err := rand.Read(content)
if err != nil {
t.Fatalf("unexpected error building random data: %v", err)
}
if n != len(content) {
t.Fatalf("random read did't fill buffer")
}
dgst, err := digest.FromReader(bytes.NewReader(content))
if err != nil {
t.Fatalf("unexpected error digesting random content: %v", err)
}
driver := inmemory.New()
path := "/random"
fw, err := newFileWriter(driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileWriter: %v", err)
}
defer fw.Close()
n, err = fw.Write(content)
if err != nil {
t.Fatalf("unexpected error writing content: %v", err)
}
if n != len(content) {
t.Fatalf("unexpected write length: %d != %d", n, len(content))
}
fr, err := newFileReader(driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
verifier := digest.NewDigestVerifier(dgst)
io.Copy(verifier, fr)
if !verifier.Verified() {
t.Fatalf("unable to verify write data")
}
// Check the seek position is equal to the content length
end, err := fw.Seek(0, os.SEEK_END)
if err != nil {
t.Fatalf("unexpected error seeking: %v", err)
}
if end != int64(len(content)) {
t.Fatalf("write did not advance offset: %d != %d", end, len(content))
}
// Double the content, but use the WriteAt method
doubled := append(content, content...)
doubledgst, err := digest.FromReader(bytes.NewReader(doubled))
if err != nil {
t.Fatalf("unexpected error digesting doubled content: %v", err)
}
n, err = fw.WriteAt(content, end)
if err != nil {
t.Fatalf("unexpected error writing content at %d: %v", end, err)
}
if n != len(content) {
t.Fatalf("writeat was short: %d != %d", n, len(content))
}
fr, err = newFileReader(driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
verifier = digest.NewDigestVerifier(doubledgst)
io.Copy(verifier, fr)
if !verifier.Verified() {
t.Fatalf("unable to verify write data")
}
// Check that WriteAt didn't update the offset.
end, err = fw.Seek(0, os.SEEK_END)
if err != nil {
t.Fatalf("unexpected error seeking: %v", err)
}
if end != int64(len(content)) {
t.Fatalf("write did not advance offset: %d != %d", end, len(content))
}
// Now, we copy from one path to another, running the data through the
// fileReader to fileWriter, rather than the driver.Move command to ensure
// everything is working correctly.
fr, err = newFileReader(driver, path)
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
fw, err = newFileWriter(driver, "/copied")
if err != nil {
t.Fatalf("unexpected error creating fileWriter: %v", err)
}
defer fw.Close()
nn, err := io.Copy(fw, fr)
if err != nil {
t.Fatalf("unexpected error copying data: %v", err)
}
if nn != int64(len(doubled)) {
t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled))
}
fr, err = newFileReader(driver, "/copied")
if err != nil {
t.Fatalf("unexpected error creating fileReader: %v", err)
}
defer fr.Close()
verifier = digest.NewDigestVerifier(doubledgst)
io.Copy(verifier, fr)
if !verifier.Verified() {
t.Fatalf("unable to verify write data")
}
}

90
docs/storage/layer.go Normal file

@@ -0,0 +1,90 @@
package storage
import (
"fmt"
"io"
"time"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
)
// Layer provides a readable and seekable layer object. Typically,
// implementations are *not* goroutine safe.
type Layer interface {
// http.ServeContent requires an efficient implementation of
// ReadSeeker.Seek(0, os.SEEK_END).
io.ReadSeeker
io.Closer
// Name returns the repository under which this layer is linked.
Name() string // TODO(stevvooe): struggling with nomenclature: should this be "repo" or "name"?
// Digest returns the unique digest of the blob, which is the tarsum for
// layers.
Digest() digest.Digest
// CreatedAt returns the time this layer was created.
CreatedAt() time.Time
}
// LayerUpload provides a handle for working with in-progress uploads.
// Instances can be obtained from the LayerService.Upload and
// LayerService.Resume.
type LayerUpload interface {
io.WriteSeeker
io.ReaderFrom
io.Closer
// Name of the repository under which the layer will be linked.
Name() string
// UUID returns the identifier for this upload.
UUID() string
// StartedAt returns the time this layer upload was started.
StartedAt() time.Time
// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The digest is validated against the contents of the
// uploaded layer.
Finish(digest digest.Digest) (Layer, error)
// Cancel the layer upload process.
Cancel() error
}
var (
// ErrLayerExists returned when layer already exists
ErrLayerExists = fmt.Errorf("layer exists")
// ErrLayerTarSumVersionUnsupported returned when the tarsum version is unsupported.
ErrLayerTarSumVersionUnsupported = fmt.Errorf("unsupported tarsum version")
// ErrLayerUploadUnknown returned when upload is not found.
ErrLayerUploadUnknown = fmt.Errorf("layer upload unknown")
// ErrLayerClosed returned when an operation is attempted on a closed
// Layer or LayerUpload.
ErrLayerClosed = fmt.Errorf("layer closed")
)
// ErrUnknownLayer returned when layer cannot be found.
type ErrUnknownLayer struct {
FSLayer manifest.FSLayer
}
func (err ErrUnknownLayer) Error() string {
return fmt.Sprintf("unknown layer %v", err.FSLayer.BlobSum)
}
// ErrLayerInvalidDigest returned when tarsum check fails.
type ErrLayerInvalidDigest struct {
Digest digest.Digest
Reason error
}
func (err ErrLayerInvalidDigest) Error() string {
return fmt.Sprintf("invalid digest for referenced layer: %v, %v",
err.Digest, err.Reason)
}
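The comment on the embedded io.ReadSeeker points at the intended consumer: http.ServeContent, which seeks to the end to learn the content size. A sketch of serving a layer using nothing but the interface above; serveLayer is an invented helper, not part of this commit:

package storage

import "net/http"

// serveLayer writes the layer contents to w, honoring Range requests.
// http.ServeContent depends on the efficient Seek(0, os.SEEK_END) noted above.
func serveLayer(w http.ResponseWriter, r *http.Request, layer Layer) {
	defer layer.Close()
	http.ServeContent(w, r, layer.Digest().String(), layer.CreatedAt(), layer)
}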

364
docs/storage/layer_test.go Normal file

@@ -0,0 +1,364 @@
package storage
import (
"bytes"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"os"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/storagedriver"
"github.com/docker/distribution/storagedriver/inmemory"
"github.com/docker/distribution/testutil"
"golang.org/x/net/context"
)
// TestSimpleLayerUpload covers the layer upload process, exercising common
// error paths that might be seen during an upload.
func TestSimpleLayerUpload(t *testing.T) {
randomDataReader, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random reader: %v", err)
}
dgst := digest.Digest(tarSumStr)
if err != nil {
t.Fatalf("error allocating upload store: %v", err)
}
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
ls := registry.Repository(ctx, imageName).Layers()
h := sha256.New()
rd := io.TeeReader(randomDataReader, h)
layerUpload, err := ls.Upload()
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Cancel the upload then restart it
if err := layerUpload.Cancel(); err != nil {
t.Fatalf("unexpected error during upload cancellation: %v", err)
}
// Do a resume, get unknown upload
layerUpload, err = ls.Resume(layerUpload.UUID())
if err != ErrLayerUploadUnknown {
t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
}
// Restart!
layerUpload, err = ls.Upload()
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Get the size of our random tarfile
randomDataSize, err := seekerSize(randomDataReader)
if err != nil {
t.Fatalf("error getting seeker size of random data: %v", err)
}
nn, err := io.Copy(layerUpload, rd)
if err != nil {
t.Fatalf("unexpected error uploading layer data: %v", err)
}
if nn != randomDataSize {
t.Fatalf("layer data write incomplete")
}
offset, err := layerUpload.Seek(0, os.SEEK_CUR)
if err != nil {
t.Fatalf("unexpected error seeking layer upload: %v", err)
}
if offset != nn {
t.Fatalf("layerUpload not updated with correct offset: %v != %v", offset, nn)
}
layerUpload.Close()
// Do a resume, for good fun
layerUpload, err = ls.Resume(layerUpload.UUID())
if err != nil {
t.Fatalf("unexpected error resuming upload: %v", err)
}
sha256Digest := digest.NewDigest("sha256", h)
layer, err := layerUpload.Finish(dgst)
if err != nil {
t.Fatalf("unexpected error finishing layer upload: %v", err)
}
// After finishing an upload, it should no longer exist.
if _, err := ls.Resume(layerUpload.UUID()); err != ErrLayerUploadUnknown {
t.Fatalf("expected layer upload to be unknown, got %v", err)
}
// Test for existence.
exists, err := ls.Exists(layer.Digest())
if err != nil {
t.Fatalf("unexpected error checking for existence: %v", err)
}
if !exists {
t.Fatalf("layer should now exist")
}
h.Reset()
nn, err = io.Copy(h, layer)
if err != nil {
t.Fatalf("error reading layer: %v", err)
}
if nn != randomDataSize {
t.Fatalf("incorrect read length")
}
if digest.NewDigest("sha256", h) != sha256Digest {
t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), sha256Digest)
}
}
// TestSimpleLayerRead just creates a simple layer file and ensures that basic
// open, read, seek, read works. More specific edge cases should be covered in
// other tests.
func TestSimpleLayerRead(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
ls := registry.Repository(ctx, imageName).Layers()
randomLayerReader, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random data: %v", err)
}
dgst := digest.Digest(tarSumStr)
// Test for existence.
exists, err := ls.Exists(dgst)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v", err)
}
if exists {
t.Fatalf("layer should not exist")
}
// Try to get the layer and make sure we get a not found error
layer, err := ls.Fetch(dgst)
if err == nil {
t.Fatalf("error expected fetching unknown layer")
}
switch err.(type) {
case ErrUnknownLayer:
err = nil
default:
t.Fatalf("unexpected error fetching non-existent layer: %v", err)
}
randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader)
if err != nil {
t.Fatalf("unexpected error writing test layer: %v", err)
}
randomLayerSize, err := seekerSize(randomLayerReader)
if err != nil {
t.Fatalf("error getting seeker size for random layer: %v", err)
}
layer, err = ls.Fetch(dgst)
if err != nil {
t.Fatal(err)
}
defer layer.Close()
// Now check the sha digest and ensure its the same
h := sha256.New()
nn, err := io.Copy(h, layer)
if err != nil && err != io.EOF {
t.Fatalf("unexpected error copying to hash: %v", err)
}
if nn != randomLayerSize {
t.Fatalf("stored incorrect number of bytes in layer: %d != %d", nn, randomLayerSize)
}
sha256Digest := digest.NewDigest("sha256", h)
if sha256Digest != randomLayerDigest {
t.Fatalf("fetched digest does not match: %q != %q", sha256Digest, randomLayerDigest)
}
// Now seek back the layer, read the whole thing and check against randomLayerData
offset, err := layer.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatalf("error seeking layer: %v", err)
}
if offset != 0 {
t.Fatalf("seek failed: expected 0 offset, got %d", offset)
}
p, err := ioutil.ReadAll(layer)
if err != nil {
t.Fatalf("error reading all of layer: %v", err)
}
if len(p) != int(randomLayerSize) {
t.Fatalf("layer data read has different length: %v != %v", len(p), randomLayerSize)
}
// Reset the randomLayerReader and read back the buffer
_, err = randomLayerReader.Seek(0, os.SEEK_SET)
if err != nil {
t.Fatalf("error resetting layer reader: %v", err)
}
randomLayerData, err := ioutil.ReadAll(randomLayerReader)
if err != nil {
t.Fatalf("random layer read failed: %v", err)
}
if !bytes.Equal(p, randomLayerData) {
t.Fatalf("layer data not equal")
}
}
// TestLayerUploadZeroLength uploads a zero-length layer, ensuring the empty
// tarsum digest round-trips through Finish.
func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background()
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
ls := registry.Repository(ctx, imageName).Layers()
upload, err := ls.Upload()
if err != nil {
t.Fatalf("unexpected error starting upload: %v", err)
}
io.Copy(upload, bytes.NewReader([]byte{}))
dgst, err := digest.FromTarArchive(bytes.NewReader([]byte{}))
if err != nil {
t.Fatalf("error getting zero digest: %v", err)
}
if dgst != digest.DigestTarSumV1EmptyTar {
// sanity check on zero digest
t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar)
}
layer, err := upload.Finish(dgst)
if err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
if layer.Digest() != dgst {
t.Fatalf("unexpected digest: %v != %v", layer.Digest(), dgst)
}
}
// writeRandomLayer creates a random layer under name and tarSum using driver
// and pathMapper. An io.ReadSeeker with the data is returned, along with the
// sha256 hex digest.
func writeRandomLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string) (rs io.ReadSeeker, tarSum digest.Digest, sha256digest digest.Digest, err error) {
reader, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil {
return nil, "", "", err
}
tarSum = digest.Digest(tarSumStr)
// Now, actually create the layer.
randomLayerDigest, err := writeTestLayer(driver, pathMapper, name, tarSum, ioutil.NopCloser(reader))
if _, err := reader.Seek(0, os.SEEK_SET); err != nil {
return nil, "", "", err
}
return reader, tarSum, randomLayerDigest, err
}
// seekerSize seeks to the end of seeker, checks the size and returns it to
// the original state, returning the size. The state of the seeker should be
// treated as unknown if an error is returned.
func seekerSize(seeker io.ReadSeeker) (int64, error) {
current, err := seeker.Seek(0, os.SEEK_CUR)
if err != nil {
return 0, err
}
end, err := seeker.Seek(0, os.SEEK_END)
if err != nil {
return 0, err
}
resumed, err := seeker.Seek(current, os.SEEK_SET)
if err != nil {
return 0, err
}
if resumed != current {
return 0, fmt.Errorf("error returning seeker to original state, could not seek back to original location")
}
return end, nil
}
// writeTestLayer creates a simple test layer in the provided driver under
// tarsum dgst, returning the sha256 digest location. This is implemented
// piecemeal and should probably be replaced by the uploader when it's ready.
func writeTestLayer(driver storagedriver.StorageDriver, pathMapper *pathMapper, name string, dgst digest.Digest, content io.Reader) (digest.Digest, error) {
h := sha256.New()
rd := io.TeeReader(content, h)
p, err := ioutil.ReadAll(rd)
if err != nil {
return "", nil
}
blobDigestSHA := digest.NewDigest("sha256", h)
blobPath, err := pathMapper.path(blobDataPathSpec{
digest: dgst,
})
if err != nil {
return "", err
}
if err := driver.PutContent(blobPath, p); err != nil {
return "", err
}
layerLinkPath, err := pathMapper.path(layerLinkPathSpec{
name: name,
digest: dgst,
})
if err != nil {
return "", err
}
if err := driver.PutContent(layerLinkPath, []byte(dgst)); err != nil {
return "", nil
}
return blobDigestSHA, err
}


@@ -0,0 +1,50 @@
package storage
import (
"fmt"
"net/http"
"github.com/docker/distribution/storagedriver"
)
// LayerHandler provides middleware for serving the contents of a Layer.
type LayerHandler interface {
// Resolve returns an http.Handler which can serve the contents of a given
// Layer if possible, or nil and an error when unsupported. This may
// directly serve the contents of the layer or issue a redirect to another
// URL hosting the content.
Resolve(layer Layer) (http.Handler, error)
}
// LayerHandlerInitFunc is the type of a LayerHandler factory function and is
// used to register the constructor for different LayerHandler backends.
type LayerHandlerInitFunc func(storageDriver storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error)
var layerHandlers map[string]LayerHandlerInitFunc
// RegisterLayerHandler is used to register a LayerHandlerInitFunc for
// a LayerHandler backend with the given name.
func RegisterLayerHandler(name string, initFunc LayerHandlerInitFunc) error {
if layerHandlers == nil {
layerHandlers = make(map[string]LayerHandlerInitFunc)
}
if _, exists := layerHandlers[name]; exists {
return fmt.Errorf("name already registered: %s", name)
}
layerHandlers[name] = initFunc
return nil
}
// GetLayerHandler constructs a LayerHandler
// with the given options using the named backend.
func GetLayerHandler(name string, options map[string]interface{}, storageDriver storagedriver.StorageDriver) (LayerHandler, error) {
if layerHandlers != nil {
if initFunc, exists := layerHandlers[name]; exists {
return initFunc(storageDriver, options)
}
}
return nil, fmt.Errorf("no layer handler registered with name: %s", name)
}
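The registration scheme above is the usual init-time plugin pattern. A hedged sketch of a backend registering itself; the noop handler is invented purely to show the shape:

package storage

import (
	"fmt"
	"net/http"

	"github.com/docker/distribution/storagedriver"
)

// noopLayerHandler is a hypothetical backend that declines every layer,
// forcing callers to serve content directly.
type noopLayerHandler struct{}

func (noopLayerHandler) Resolve(layer Layer) (http.Handler, error) {
	return nil, fmt.Errorf("noop: refusing to resolve layer")
}

func newNoopLayerHandler(d storagedriver.StorageDriver, options map[string]interface{}) (LayerHandler, error) {
	return noopLayerHandler{}, nil
}

func init() {
	// RegisterLayerHandler reports duplicate registrations as errors.
	if err := RegisterLayerHandler("noop", LayerHandlerInitFunc(newNoopLayerHandler)); err != nil {
		panic(err)
	}
}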


@@ -0,0 +1,30 @@
package storage
import (
"time"
"github.com/docker/distribution/digest"
)
// layerReader implements Layer and provides facilities for reading and
// seeking.
type layerReader struct {
fileReader
name string // repo name of this layer
digest digest.Digest
}
var _ Layer = &layerReader{}
func (lrs *layerReader) Name() string {
return lrs.name
}
func (lrs *layerReader) Digest() digest.Digest {
return lrs.digest
}
func (lrs *layerReader) CreatedAt() time.Time {
return lrs.modtime
}

168
docs/storage/layerstore.go Normal file

@@ -0,0 +1,168 @@
package storage
import (
"time"
"code.google.com/p/go-uuid/uuid"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
)
type layerStore struct {
repository *repository
}
func (ls *layerStore) Exists(digest digest.Digest) (bool, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Exists")
// Because this implementation just follows blob links, an existence check
// is pretty cheap by starting and closing a fetch.
_, err := ls.Fetch(digest)
if err != nil {
switch err.(type) {
case ErrUnknownLayer:
return false, nil
}
return false, err
}
return true, nil
}
func (ls *layerStore) Fetch(dgst digest.Digest) (Layer, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Fetch")
bp, err := ls.path(dgst)
if err != nil {
return nil, err
}
fr, err := newFileReader(ls.repository.driver, bp)
if err != nil {
return nil, err
}
return &layerReader{
fileReader: *fr,
name: ls.repository.Name(),
digest: dgst,
}, nil
}
// Upload begins a layer upload, returning a handle. If the layer upload
// is already in progress or the layer has already been uploaded, this
// will return an error.
func (ls *layerStore) Upload() (LayerUpload, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Upload")
// NOTE(stevvooe): Consider the issues with allowing concurrent uploads of
// the same layer. Should it be disallowed? For now, we allow both parties
// to proceed and the first one to complete wins.
uuid := uuid.New()
startedAt := time.Now().UTC()
path, err := ls.repository.registry.pm.path(uploadDataPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
if err != nil {
return nil, err
}
startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
if err != nil {
return nil, err
}
// Write a startedat file for this upload
if err := ls.repository.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
return nil, err
}
return ls.newLayerUpload(uuid, path, startedAt)
}
// Resume continues an in-progress layer upload, returning the current
// state of the upload.
func (ls *layerStore) Resume(uuid string) (LayerUpload, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Resume")
startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
if err != nil {
return nil, err
}
startedAtBytes, err := ls.repository.driver.GetContent(startedAtPath)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
return nil, ErrLayerUploadUnknown
default:
return nil, err
}
}
startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
if err != nil {
return nil, err
}
path, err := ls.repository.pm.path(uploadDataPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
if err != nil {
return nil, err
}
return ls.newLayerUpload(uuid, path, startedAt)
}
// newLayerUpload allocates a new upload controller with the given state.
func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (LayerUpload, error) {
fw, err := newFileWriter(ls.repository.driver, path)
if err != nil {
return nil, err
}
return &layerUploadController{
layerStore: ls,
uuid: uuid,
startedAt: startedAt,
fileWriter: *fw,
}, nil
}
func (ls *layerStore) path(dgst digest.Digest) (string, error) {
// We must traverse this path through the link to enforce ownership.
layerLinkPath, err := ls.repository.registry.pm.path(layerLinkPathSpec{name: ls.repository.Name(), digest: dgst})
if err != nil {
return "", err
}
blobPath, err := ls.repository.blobStore.resolve(layerLinkPath)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
return "", ErrUnknownLayer{manifest.FSLayer{BlobSum: dgst}}
default:
return "", err
}
}
return blobPath, nil
}
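Taken together with layerupload.go, the happy path through this store is short. An in-package sketch mirroring the tests in layer_test.go; the caller is assumed to have computed the tarsum digest of content beforehand:

package storage

import (
	"io"

	"github.com/docker/distribution/digest"
	"github.com/docker/distribution/storagedriver/inmemory"
	"golang.org/x/net/context"
)

func exampleUpload(content io.Reader, dgst digest.Digest) (Layer, error) {
	registry := NewRegistryWithDriver(inmemory.New())
	layers := registry.Repository(context.Background(), "foo/bar").Layers()

	upload, err := layers.Upload()
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(upload, content); err != nil {
		return nil, err
	}

	// Finish validates the written bytes against dgst, moves the blob into
	// its content-addressed location and links it into the repository.
	return upload.Finish(dgst)
}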

238
docs/storage/layerupload.go Normal file

@@ -0,0 +1,238 @@
package storage
import (
"fmt"
"io"
"path"
"time"
"github.com/Sirupsen/logrus"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/storagedriver"
"github.com/docker/docker/pkg/tarsum"
)
// layerUploadController is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type layerUploadController struct {
layerStore *layerStore
uuid string
startedAt time.Time
fileWriter
}
var _ LayerUpload = &layerUploadController{}
// Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string {
return luc.layerStore.repository.Name()
}
// UUID returns the identifier for this upload.
func (luc *layerUploadController) UUID() string {
return luc.uuid
}
func (luc *layerUploadController) StartedAt() time.Time {
return luc.startedAt
}
// Finish marks the upload as completed, returning a valid handle to the
// uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) {
ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish")
canonical, err := luc.validateLayer(digest)
if err != nil {
return nil, err
}
if err := luc.moveLayer(canonical); err != nil {
// TODO(stevvooe): Cleanup?
return nil, err
}
// Link the layer blob into the repository.
if err := luc.linkLayer(canonical); err != nil {
return nil, err
}
if err := luc.removeResources(); err != nil {
return nil, err
}
return luc.layerStore.Fetch(canonical)
}
// Cancel the layer upload process.
func (luc *layerUploadController) Cancel() error {
ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Cancel")
if err := luc.removeResources(); err != nil {
return err
}
luc.Close()
return nil
}
// validateLayer checks the layer data against the digest, returning an error
// if it does not match. The canonical digest is returned.
func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Digest, error) {
// First, check the incoming tarsum version of the digest.
version, err := tarsum.GetVersionFromTarsum(dgst.String())
if err != nil {
return "", err
}
// TODO(stevvooe): Should we push this down into the digest type?
switch version {
case tarsum.Version1:
default:
// version 0 and dev, for now.
return "", ErrLayerInvalidDigest{
Digest: dgst,
Reason: ErrLayerTarSumVersionUnsupported,
}
}
digestVerifier := digest.NewDigestVerifier(dgst)
// TODO(stevvooe): Store resumable hash calculations in upload directory
// in driver. Something like a file at path <uuid>/resumablehash/<offset>
// with the hash state up to that point would be perfect. The hasher would
// then only have to fetch the difference.
// Read the file from the backend driver and validate it.
fr, err := newFileReader(luc.fileWriter.driver, luc.path)
if err != nil {
return "", err
}
tr := io.TeeReader(fr, digestVerifier)
// TODO(stevvooe): This is one of the places we need a Digester write
// sink. Instead, it's read driven. This might be okay.
// Calculate an updated digest with the latest version.
canonical, err := digest.FromTarArchive(tr)
if err != nil {
return "", err
}
if !digestVerifier.Verified() {
return "", ErrLayerInvalidDigest{
Digest: dgst,
Reason: fmt.Errorf("content does not match digest"),
}
}
return canonical, nil
}
// moveLayer moves the data into its final, hash-qualified destination,
// identified by dgst. The layer should be validated before commencing the
// move.
func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{
digest: dgst,
})
if err != nil {
return err
}
// Check for existence
if _, err := luc.driver.Stat(blobPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist.
default:
return err
}
} else {
// If the path exists, we can assume that the content has already
// been uploaded, since the blob storage is content-addressable.
// While it may be corrupted, detection of such corruption belongs
// elsewhere.
return nil
}
// If no data was received, we may not actually have a file on disk. Check
// the size here and write a zero-length file to blobPath if this is the
// case. For the most part, this should only ever happen with zero-length
// tars.
if _, err := luc.driver.Stat(luc.path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// HACK(stevvooe): This is slightly dangerous: if we verify above,
// get a hash, then the underlying file is deleted, we risk moving
// a zero-length blob into a nonzero-length blob location. To
// prevent this horrid thing, we employ the hack of only allowing
// this to happen for the zero tarsum.
if dgst == digest.DigestTarSumV1EmptyTar {
return luc.driver.PutContent(blobPath, []byte{})
}
// We let this fail during the move below.
logrus.
WithField("upload.uuid", luc.UUID()).
WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest")
default:
return err // unrelated error
}
}
return luc.driver.Move(luc.path, blobPath)
}
// linkLayer links a valid, written layer blob into the registry under the
// named repository for the upload controller.
func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{
name: luc.Name(),
digest: digest,
})
if err != nil {
return err
}
return luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(digest))
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (luc *layerUploadController) removeResources() error {
dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{
name: luc.Name(),
uuid: luc.uuid,
})
if err != nil {
return err
}
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := luc.driver.Delete(dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!
default:
// This should be uncommon enough such that returning an error
// should be okay. At this point, the upload should be mostly
// complete, but perhaps the backend became inaccessible.
logrus.Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
return err
}
}
return nil
}
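validateLayer's trick of teeing the stored bytes through a verifier while recomputing the canonical tarsum works in isolation as well. A sketch with a bare reader, assuming digest.FromTarArchive and digest.NewDigestVerifier as used above:

package storage

import (
	"fmt"
	"io"

	"github.com/docker/distribution/digest"
)

// verifyAndCanonicalize checks rd against the client-supplied dgst while
// computing the canonical (latest tarsum version) digest in a single pass.
func verifyAndCanonicalize(rd io.Reader, dgst digest.Digest) (digest.Digest, error) {
	verifier := digest.NewDigestVerifier(dgst)
	canonical, err := digest.FromTarArchive(io.TeeReader(rd, verifier))
	if err != nil {
		return "", err
	}
	if !verifier.Verified() {
		return "", fmt.Errorf("content does not match digest %v", dgst)
	}
	return canonical, nil
}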


@@ -0,0 +1,190 @@
package storage
import (
"fmt"
"strings"
ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/libtrust"
)
// ErrUnknownRepository is returned if the named repository is not known by
// the registry.
type ErrUnknownRepository struct {
Name string
}
func (err ErrUnknownRepository) Error() string {
return fmt.Sprintf("unknown respository name=%s", err.Name)
}
// ErrUnknownManifest is returned if the manifest is not known by the
// registry.
type ErrUnknownManifest struct {
Name string
Tag string
}
func (err ErrUnknownManifest) Error() string {
return fmt.Sprintf("unknown manifest name=%s tag=%s", err.Name, err.Tag)
}
// ErrUnknownManifestRevision is returned when a manifest cannot be found by
// revision within a repository.
type ErrUnknownManifestRevision struct {
Name string
Revision digest.Digest
}
func (err ErrUnknownManifestRevision) Error() string {
return fmt.Sprintf("unknown manifest name=%s revision=%s", err.Name, err.Revision)
}
// ErrManifestUnverified is returned when the registry is unable to verify
// the manifest.
type ErrManifestUnverified struct{}
func (ErrManifestUnverified) Error() string {
return fmt.Sprintf("unverified manifest")
}
// ErrManifestVerification provides a type to collect errors encountered
// during manifest verification. Currently, it accepts errors of all types,
// but it may be narrowed to those involving manifest verification.
type ErrManifestVerification []error
func (errs ErrManifestVerification) Error() string {
var parts []string
for _, err := range errs {
parts = append(parts, err.Error())
}
return fmt.Sprintf("errors verifying manifest: %v", strings.Join(parts, ","))
}
type manifestStore struct {
repository *repository
revisionStore *revisionStore
tagStore *tagStore
}
var _ ManifestService = &manifestStore{}
// func (ms *manifestStore) Repository() Repository {
// return ms.repository
// }
func (ms *manifestStore) Tags() ([]string, error) {
ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Tags")
return ms.tagStore.tags()
}
func (ms *manifestStore) Exists(tag string) (bool, error) {
ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Exists")
return ms.tagStore.exists(tag)
}
func (ms *manifestStore) Get(tag string) (*manifest.SignedManifest, error) {
ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Get")
dgst, err := ms.tagStore.resolve(tag)
if err != nil {
return nil, err
}
return ms.revisionStore.get(dgst)
}
func (ms *manifestStore) Put(tag string, manifest *manifest.SignedManifest) error {
ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Put")
// TODO(stevvooe): Add check here to see if the revision is already
// present in the repository. If it is, we should merge the signatures, do
// a shallow verify (or a full one, doesn't matter) and return an error
// indicating what happened.
// Verify the manifest.
if err := ms.verifyManifest(tag, manifest); err != nil {
return err
}
// Store the revision of the manifest
revision, err := ms.revisionStore.put(manifest)
if err != nil {
return err
}
// Now, tag the manifest
return ms.tagStore.tag(tag, revision)
}
// Delete removes all revisions of the given tag. We may want to change these
// semantics in the future, but this will maintain consistency. The underlying
// blobs are left alone.
func (ms *manifestStore) Delete(tag string) error {
ctxu.GetLogger(ms.repository.ctx).Debug("(*manifestStore).Delete")
revisions, err := ms.tagStore.revisions(tag)
if err != nil {
return err
}
for _, revision := range revisions {
if err := ms.revisionStore.delete(revision); err != nil {
return err
}
}
return ms.tagStore.delete(tag)
}
// verifyManifest ensures that the manifest content is valid from the
// perspective of the registry. It ensures that the name and tag match and
// that the signature is valid for the enclosed payload. As a policy, the
// registry only tries to store valid content, leaving trust policies of that
// content up to consumers.
func (ms *manifestStore) verifyManifest(tag string, mnfst *manifest.SignedManifest) error {
var errs ErrManifestVerification
if mnfst.Name != ms.repository.Name() {
// TODO(stevvooe): This needs to be an exported error
errs = append(errs, fmt.Errorf("repository name does not match manifest name"))
}
if mnfst.Tag != tag {
// TODO(stevvooe): This needs to be an exported error.
errs = append(errs, fmt.Errorf("tag does not match manifest tag"))
}
if _, err := manifest.Verify(mnfst); err != nil {
switch err {
case libtrust.ErrMissingSignatureKey, libtrust.ErrInvalidJSONContent:
errs = append(errs, ErrManifestUnverified{})
default:
if err.Error() == "invalid signature" { // TODO(stevvooe): This should be exported by libtrust
errs = append(errs, ErrManifestUnverified{})
} else {
errs = append(errs, err)
}
}
}
for _, fsLayer := range mnfst.FSLayers {
exists, err := ms.repository.Layers().Exists(fsLayer.BlobSum)
if err != nil {
errs = append(errs, err)
}
if !exists {
errs = append(errs, ErrUnknownLayer{FSLayer: fsLayer})
}
}
if len(errs) != 0 {
// TODO(stevvooe): These need to be recoverable by a caller.
return errs
}
return nil
}

View file

@ -0,0 +1,233 @@
package storage
import (
"bytes"
"io"
"reflect"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver/inmemory"
"github.com/docker/distribution/testutil"
"github.com/docker/libtrust"
"golang.org/x/net/context"
)
func TestManifestStorage(t *testing.T) {
ctx := context.Background()
name := "foo/bar"
tag := "thetag"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
repo := registry.Repository(ctx, name)
ms := repo.Manifests()
exists, err := ms.Exists(tag)
if err != nil {
t.Fatalf("unexpected error checking manifest existence: %v", err)
}
if exists {
t.Fatalf("manifest should not exist")
}
_, err = ms.Get(tag)
switch err.(type) {
case ErrUnknownManifest:
// expected
default:
t.Fatalf("expected manifest unknown error: %#v", err)
}
m := manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: name,
Tag: tag,
}
// Build up some test layers and add them to the manifest, saving the
// readseekers for upload later.
testLayers := map[digest.Digest]io.ReadSeeker{}
for i := 0; i < 2; i++ {
rs, ds, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("unexpected error generating test layer file")
}
dgst := digest.Digest(ds)
testLayers[dgst] = rs
m.FSLayers = append(m.FSLayers, manifest.FSLayer{
BlobSum: dgst,
})
}
pk, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("unexpected error generating private key: %v", err)
}
sm, err := manifest.Sign(&m, pk)
if err != nil {
t.Fatalf("error signing manifest: %v", err)
}
err = ms.Put(tag, sm)
if err == nil {
t.Fatalf("expected errors putting manifest")
}
// TODO(stevvooe): We expect errors describing all of the missing layers.
// Now, upload the layers that were missing!
for dgst, rs := range testLayers {
upload, err := repo.Layers().Upload()
if err != nil {
t.Fatalf("unexpected error creating test upload: %v", err)
}
if _, err := io.Copy(upload, rs); err != nil {
t.Fatalf("unexpected error copying to upload: %v", err)
}
if _, err := upload.Finish(dgst); err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
}
if err = ms.Put(tag, sm); err != nil {
t.Fatalf("unexpected error putting manifest: %v", err)
}
exists, err = ms.Exists(tag)
if err != nil {
t.Fatalf("unexpected error checking manifest existence: %v", err)
}
if !exists {
t.Fatalf("manifest should exist")
}
fetchedManifest, err := ms.Get(tag)
if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err)
}
if !reflect.DeepEqual(fetchedManifest, sm) {
t.Fatalf("fetched manifest not equal: %#v != %#v", fetchedManifest, sm)
}
fetchedJWS, err := libtrust.ParsePrettySignature(fetchedManifest.Raw, "signatures")
if err != nil {
t.Fatalf("unexpected error parsing jws: %v", err)
}
payload, err := fetchedJWS.Payload()
if err != nil {
t.Fatalf("unexpected error extracting payload: %v", err)
}
sigs, err := fetchedJWS.Signatures()
if err != nil {
t.Fatalf("unable to extract signatures: %v", err)
}
if len(sigs) != 1 {
t.Fatalf("unexpected number of signatures: %d != %d", len(sigs), 1)
}
// Grab the tags and check that this tagged manifest is present
tags, err := ms.Tags()
if err != nil {
t.Fatalf("unexpected error fetching tags: %v", err)
}
if len(tags) != 1 {
t.Fatalf("unexpected tags returned: %v", tags)
}
if tags[0] != tag {
t.Fatalf("unexpected tag found in tags: %v != %v", tags, []string{tag})
}
// Now, push the same manifest with a different key
pk2, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("unexpected error generating private key: %v", err)
}
sm2, err := manifest.Sign(&m, pk2)
if err != nil {
t.Fatalf("unexpected error signing manifest: %v", err)
}
jws2, err := libtrust.ParsePrettySignature(sm2.Raw, "signatures")
if err != nil {
t.Fatalf("error parsing signature: %v", err)
}
sigs2, err := jws2.Signatures()
if err != nil {
t.Fatalf("unable to extract signatures: %v", err)
}
if len(sigs2) != 1 {
t.Fatalf("unexpected number of signatures: %d != %d", len(sigs2), 1)
}
if err = ms.Put(tag, sm2); err != nil {
t.Fatalf("unexpected error putting manifest: %v", err)
}
fetched, err := ms.Get(tag)
if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err)
}
if _, err := manifest.Verify(fetched); err != nil {
t.Fatalf("unexpected error verifying manifest: %v", err)
}
// Assemble our payload and two signatures to get what we expect!
expectedJWS, err := libtrust.NewJSONSignature(payload, sigs[0], sigs2[0])
if err != nil {
t.Fatalf("unexpected error merging jws: %v", err)
}
expectedSigs, err := expectedJWS.Signatures()
if err != nil {
t.Fatalf("unexpected error getting expected signatures: %v", err)
}
receivedJWS, err := libtrust.ParsePrettySignature(fetched.Raw, "signatures")
if err != nil {
t.Fatalf("unexpected error parsing jws: %v", err)
}
receivedPayload, err := receivedJWS.Payload()
if err != nil {
t.Fatalf("unexpected error extracting received payload: %v", err)
}
if !bytes.Equal(receivedPayload, payload) {
t.Fatalf("payloads are not equal")
}
receivedSigs, err := receivedJWS.Signatures()
if err != nil {
t.Fatalf("error getting signatures: %v", err)
}
for i, sig := range receivedSigs {
if !bytes.Equal(sig, expectedSigs[i]) {
t.Fatalf("mismatched signatures from remote: %v != %v", string(sig), string(expectedSigs[i]))
}
}
if err := ms.Delete(tag); err != nil {
t.Fatalf("unexpected error deleting manifest: %v", err)
}
}

View file

@ -0,0 +1,156 @@
package notifications
import (
"net/http"
"time"
"github.com/docker/distribution/manifest"
"code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/storage"
)
type bridge struct {
ub URLBuilder
actor ActorRecord
source SourceRecord
request RequestRecord
sink Sink
}
var _ Listener = &bridge{}
// URLBuilder defines a subset of the url builder to be used by the event listener.
type URLBuilder interface {
BuildManifestURL(name, tag string) (string, error)
BuildBlobURL(name string, dgst digest.Digest) (string, error)
}
// NewBridge returns a notification listener that writes records to sink,
// using the actor and source. Any urls populated in the events created by
// this bridge will be created using the URLBuilder.
// TODO(stevvooe): Update this to simply take a context.Context object.
func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink) Listener {
return &bridge{
ub: ub,
actor: actor,
source: source,
request: request,
sink: sink,
}
}
// NewRequestRecord builds a RequestRecord for use in NewBridge from an
// http.Request, associating it with a request id.
func NewRequestRecord(id string, r *http.Request) RequestRecord {
return RequestRecord{
ID: id,
Addr: r.RemoteAddr,
Host: r.Host,
Method: r.Method,
UserAgent: r.UserAgent(),
}
}
func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionPush, repo, sm)
}
func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionPull, repo, sm)
}
func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
}
func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error {
return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest())
}
func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error {
return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest())
}
func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error {
return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest())
}
func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error {
event, err := b.createManifestEvent(action, repo, sm)
if err != nil {
return err
}
return b.sink.Write(*event)
}
func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) {
event := b.createEvent(action)
event.Target.Type = EventTargetTypeManifest
event.Target.Name = repo.Name()
event.Target.Tag = sm.Tag
p, err := sm.Payload()
if err != nil {
return nil, err
}
event.Target.Digest, err = digest.FromBytes(p)
if err != nil {
return nil, err
}
// TODO(stevvooe): Currently, the is the "tag" url: once the digest url is
// implemented, this should be replaced.
event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag)
if err != nil {
return nil, err
}
return event, nil
}
func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error {
event, err := b.createLayerEvent(action, repo, dgst)
if err != nil {
return err
}
return b.sink.Write(*event)
}
func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) {
event := b.createEvent(action)
event.Target.Type = EventTargetTypeBlob
event.Target.Name = repo.Name()
event.Target.Digest = dgst
var err error
event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst)
if err != nil {
return nil, err
}
return event, nil
}
// createEvent creates an event with actor and source populated.
func (b *bridge) createEvent(action string) *Event {
event := createEvent(action)
event.Source = b.source
event.Actor = b.actor
event.Request = b.request
return event
}
// createEvent returns a new event, timestamped, with the specified action.
func createEvent(action string) *Event {
return &Event{
ID: uuid.New(),
Timestamp: time.Now(),
Action: action,
}
}

View file

@ -0,0 +1,86 @@
package notifications
import (
"net/http"
"time"
)
// EndpointConfig covers the optional configuration parameters for an active
// endpoint.
type EndpointConfig struct {
Headers http.Header
Timeout time.Duration
Threshold int
Backoff time.Duration
}
// defaults sets any zero-valued fields to a reasonable default.
func (ec *EndpointConfig) defaults() {
if ec.Timeout <= 0 {
ec.Timeout = time.Second
}
if ec.Threshold <= 0 {
ec.Threshold = 10
}
if ec.Backoff <= 0 {
ec.Backoff = time.Second
}
}
// Endpoint is a reliable, queued, thread-safe sink that notifies external http
// services when events are written. Writes are non-blocking and always
// succeed for callers but events may be queued internally.
type Endpoint struct {
Sink
url string
name string
EndpointConfig
metrics *safeMetrics
}
// NewEndpoint returns a running endpoint, ready to receive events.
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
var endpoint Endpoint
endpoint.name = name
endpoint.url = url
endpoint.EndpointConfig = config
endpoint.defaults()
endpoint.metrics = newSafeMetrics()
// Configure the in-memory queue, retry, and http pipeline.
endpoint.Sink = newHTTPSink(
endpoint.url, endpoint.Timeout, endpoint.Headers,
endpoint.metrics.httpStatusListener())
endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
register(&endpoint)
return &endpoint
}
// Name returns the name of the endpoint, generally used for debugging.
func (e *Endpoint) Name() string {
return e.name
}
// URL returns the url of the endpoint.
func (e *Endpoint) URL() string {
return e.url
}
// ReadMetrics populates em with metrics from the endpoint.
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
e.metrics.Lock()
defer e.metrics.Unlock()
*em = e.metrics.EndpointMetrics
// The map still needs to be copied in a threadsafe manner.
em.Statuses = make(map[string]int)
for k, v := range e.metrics.Statuses {
em.Statuses[k] = v
}
}
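// Illustrative usage sketch (not part of the original file); the endpoint
// name, url, and config values are placeholders:
//
//	config := EndpointConfig{
//		Timeout:   500 * time.Millisecond,
//		Threshold: 5,
//		Backoff:   time.Second,
//	}
//	ep := NewEndpoint("webhook", "http://example.com/events", config)
//	defer ep.Close()
//	_ = ep.Write(*createEvent("push")) // queued; Write does not block
//
//	var em EndpointMetrics
//	ep.ReadMetrics(&em)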

View file

@ -0,0 +1,154 @@
package notifications
import (
"fmt"
"time"
"github.com/docker/distribution/digest"
)
// EventAction constants used in action field of Event.
const (
EventActionPull = "pull"
EventActionPush = "push"
EventActionDelete = "delete"
)
// EventTargetType constants used in Target section of Event.
const (
EventTargetTypeManifest = "manifest"
EventTargetTypeBlob = "blob"
)
// EventsMediaType is the mediatype for the json event envelope. If the Event,
// ActorRecord, SourceRecord or Envelope structs change, the version number
// should be incremented.
const EventsMediaType = "application/vnd.docker.distribution.events.v1+json"
// Envelope defines the fields of a json event envelope message that can hold
// one or more events.
type Envelope struct {
// Events make up the contents of the envelope. Events present in a single
// envelope are not necessarily related.
Events []Event `json:"events,omitempty"`
}
// TODO(stevvooe): The event type should be separate from the json format. It
// should be defined as an interface. Leaving as is for now since we don't
// need that at this time. If we make this change, the struct below would be
// called "EventRecord".
// Event provides the fields required to describe a registry event.
type Event struct {
// ID provides a unique identifier for the event.
ID string `json:"id,omitempty"`
// Timestamp is the time at which the event occurred.
Timestamp time.Time `json:"timestamp,omitempty"`
// Action indicates what action encompasses the provided event.
Action string `json:"action,omitempty"`
// Target uniquely describes the target of the event.
Target struct {
// Type should be "manifest" or "blob"
Type string `json:"type,omitempty"`
// Name identifies the named repository.
Name string `json:"name,omitempty"`
// Digest should identify the object in the repository.
Digest digest.Digest `json:"digest,omitempty"`
// Tag is present if the operation involved a tagged manifest.
Tag string `json:"tag,omitempty"`
// URL provides a link to the content on the relevant repository instance.
URL string `json:"url,omitempty"`
} `json:"target,omitempty"`
// Request covers the request that generated the event.
Request RequestRecord `json:"request,omitempty"`
// Actor specifies the agent that initiated the event. For most
// situations, this could be from the authorization context of the request.
Actor ActorRecord `json:"actor,omitempty"`
// Source identifies the registry node that generated the event. Put
// differently, while the actor "initiates" the event, the source
// "generates" it.
Source SourceRecord `json:"source,omitempty"`
}
// ActorRecord specifies the agent that initiated the event. For most
// situations, this could be from the authorization context of the request.
// Data in this record can refer to both the initiating client and the
// generating request.
type ActorRecord struct {
// Name corresponds to the subject or username associated with the
// request context that generated the event.
Name string `json:"name,omitempty"`
// TODO(stevvooe): Look into setting a session cookie to get this
// without docker daemon.
// SessionID
// TODO(stevvooe): Push the "Docker-Command" header to replace cookie and
// get the actual command.
// Command
}
// RequestRecord covers the request that generated the event.
type RequestRecord struct {
// ID uniquely identifies the request that initiated the event.
ID string `json:"id"`
// Addr contains the ip or hostname and possibly port of the client
// connection that initiated the event. This is the RemoteAddr from
// the standard http request.
Addr string `json:"addr,omitempty"`
// Host is the externally accessible host name of the registry instance,
// as specified by the http host header on incoming requests.
Host string `json:"host,omitempty"`
// Method has the request method that generated the event.
Method string `json:"method"`
// UserAgent contains the user agent header of the request.
UserAgent string `json:"useragent"`
}
// SourceRecord identifies the registry node that generated the event. Put
// differently, while the actor "initiates" the event, the source "generates"
// it.
type SourceRecord struct {
// Addr contains the ip or hostname and the port of the registry node
// that generated the event. Generally, this will be resolved by
// os.Hostname() along with the running port.
Addr string `json:"addr,omitempty"`
// InstanceID identifies a running instance of an application. Changes
// after each restart.
InstanceID string `json:"instanceID,omitempty"`
}
var (
// ErrSinkClosed is returned if a write is issued to a sink that has been
// closed. If encountered, the error should be considered terminal and
// retries will not be successful.
ErrSinkClosed = fmt.Errorf("sink: closed")
)
// Sink accepts and sends events.
type Sink interface {
// Write writes one or more events to the sink. If no error is returned,
// the caller will assume that all events have been committed and will not
// try to send them again. If an error is received, the caller may retry
// sending the event. The caller should cede the slice of memory to the
// sink and not modify it after calling this method.
Write(events ...Event) error
// Close the sink, possibly waiting for pending events to flush.
Close() error
}
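// A minimal Sink sketch for illustration (hypothetical, not part of the
// original file): it records events with logrus and honors the close
// semantics described above.
//
//	type logSink struct {
//		mu     sync.Mutex
//		closed bool
//	}
//
//	func (ls *logSink) Write(events ...Event) error {
//		ls.mu.Lock()
//		defer ls.mu.Unlock()
//		if ls.closed {
//			return ErrSinkClosed
//		}
//		for _, event := range events {
//			logrus.Infof("event %s: %s %s", event.ID, event.Action, event.Target.Type)
//		}
//		return nil
//	}
//
//	func (ls *logSink) Close() error {
//		ls.mu.Lock()
//		defer ls.mu.Unlock()
//		ls.closed = true
//		return nil
//	}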

View file

@ -0,0 +1,145 @@
package notifications
import (
"encoding/json"
"strings"
"testing"
"time"
)
// TestEventEnvelopeJSONFormat provides a silly test to detect if the event format or
// envelope has changed. If this code fails, the revision of the protocol may
// need to be incremented.
func TestEventEnvelopeJSONFormat(t *testing.T) {
var expected = strings.TrimSpace(`
{
"events": [
{
"id": "asdf-asdf-asdf-asdf-0",
"timestamp": "2006-01-02T15:04:05Z",
"action": "push",
"target": {
"type": "manifest",
"name": "library/test",
"digest": "sha256:0123456789abcdef0",
"tag": "latest",
"url": "http://example.com/v2/library/test/manifests/latest"
},
"request": {
"id": "asdfasdf",
"addr": "client.local",
"host": "registrycluster.local",
"method": "PUT",
"useragent": "test/0.1"
},
"actor": {
"name": "test-actor"
},
"source": {
"addr": "hostname.local:port"
}
},
{
"id": "asdf-asdf-asdf-asdf-1",
"timestamp": "2006-01-02T15:04:05Z",
"action": "push",
"target": {
"type": "blob",
"name": "library/test",
"digest": "tarsum.v2+sha256:0123456789abcdef1",
"url": "http://example.com/v2/library/test/manifests/latest"
},
"request": {
"id": "asdfasdf",
"addr": "client.local",
"host": "registrycluster.local",
"method": "PUT",
"useragent": "test/0.1"
},
"actor": {
"name": "test-actor"
},
"source": {
"addr": "hostname.local:port"
}
},
{
"id": "asdf-asdf-asdf-asdf-2",
"timestamp": "2006-01-02T15:04:05Z",
"action": "push",
"target": {
"type": "blob",
"name": "library/test",
"digest": "tarsum.v2+sha256:0123456789abcdef2",
"url": "http://example.com/v2/library/test/manifests/latest"
},
"request": {
"id": "asdfasdf",
"addr": "client.local",
"host": "registrycluster.local",
"method": "PUT",
"useragent": "test/0.1"
},
"actor": {
"name": "test-actor"
},
"source": {
"addr": "hostname.local:port"
}
}
]
}
`)
tm, err := time.Parse(time.RFC3339, time.RFC3339[:len(time.RFC3339)-5])
if err != nil {
t.Fatalf("error creating time: %v", err)
}
var prototype Event
prototype.Action = "push"
prototype.Timestamp = tm
prototype.Actor.Name = "test-actor"
prototype.Request.ID = "asdfasdf"
prototype.Request.Addr = "client.local"
prototype.Request.Host = "registrycluster.local"
prototype.Request.Method = "PUT"
prototype.Request.UserAgent = "test/0.1"
prototype.Source.Addr = "hostname.local:port"
var manifestPush Event
manifestPush = prototype
manifestPush.ID = "asdf-asdf-asdf-asdf-0"
manifestPush.Target.Digest = "sha256:0123456789abcdef0"
manifestPush.Target.Type = EventTargetTypeManifest
manifestPush.Target.Name = "library/test"
manifestPush.Target.Tag = "latest"
manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest"
var layerPush0 Event
layerPush0 = prototype
layerPush0.ID = "asdf-asdf-asdf-asdf-1"
layerPush0.Target.Digest = "tarsum.v2+sha256:0123456789abcdef1"
layerPush0.Target.Type = EventTargetTypeBlob
layerPush0.Target.Name = "library/test"
layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest"
var layerPush1 Event
layerPush1 = prototype
layerPush1.ID = "asdf-asdf-asdf-asdf-2"
layerPush1.Target.Digest = "tarsum.v2+sha256:0123456789abcdef2"
layerPush1.Target.Type = EventTargetTypeBlob
layerPush1.Target.Name = "library/test"
layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest"
var envelope Envelope
envelope.Events = append(envelope.Events, manifestPush, layerPush0, layerPush1)
p, err := json.MarshalIndent(envelope, "", " ")
if err != nil {
t.Fatalf("unexpected error marshaling envelope: %v", err)
}
if string(p) != expected {
t.Fatalf("format has changed\n%s\n != \n%s", string(p), expected)
}
}

View file

@ -0,0 +1,145 @@
package notifications
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// httpSink implements a single-flight http notification endpoint. This is
// very lightweight in that it only makes an attempt at an http request.
// Reliability should be provided by the caller.
type httpSink struct {
url string
mu sync.Mutex
closed bool
client *http.Client
listeners []httpStatusListener
// TODO(stevvooe): Allow one to configure the media type accepted by this
// sink and choose the serialization based on that.
}
// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
// sinks for increased reliability.
func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink {
return &httpSink{
url: u,
listeners: listeners,
client: &http.Client{
Transport: &headerRoundTripper{
Transport: http.DefaultTransport.(*http.Transport),
headers: headers,
},
Timeout: timeout,
},
}
}
// httpStatusListener is called on various outcomes of sending notifications.
type httpStatusListener interface {
success(status int, events ...Event)
failure(status int, events ...Event)
err(err error, events ...Event)
}
// Write makes an attempt to notify the endpoint, returning an error if it
// fails. It is the caller's responsibility to retry on error. The events are
// accepted or rejected as a group.
func (hs *httpSink) Write(events ...Event) error {
hs.mu.Lock()
defer hs.mu.Unlock()
if hs.closed {
return ErrSinkClosed
}
envelope := Envelope{
Events: events,
}
// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
// retry but we are going to do it to keep the code simple. It is likely
// we could change the event struct to manage its own buffer.
p, err := json.MarshalIndent(envelope, "", " ")
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
}
return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
}
body := bytes.NewReader(p)
resp, err := hs.client.Post(hs.url, EventsMediaType, body)
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
}
return fmt.Errorf("%v: error posting: %v", hs, err)
}
// The notifier will treat any 2xx or 3xx response as accepted by the
// endpoint.
switch {
case resp.StatusCode >= 200 && resp.StatusCode < 400:
for _, listener := range hs.listeners {
listener.success(resp.StatusCode, events...)
}
// TODO(stevvooe): This is a little permissive: we may want to support
// unsupported media type responses with retries using the correct
// media type. There may also be cases that will never work.
return nil
default:
for _, listener := range hs.listeners {
listener.failure(resp.StatusCode, events...)
}
return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
}
}
// Close closes the endpoint.
func (hs *httpSink) Close() error {
hs.mu.Lock()
defer hs.mu.Unlock()
if hs.closed {
return fmt.Errorf("httpsink: already closed")
}
hs.closed = true
return nil
}
func (hs *httpSink) String() string {
return fmt.Sprintf("httpSink{%s}", hs.url)
}
type headerRoundTripper struct {
*http.Transport // must be transport to support CancelRequest
headers http.Header
}
func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
var nreq http.Request
nreq = *req
nreq.Header = make(http.Header)
merge := func(headers http.Header) {
for k, v := range headers {
nreq.Header[k] = append(nreq.Header[k], v...)
}
}
merge(req.Header)
merge(hrt.headers)
return hrt.Transport.RoundTrip(&nreq)
}
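// Illustrative sketch (not part of the original file): the transport merges
// per-request headers with a fixed set, so an endpoint can, for example,
// attach an authorization header to every notification. The url and token
// are placeholders.
//
//	headers := http.Header{"Authorization": []string{"Bearer <token>"}}
//	sink := newHTTPSink("http://example.com/events", time.Second, headers)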

View file

@ -0,0 +1,155 @@
package notifications
import (
"encoding/json"
"fmt"
"mime"
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"testing"
)
// TestHTTPSink mocks out an http endpoint and notifies it under a couple of
// conditions, ensuring correct behavior.
func TestHTTPSink(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
t.Fatalf("unexpected request method: %v", r.Method)
return
}
// Extract the content type and make sure it matches
contentType := r.Header.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType)
return
}
if mediaType != EventsMediaType {
w.WriteHeader(http.StatusUnsupportedMediaType)
t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType)
return
}
var envelope Envelope
dec := json.NewDecoder(r.Body)
if err := dec.Decode(&envelope); err != nil {
w.WriteHeader(http.StatusBadRequest)
t.Fatalf("error decoding request body: %v", err)
return
}
// Let caller choose the status
status, err := strconv.Atoi(r.FormValue("status"))
if err != nil {
t.Logf("error parsing status: %v", err)
// May just be empty, set status to 200
status = http.StatusOK
}
w.WriteHeader(status)
}))
metrics := newSafeMetrics()
sink := newHTTPSink(server.URL, 0, nil,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
var expectedMetrics EndpointMetrics
expectedMetrics.Statuses = make(map[string]int)
for _, tc := range []struct {
events []Event // events to send
url string
failure bool // true if there should be a failure.
statusCode int // if not set, no status code should be incremented.
}{
{
statusCode: http.StatusOK,
events: []Event{
createTestEvent("push", "library/test", "manifest")},
},
{
statusCode: http.StatusOK,
events: []Event{
createTestEvent("push", "library/test", "manifest"),
createTestEvent("push", "library/test", "layer"),
createTestEvent("push", "library/test", "layer"),
},
},
{
statusCode: http.StatusTemporaryRedirect,
},
{
statusCode: http.StatusBadRequest,
failure: true,
},
{
// Case where connection never goes through.
url: "http://shoudlntresolve/",
failure: true,
},
} {
if tc.failure {
expectedMetrics.Failures += len(tc.events)
} else {
expectedMetrics.Successes += len(tc.events)
}
if tc.statusCode > 0 {
expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events)
}
url := tc.url
if url == "" {
url = server.URL + "/"
}
// setup endpoint to respond with expected status code.
url += fmt.Sprintf("?status=%v", tc.statusCode)
sink.url = url
t.Logf("testcase: %v, fail=%v", url, tc.failure)
// Try a simple event emission.
err := sink.Write(tc.events...)
if !tc.failure {
if err != nil {
t.Fatalf("unexpected error send event: %v", err)
}
} else {
if err == nil {
t.Fatalf("the endpoint should have rejected the request")
}
}
if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) {
t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics)
}
}
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error closing http sink: %v", err)
}
// double close returns error
if err := sink.Close(); err == nil {
t.Fatalf("second close should have returned error: %v", err)
}
}
func createTestEvent(action, repo, typ string) Event {
event := createEvent(action)
event.Target.Type = typ
event.Target.Name = repo
return *event
}

View file

@ -0,0 +1,140 @@
package notifications
import (
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/storage"
)
// ManifestListener describes a set of methods for listening to events related to manifests.
type ManifestListener interface {
ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error
ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error
// TODO(stevvooe): Please note that delete support is still a little shaky
// and we'll need to propagate these in the future.
ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error
}
// LayerListener describes a listener that can respond to layer related events.
type LayerListener interface {
LayerPushed(repo storage.Repository, layer storage.Layer) error
LayerPulled(repo storage.Repository, layer storage.Layer) error
// TODO(stevvooe): Please note that delete support is still a little shaky
// and we'll need to propagate these in the future.
LayerDeleted(repo storage.Repository, layer storage.Layer) error
}
// Listener combines all repository events into a single interface.
type Listener interface {
ManifestListener
LayerListener
}
type repositoryListener struct {
storage.Repository
listener Listener
}
// Listen dispatches events on the repository to the listener.
func Listen(repo storage.Repository, listener Listener) storage.Repository {
return &repositoryListener{
Repository: repo,
listener: listener,
}
}
func (rl *repositoryListener) Manifests() storage.ManifestService {
return &manifestServiceListener{
ManifestService: rl.Repository.Manifests(),
parent: rl,
}
}
func (rl *repositoryListener) Layers() storage.LayerService {
return &layerServiceListener{
LayerService: rl.Repository.Layers(),
parent: rl,
}
}
type manifestServiceListener struct {
storage.ManifestService
parent *repositoryListener
}
func (msl *manifestServiceListener) Get(tag string) (*manifest.SignedManifest, error) {
sm, err := msl.ManifestService.Get(tag)
if err == nil {
if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil {
logrus.Errorf("error dispatching manifest pull to listener: %v", err)
}
}
return sm, err
}
func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest) error {
err := msl.ManifestService.Put(tag, sm)
if err == nil {
if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil {
logrus.Errorf("error dispatching manifest push to listener: %v", err)
}
}
return err
}
type layerServiceListener struct {
storage.LayerService
parent *repositoryListener
}
func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error) {
layer, err := lsl.LayerService.Fetch(dgst)
if err == nil {
if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil {
logrus.Errorf("error dispatching layer pull to listener: %v", err)
}
}
return layer, err
}
func (lsl *layerServiceListener) Upload() (storage.LayerUpload, error) {
lu, err := lsl.LayerService.Upload()
return lsl.decorateUpload(lu), err
}
func (lsl *layerServiceListener) Resume(uuid string) (storage.LayerUpload, error) {
lu, err := lsl.LayerService.Resume(uuid)
return lsl.decorateUpload(lu), err
}
func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.LayerUpload {
return &layerUploadListener{
LayerUpload: lu,
parent: lsl,
}
}
type layerUploadListener struct {
storage.LayerUpload
parent *layerServiceListener
}
func (lul *layerUploadListener) Finish(dgst digest.Digest) (storage.Layer, error) {
layer, err := lul.LayerUpload.Finish(dgst)
if err == nil {
if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil {
logrus.Errorf("error dispatching layer push to listener: %v", err)
}
}
return layer, err
}
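// Illustrative wiring sketch (not part of the original file): decorate a
// repository so pushes and pulls are reported through a bridge to an
// endpoint. The url is a placeholder, and the url builder ub, source, actor,
// and request records are assumed to be in scope.
//
//	sink := NewEndpoint("local", "http://localhost:8083/events", EndpointConfig{})
//	repo = Listen(repo, NewBridge(ub, source, actor, request, sink))
//	// Subsequent Manifests() and Layers() operations on repo now emit events.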

View file

@ -0,0 +1,153 @@
package notifications
import (
"io"
"reflect"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/storagedriver/inmemory"
"github.com/docker/distribution/testutil"
"github.com/docker/libtrust"
"golang.org/x/net/context"
)
func TestListener(t *testing.T) {
registry := storage.NewRegistryWithDriver(inmemory.New())
tl := &testListener{
ops: make(map[string]int),
}
ctx := context.Background()
repository := Listen(registry.Repository(ctx, "foo/bar"), tl)
// Now take the registry through a number of operations
checkExerciseRepository(t, repository)
expectedOps := map[string]int{
"manifest:push": 1,
"manifest:pull": 1,
// "manifest:delete": 0, // deletes not supported for now
"layer:push": 2,
"layer:pull": 2,
// "layer:delete": 0, // deletes not supported for now
}
if !reflect.DeepEqual(tl.ops, expectedOps) {
t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps)
}
}
type testListener struct {
ops map[string]int
}
func (tl *testListener) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error {
tl.ops["manifest:push"]++
return nil
}
func (tl *testListener) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error {
tl.ops["manifest:pull"]++
return nil
}
func (tl *testListener) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error {
tl.ops["manifest:delete"]++
return nil
}
func (tl *testListener) LayerPushed(repo storage.Repository, layer storage.Layer) error {
tl.ops["layer:push"]++
return nil
}
func (tl *testListener) LayerPulled(repo storage.Repository, layer storage.Layer) error {
tl.ops["layer:pull"]++
return nil
}
func (tl *testListener) LayerDeleted(repo storage.Repository, layer storage.Layer) error {
tl.ops["layer:delete"]++
return nil
}
// checkExerciseRepository takes the repository through all of its operations,
// carrying out generic checks.
func checkExerciseRepository(t *testing.T, repository storage.Repository) {
// TODO(stevvooe): This would be a nice testutil function. Basically, it
// takes the registry through a common set of operations. This could be
// used to make cross-cutting updates by changing internals that affect
// update counts. Basically, it would make writing tests a lot easier.
tag := "thetag"
m := manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: repository.Name(),
Tag: tag,
}
layers := repository.Layers()
for i := 0; i < 2; i++ {
rs, ds, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating test layer: %v", err)
}
dgst := digest.Digest(ds)
upload, err := layers.Upload()
if err != nil {
t.Fatalf("error creating layer upload: %v", err)
}
// Use the resumes, as well!
upload, err = layers.Resume(upload.UUID())
if err != nil {
t.Fatalf("error resuming layer upload: %v", err)
}
if _, err := io.Copy(upload, rs); err != nil {
t.Fatalf("unexpected error copying to upload: %v", err)
}
if _, err := upload.Finish(dgst); err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
m.FSLayers = append(m.FSLayers, manifest.FSLayer{
BlobSum: dgst,
})
// Then fetch the layers
if _, err := layers.Fetch(dgst); err != nil {
t.Fatalf("error fetching layer: %v", err)
}
}
pk, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("unexpected error generating key: %v", err)
}
sm, err := manifest.Sign(&m, pk)
if err != nil {
t.Fatalf("unexpected error signing manifest: %v", err)
}
manifests := repository.Manifests()
if err := manifests.Put(tag, sm); err != nil {
t.Fatalf("unexpected error putting the manifest: %v", err)
}
fetched, err := manifests.Get(tag)
if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err)
}
if fetched.Tag != sm.Tag {
t.Fatalf("retrieved unexpected manifest: %q != %q", fetched.Tag, sm.Tag)
}
}

View file

@ -0,0 +1,152 @@
package notifications
import (
"expvar"
"fmt"
"net/http"
"sync"
)
// EndpointMetrics track various actions taken by the endpoint, typically by
// number of events. The goal of this to export it via expvar but we may find
// some other future solution to be better.
type EndpointMetrics struct {
Pending int // events pending in queue
Events int // total events incoming
Successes int // total events written successfully
Failures int // total events failed
Errors int // total events errored
Statuses map[string]int // status code histogram, per call event
}
// safeMetrics guards the metrics implementation with a lock and provides a
// safe update function.
type safeMetrics struct {
EndpointMetrics
sync.Mutex // protects statuses map
}
// newSafeMetrics returns safeMetrics with map allocated.
func newSafeMetrics() *safeMetrics {
var sm safeMetrics
sm.Statuses = make(map[string]int)
return &sm
}
// httpStatusListener returns the listener for the http sink that updates the
// relevant counters.
func (sm *safeMetrics) httpStatusListener() httpStatusListener {
return &endpointMetricsHTTPStatusListener{
safeMetrics: sm,
}
}
// eventQueueListener returns a listener that maintains queue related counters.
func (sm *safeMetrics) eventQueueListener() eventQueueListener {
return &endpointMetricsEventQueueListener{
safeMetrics: sm,
}
}
// endpointMetricsHTTPStatusListener increments counters related to http sinks
// for the relevant events.
type endpointMetricsHTTPStatusListener struct {
*safeMetrics
}
var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Successes += len(events)
}
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Failures += len(events)
}
func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Errors += len(events)
}
// endpointMetricsEventQueueListener maintains the incoming events counter and
// the queues pending count.
type endpointMetricsEventQueueListener struct {
*safeMetrics
}
func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
eqc.Lock()
defer eqc.Unlock()
eqc.Events += len(events)
eqc.Pending += len(events)
}
func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
eqc.Lock()
defer eqc.Unlock()
eqc.Pending -= len(events)
}
// endpoints is a global registry of endpoints used to report metrics to expvar.
var endpoints struct {
registered []*Endpoint
mu sync.Mutex
}
// register places the endpoint into expvar so that stats are tracked.
func register(e *Endpoint) {
endpoints.mu.Lock()
defer endpoints.mu.Unlock()
endpoints.registered = append(endpoints.registered, e)
}
func init() {
// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
// Ideally, we do more metrics through logging but we need some nice
// realtime metrics for queue state for now.
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
var notifications expvar.Map
notifications.Init()
notifications.Set("endpoints", expvar.Func(func() interface{} {
endpoints.mu.Lock()
defer endpoints.mu.Unlock()
var names []interface{}
for _, v := range endpoints.registered {
var epjson struct {
Name string `json:"name"`
URL string `json:"url"`
EndpointConfig
Metrics EndpointMetrics
}
epjson.Name = v.Name()
epjson.URL = v.URL()
epjson.EndpointConfig = v.EndpointConfig
v.ReadMetrics(&epjson.Metrics)
names = append(names, epjson)
}
return names
}))
registry.(*expvar.Map).Set("notifications", &notifications)
}
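// Illustrative check (a deployment assumption, not part of this file): if the
// standard expvar handler is mounted at /debug/vars, the endpoint state
// registered above can be inspected over http:
//
//	curl -s http://localhost:5000/debug/vars | jq '.registry.notifications'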

View file

@ -0,0 +1,337 @@
package notifications
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/Sirupsen/logrus"
)
// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. Albeit,
// the tight integration with endpoint metrics should be removed.
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping incoming sinks.
type Broadcaster struct {
sinks []Sink
events chan []Event
closed chan chan struct{}
}
// NewBroadcaster appends one or more sinks to the list of sinks. The
// broadcaster behavior will be affected by the properties of the sink.
// Generally, the sink should accept all messages and deal with reliability on
// its own. EventQueue and RetryingSink should be used here.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
b := Broadcaster{
sinks: sinks,
events: make(chan []Event),
closed: make(chan chan struct{}),
}
// Start the broadcaster
go b.run()
return &b
}
// Write accepts a block of events to be dispatched to all sinks. This method
// will never fail and should never block (hopefully!). The caller cedes the
// slice memory to the broadcaster and should not modify it after calling
// write.
func (b *Broadcaster) Write(events ...Event) error {
select {
case b.events <- events:
case <-b.closed:
return ErrSinkClosed
}
return nil
}
// Close closes the broadcaster, ensuring that all messages are flushed to the
// underlying sink before returning.
func (b *Broadcaster) Close() error {
logrus.Infof("broadcaster: closing")
select {
case <-b.closed:
// already closed
return fmt.Errorf("broadcaster: already closed")
default:
// do a little chan handoff dance to synchronize closing
closed := make(chan struct{})
b.closed <- closed
close(b.closed)
<-closed
return nil
}
}
// run is the main broadcast loop, started when the broadcaster is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (b *Broadcaster) run() {
for {
select {
case block := <-b.events:
for _, sink := range b.sinks {
if err := sink.Write(block...); err != nil {
logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
}
}
case closing := <-b.closed:
// close all the underlying sinks
for _, sink := range b.sinks {
if err := sink.Close(); err != nil {
logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
}
}
closing <- struct{}{}
logrus.Debugf("broadcaster: closed")
return
}
}
}
// eventQueue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type eventQueue struct {
sink Sink
events *list.List
listeners []eventQueueListener
cond *sync.Cond
mu sync.Mutex
closed bool
}
// eventQueueListener is called when various events happen on the queue.
type eventQueueListener interface {
ingress(events ...Event)
egress(events ...Event)
}
// newEventQueue returns a queue to the provided sink. If listeners are
// provided, they will be called to update pending metrics on ingress and egress.
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
eq := eventQueue{
sink: sink,
events: list.New(),
listeners: listeners,
}
eq.cond = sync.NewCond(&eq.mu)
go eq.run()
return &eq
}
// Write accepts the events into the queue, only failing if the queue has
// been closed.
func (eq *eventQueue) Write(events ...Event) error {
eq.mu.Lock()
defer eq.mu.Unlock()
if eq.closed {
return ErrSinkClosed
}
for _, listener := range eq.listeners {
listener.ingress(events...)
}
eq.events.PushBack(events)
eq.cond.Signal() // signal waiters
return nil
}
// Close shuts down the event queue, flushing any pending events before closing the underlying sink.
func (eq *eventQueue) Close() error {
eq.mu.Lock()
defer eq.mu.Unlock()
if eq.closed {
return fmt.Errorf("eventqueue: already closed")
}
// set closed flag
eq.closed = true
eq.cond.Signal() // signal flushes queue
eq.cond.Wait() // wait for signal from last flush
return eq.sink.Close()
}
// run is the main goroutine to flush events to the target sink.
func (eq *eventQueue) run() {
for {
block := eq.next()
if block == nil {
return // nil block means event queue is closed.
}
if err := eq.sink.Write(block...); err != nil {
logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
}
for _, listener := range eq.listeners {
listener.egress(block...)
}
}
}
// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *eventQueue) next() []Event {
eq.mu.Lock()
defer eq.mu.Unlock()
for eq.events.Len() < 1 {
if eq.closed {
eq.cond.Broadcast()
return nil
}
eq.cond.Wait()
}
front := eq.events.Front()
block := front.Value.([]Event)
eq.events.Remove(front)
return block
}
// retryingSink retries the write until success or an ErrSinkClosed is
// returned. The underlying sink must have p > 0 of succeeding or the sink will
// block. Internally, it is a circuit breaker that uses retries to manage reset.
// Concurrent calls to a retrying sink are serialized through the sink,
// meaning that if one is in-flight, another will not proceed.
type retryingSink struct {
mu sync.Mutex
sink Sink
closed bool
// circuit breaker heuristics
failures struct {
threshold int
recent int
last time.Time
backoff time.Duration // time after which we retry after failure.
}
}
type retryingSinkListener interface {
active(events ...Event)
retry(events ...Event)
}
// TODO(stevvooe): We are using circuit breaking here, which actually doesn't
// make a whole lot of sense for this use case, since we always retry. Move
// this to use bounded exponential backoff.
// newRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. Parameters threshold and backoff adjust the behavior of the
// circuit breaker.
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
rs := &retryingSink{
sink: sink,
}
rs.failures.threshold = threshold
rs.failures.backoff = backoff
return rs
}
// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *retryingSink) Write(events ...Event) error {
rs.mu.Lock()
defer rs.mu.Unlock()
retry:
if rs.closed {
return ErrSinkClosed
}
if !rs.proceed() {
logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
rs.wait(rs.failures.backoff)
goto retry
}
if err := rs.write(events...); err != nil {
if err == ErrSinkClosed {
// terminal!
return err
}
logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
goto retry
}
return nil
}
// Close closes the sink and the underlying sink.
func (rs *retryingSink) Close() error {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.closed {
return fmt.Errorf("retryingsink: already closed")
}
rs.closed = true
return rs.sink.Close()
}
// write provides a helper that dispatches failure and success properly. Used
// by write as the single-flight write call.
func (rs *retryingSink) write(events ...Event) error {
if err := rs.sink.Write(events...); err != nil {
rs.failure()
return err
}
rs.reset()
return nil
}
// wait sleeps for the backoff period, unlocking the mutex so others can
// proceed. It should only be called by methods that currently hold the mutex.
func (rs *retryingSink) wait(backoff time.Duration) {
rs.mu.Unlock()
defer rs.mu.Lock()
// backoff here
time.Sleep(backoff)
}
// reset marks a successful call.
func (rs *retryingSink) reset() {
rs.failures.recent = 0
rs.failures.last = time.Time{}
}
// failure records a failure.
func (rs *retryingSink) failure() {
rs.failures.recent++
rs.failures.last = time.Now().UTC()
}
// proceed returns true if the call should proceed based on circuit breaker
// heuristics.
func (rs *retryingSink) proceed() bool {
return rs.failures.recent < rs.failures.threshold ||
time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
}
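// Illustrative composition sketch (not part of the original file): fan out
// events to two reliable sinks, mirroring what NewEndpoint assembles for a
// single endpoint. The urls are placeholders.
//
//	a := newRetryingSink(newHTTPSink("http://example.com/a", time.Second, nil), 3, time.Second)
//	b := newRetryingSink(newHTTPSink("http://example.com/b", time.Second, nil), 3, time.Second)
//	bc := NewBroadcaster(newEventQueue(a), newEventQueue(b))
//	defer bc.Close()
//	_ = bc.Write(*createEvent("push")) // dispatched to both sinks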

View file

@ -0,0 +1,223 @@
package notifications
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/Sirupsen/logrus"
"testing"
)
func TestBroadcaster(t *testing.T) {
const nEvents = 1000
var sinks []Sink
for i := 0; i < 10; i++ {
sinks = append(sinks, &testSink{})
}
b := NewBroadcaster(sinks...)
var block []Event
var wg sync.WaitGroup
for i := 1; i <= nEvents; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
if err := b.Write(block...); err != nil {
t.Fatalf("error writing block of length %d: %v", len(block), err)
}
wg.Done()
}(block...)
block = nil
}
}
wg.Wait() // Wait until writes complete
checkClose(t, b)
// Iterate through the sinks and check that they all have the expected length.
for _, sink := range sinks {
ts := sink.(*testSink)
ts.mu.Lock()
defer ts.mu.Unlock()
if len(ts.events) != nEvents {
t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
}
if !ts.closed {
t.Fatalf("sink should have been closed")
}
}
}
func TestEventQueue(t *testing.T) {
const nevents = 1000
var ts testSink
metrics := newSafeMetrics()
eq := newEventQueue(
// delayed sync simulates destination slower than channel comms
&delayedSink{
Sink: &ts,
delay: time.Millisecond * 1,
}, metrics.eventQueueListener())
var wg sync.WaitGroup
var block []Event
for i := 1; i <= nevents; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
if err := eq.Write(block...); err != nil {
t.Fatalf("error writing event block: %v", err)
}
wg.Done()
}(block...)
block = nil
}
}
wg.Wait()
checkClose(t, eq)
ts.mu.Lock()
defer ts.mu.Unlock()
metrics.Lock()
defer metrics.Unlock()
if len(ts.events) != nevents {
t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
}
if !ts.closed {
t.Fatalf("sink should have been closed")
}
if metrics.Events != nevents {
t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
}
if metrics.Pending != 0 {
t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
}
}
func TestRetryingSink(t *testing.T) {
// Make a sink that fails most of the time, ensuring that all the events
// make it through.
var ts testSink
flaky := &flakySink{
rate: 1.0, // start out always failing.
Sink: &ts,
}
s := newRetryingSink(flaky, 3, 10*time.Millisecond)
var wg sync.WaitGroup
var block []Event
for i := 1; i <= 100; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
// Above 50, set the failure rate lower
if i > 50 {
s.mu.Lock()
flaky.rate = 0.90
s.mu.Unlock()
}
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
defer wg.Done()
if err := s.Write(block...); err != nil {
t.Fatalf("error writing event block: %v", err)
}
}(block...)
block = nil
}
}
wg.Wait()
checkClose(t, s)
ts.mu.Lock()
defer ts.mu.Unlock()
if len(ts.events) != 100 {
t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
}
}
type testSink struct {
events []Event
mu sync.Mutex
closed bool
}
func (ts *testSink) Write(events ...Event) error {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.events = append(ts.events, events...)
return nil
}
func (ts *testSink) Close() error {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.closed = true
logrus.Infof("closing testSink")
return nil
}
type delayedSink struct {
Sink
delay time.Duration
}
func (ds *delayedSink) Write(events ...Event) error {
time.Sleep(ds.delay)
return ds.Sink.Write(events...)
}
type flakySink struct {
Sink
rate float64
}
func (fs *flakySink) Write(events ...Event) error {
if rand.Float64() < fs.rate {
return fmt.Errorf("error writing %d events", len(events))
}
return fs.Sink.Write(events...)
}
func checkClose(t *testing.T, sink Sink) {
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
// second close should not crash but should return an error.
if err := sink.Close(); err == nil {
t.Fatalf("no error on double close")
}
// Write after closed should be an error
if err := sink.Write([]Event{}...); err == nil {
t.Fatalf("write after closed did not have an error")
} else if err != ErrSinkClosed {
t.Fatalf("error should be ErrSinkClosed")
}
}

458
docs/storage/paths.go Normal file
View file

@ -0,0 +1,458 @@
package storage
import (
"fmt"
"path"
"strings"
"github.com/docker/distribution/digest"
)
const storagePathVersion = "v2"
// pathMapper maps paths based on "object names" and their ids. The "object
// names" mapped by pathMapper are internal to the storage system.
//
// The path layout in the storage backend is roughly as follows:
//
// <root>/v2
//     -> repositories/
//         -> <name>/
//             -> _manifests/
//                 revisions
//                     -> <manifest digest path>
//                         -> link
//                         -> signatures
//                             <algorithm>/<digest>/link
//                 tags/<tag>
//                     -> current/link
//                     -> index
//                         -> <algorithm>/<hex digest>/link
//             -> _layers/
//                 <layer links to blob store>
//             -> _uploads/<uuid>
//                 data
//                 startedat
//     -> blob/<algorithm>
//         <split directory content addressable storage>
//
// The storage backend layout is broken up into a content-addressable blob
// store and repositories. The content-addressable blob store holds most data
// throughout the backend, keyed by algorithm and digests of the underlying
// content. Access to the blob store is controlled through links from the
// repository to the blob store.
//
// A repository is made up of layers, manifests and tags. The layers component
// is just a directory of layers which are "linked" into a repository. A layer
// can only be accessed through a qualified repository name if it is linked in
// the repository. Uploads of layers are managed in the uploads directory,
// which is keyed by upload uuid. When all data for an upload is received, the
// data is moved into the blob store and the upload directory is deleted.
// Abandoned uploads can be garbage collected by reading the startedat file
// and removing uploads that have been active for longer than a certain time.
//
// The third component of the repository directory is the manifests store,
// which is made up of a revision store and tag store. Manifests are stored in
// the blob store and linked into the revision store. Signatures are separated
// from the manifest payload data and linked into the blob store, as well.
// While the registry can save all revisions of a manifest, no relationship is
// implied as to the ordering of changes to a manifest. The tag store provides
// support for name, tag lookups of manifests, using "current/link" under a
// named tag directory. An index is maintained to support deletions of all
// revisions of a given manifest tag.
//
// We cover the path formats implemented by this path mapper below.
//
// Manifests:
//
// manifestRevisionPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/
// manifestRevisionLinkPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/link
// manifestSignaturesPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/signatures/
// manifestSignatureLinkPathSpec: <root>/v2/repositories/<name>/_manifests/revisions/<algorithm>/<hex digest>/signatures/<algorithm>/<hex digest>/link
//
// Tags:
//
// manifestTagsPathSpec: <root>/v2/repositories/<name>/_manifests/tags/
// manifestTagPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/
// manifestTagCurrentPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/current/link
// manifestTagIndexPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/index/
// manifestTagIndexEntryPathSpec: <root>/v2/repositories/<name>/_manifests/tags/<tag>/index/<algorithm>/<hex digest>/link
//
// Layers:
//
// layerLinkPathSpec: <root>/v2/repositories/<name>/_layers/tarsum/<tarsum version>/<tarsum hash alg>/<tarsum hash>/link
//
// Uploads:
//
// uploadDataPathSpec: <root>/v2/repositories/<name>/_uploads/<uuid>/data
// uploadStartedAtPathSpec: <root>/v2/repositories/<name>/_uploads/<uuid>/startedat
//
// Blob Store:
//
// blobPathSpec: <root>/v2/blobs/<algorithm>/<first two hex bytes of digest>/<hex digest>
// blobDataPathSpec: <root>/v2/blobs/<algorithm>/<first two hex bytes of digest>/<hex digest>/data
//
// For more information on the semantic meaning of each path and their
// contents, please see the path spec documentation.
type pathMapper struct {
root string
version string // should be a constant?
}
var defaultPathMapper = &pathMapper{
root: "/docker/registry/",
version: storagePathVersion,
}
// path returns the path identified by spec.
func (pm *pathMapper) path(spec pathSpec) (string, error) {
// Switch on the path object type and return the appropriate path. At
// first glance, one may wonder why we don't use an interface to
// accomplish this. By keeping the formatting separate from the pathSpec, we
// keep the path generation componentized. These specs could be
// passed to a completely different mapper implementation and generate a
// different set of paths.
//
// For example, imagine migrating from one backend to the other: one could
// build a filesystem walker that converts a string path in one version
// to an intermediate path object that can be consumed and mapped by the
// other version.
rootPrefix := []string{pm.root, pm.version}
repoPrefix := append(rootPrefix, "repositories")
switch v := spec.(type) {
case manifestRevisionPathSpec:
components, err := digestPathComponents(v.revision, false)
if err != nil {
return "", err
}
return path.Join(append(append(repoPrefix, v.name, "_manifests", "revisions"), components...)...), nil
case manifestRevisionLinkPathSpec:
root, err := pm.path(manifestRevisionPathSpec{
name: v.name,
revision: v.revision,
})
if err != nil {
return "", err
}
return path.Join(root, "link"), nil
case manifestSignaturesPathSpec:
root, err := pm.path(manifestRevisionPathSpec{
name: v.name,
revision: v.revision,
})
if err != nil {
return "", err
}
return path.Join(root, "signatures"), nil
case manifestSignatureLinkPathSpec:
root, err := pm.path(manifestSignaturesPathSpec{
name: v.name,
revision: v.revision,
})
if err != nil {
return "", err
}
signatureComponents, err := digestPathComponents(v.signature, false)
if err != nil {
return "", err
}
return path.Join(root, path.Join(append(signatureComponents, "link")...)), nil
case manifestTagsPathSpec:
return path.Join(append(repoPrefix, v.name, "_manifests", "tags")...), nil
case manifestTagPathSpec:
root, err := pm.path(manifestTagsPathSpec{
name: v.name,
})
if err != nil {
return "", err
}
return path.Join(root, v.tag), nil
case manifestTagCurrentPathSpec:
root, err := pm.path(manifestTagPathSpec{
name: v.name,
tag: v.tag,
})
if err != nil {
return "", err
}
return path.Join(root, "current", "link"), nil
case manifestTagIndexPathSpec:
root, err := pm.path(manifestTagPathSpec{
name: v.name,
tag: v.tag,
})
if err != nil {
return "", err
}
return path.Join(root, "index"), nil
case manifestTagIndexEntryPathSpec:
root, err := pm.path(manifestTagIndexPathSpec{
name: v.name,
tag: v.tag,
})
if err != nil {
return "", err
}
components, err := digestPathComponents(v.revision, false)
if err != nil {
return "", err
}
return path.Join(root, path.Join(append(components, "link")...)), nil
case layerLinkPathSpec:
components, err := digestPathComponents(v.digest, false)
if err != nil {
return "", err
}
// For now, only map tarsum paths.
if components[0] != "tarsum" {
// Only tarsum is supported, for now
return "", fmt.Errorf("unsupported content digest: %v", v.digest)
}
layerLinkPathComponents := append(repoPrefix, v.name, "_layers")
return path.Join(path.Join(append(layerLinkPathComponents, components...)...), "link"), nil
case blobDataPathSpec:
components, err := digestPathComponents(v.digest, true)
if err != nil {
return "", err
}
components = append(components, "data")
blobPathPrefix := append(rootPrefix, "blobs")
return path.Join(append(blobPathPrefix, components...)...), nil
case uploadDataPathSpec:
return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "data")...), nil
case uploadStartedAtPathSpec:
return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "startedat")...), nil
default:
// TODO(sday): This is an internal error. Ensure it doesn't escape (panic?).
return "", fmt.Errorf("unknown path spec: %#v", v)
}
}
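// Worked example (a sketch; repository name and tag are hypothetical):
// resolving a tag link with the default mapper.
//
//	p, _ := defaultPathMapper.path(manifestTagCurrentPathSpec{
//		name: "library/ubuntu",
//		tag:  "14.04",
//	})
//	// p == "/docker/registry/v2/repositories/library/ubuntu/_manifests/tags/14.04/current/link"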
// pathSpec is a type to mark structs as path specs. There is no
// implementation because we'd like to keep the specs and the mappers
// decoupled.
type pathSpec interface {
pathSpec()
}
// manifestRevisionPathSpec describes the components of the directory path for
// a manifest revision.
type manifestRevisionPathSpec struct {
name string
revision digest.Digest
}
func (manifestRevisionPathSpec) pathSpec() {}
// manifestRevisionLinkPathSpec describes the path components required to look
// up the data link for a revision of a manifest. If this file is not present,
// the manifest blob is not available in the given repo. The contents of this
// file should just be the digest.
type manifestRevisionLinkPathSpec struct {
name string
revision digest.Digest
}
func (manifestRevisionLinkPathSpec) pathSpec() {}
// manifestSignaturesPathSpec describes the path components for the directory
// containing all the signatures for the target blob. Entries are named with
// the underlying key id.
type manifestSignaturesPathSpec struct {
name string
revision digest.Digest
}
func (manifestSignaturesPathSpec) pathSpec() {}
// manifestSignatureLinkPathSpec describes the path components used to look up
// a signature file by the hash of its blob.
type manifestSignatureLinkPathSpec struct {
name string
revision digest.Digest
signature digest.Digest
}
func (manifestSignatureLinkPathSpec) pathSpec() {}
// manifestTagsPathSpec describes the path elements required to point to the
// manifest tags directory.
type manifestTagsPathSpec struct {
name string
}
func (manifestTagsPathSpec) pathSpec() {}
// manifestTagPathSpec describes the path elements required to point to the
// manifest tag links files under a repository. These contain a blob id that
// can be used to look up the data and signatures.
type manifestTagPathSpec struct {
name string
tag string
}
func (manifestTagPathSpec) pathSpec() {}
// manifestTagCurrentPathSpec describes the link to the current revision for a
// given tag.
type manifestTagCurrentPathSpec struct {
name string
tag string
}
func (manifestTagCurrentPathSpec) pathSpec() {}
// manifestTagIndexPathSpec describes the link to the index of revisions
// with the given tag.
type manifestTagIndexPathSpec struct {
name string
tag string
}
func (manifestTagIndexPathSpec) pathSpec() {}
// manifestTagIndexEntryPathSpec describes the link to a revision of a
// manifest with the given tag within the index.
type manifestTagIndexEntryPathSpec struct {
name string
tag string
revision digest.Digest
}
func (manifestTagIndexEntryPathSpec) pathSpec() {}
// layerLinkPathSpec specifies a path for a layer link, which is a file with a blob
// id. The layer link will contain a content addressable blob id reference
// into the blob store. The format of the contents is as follows:
//
// <algorithm>:<hex digest of layer data>
//
// The following example of the file contents is more illustrative:
//
// sha256:96443a84ce518ac22acb2e985eda402b58ac19ce6f91980bde63726a79d80b36
//
// This indicates that there is a blob with the id/digest, calculated via
// sha256 that can be fetched from the blob store.
type layerLinkPathSpec struct {
name string
digest digest.Digest
}
func (layerLinkPathSpec) pathSpec() {}
// blobAlgorithmReplacer does some very simple path sanitization for user
// input. Mostly, this is to provide some hierarchy for tarsum digests. Paths
// should be "safe" before getting this far due to strict digest requirements
// but we can add further path conversion here, if needed.
var blobAlgorithmReplacer = strings.NewReplacer(
"+", "/",
".", "/",
";", "/",
)
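// For example, the replacer turns the algorithm component "tarsum.v1+sha256"
// into "tarsum/v1/sha256", giving each tarsum version and hash algorithm its
// own directory level. (Illustrative; see digestPathComponents below.)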
// // blobPathSpec contains the path for the registry global blob store.
// type blobPathSpec struct {
// digest digest.Digest
// }
// func (blobPathSpec) pathSpec() {}
// blobDataPathSpec contains the path for the registry global blob store. For
// now, this contains layer data, exclusively.
type blobDataPathSpec struct {
digest digest.Digest
}
func (blobDataPathSpec) pathSpec() {}
// uploadDataPathSpec defines the path parameters of the data file for
// uploads.
type uploadDataPathSpec struct {
name string
uuid string
}
func (uploadDataPathSpec) pathSpec() {}
// uploadStartedAtPathSpec defines the path parameters for the file that
// stores the start time of an upload. If it is missing, the upload is considered
// unknown. Admittedly, the presence of this file is an ugly hack to make sure
// we have a way to cleanup old or stalled uploads that doesn't rely on driver
// FileInfo behavior. If we come up with a more clever way to do this, we
// should remove this file immediately and rely on the startedAt field from
// the client to enforce time out policies.
type uploadStartedAtPathSpec struct {
name string
uuid string
}
func (uploadStartedAtPathSpec) pathSpec() {}
// digestPathComponents provides a consistent path breakdown for a given
// digest. For a generic digest, it will be as follows:
//
// <algorithm>/<hex digest>
//
// Most importantly, for tarsum, the layout looks like this:
//
// tarsum/<version>/<digest algorithm>/<full digest>
//
// If multilevel is true, the first two hex characters of the digest prefix an
// additional directory level, as follows:
//
// <algorithm>/<first two bytes of digest>/<full digest>
//
func digestPathComponents(dgst digest.Digest, multilevel bool) ([]string, error) {
if err := dgst.Validate(); err != nil {
return nil, err
}
algorithm := blobAlgorithmReplacer.Replace(dgst.Algorithm())
hex := dgst.Hex()
prefix := []string{algorithm}
var suffix []string
if multilevel {
suffix = append(suffix, hex[:2])
}
suffix = append(suffix, hex)
if tsi, err := digest.ParseTarSum(dgst.String()); err == nil {
// We have a tarsum!
version := tsi.Version
if version == "" {
version = "v0"
}
prefix = []string{
"tarsum",
version,
tsi.Algorithm,
}
}
return append(prefix, suffix...), nil
}
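// Worked examples (a sketch; digests abbreviated for illustration and not
// valid as real input):
//
//	digestPathComponents("sha256:abcdef...", true)
//	// -> ["sha256", "ab", "abcdef..."]
//
//	digestPathComponents("tarsum.v1+sha256:abcdef...", false)
//	// -> ["tarsum", "v1", "sha256", "abcdef..."]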

138
docs/storage/paths_test.go Normal file
View file

@@ -0,0 +1,138 @@
package storage
import (
"testing"
"github.com/docker/distribution/digest"
)
func TestPathMapper(t *testing.T) {
pm := &pathMapper{
root: "/pathmapper-test",
}
for _, testcase := range []struct {
spec pathSpec
expected string
err error
}{
{
spec: manifestRevisionPathSpec{
name: "foo/bar",
revision: "sha256:abcdef0123456789",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/revisions/sha256/abcdef0123456789",
},
{
spec: manifestRevisionLinkPathSpec{
name: "foo/bar",
revision: "sha256:abcdef0123456789",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/revisions/sha256/abcdef0123456789/link",
},
{
spec: manifestSignatureLinkPathSpec{
name: "foo/bar",
revision: "sha256:abcdef0123456789",
signature: "sha256:abcdef0123456789",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/revisions/sha256/abcdef0123456789/signatures/sha256/abcdef0123456789/link",
},
{
spec: manifestSignaturesPathSpec{
name: "foo/bar",
revision: "sha256:abcdef0123456789",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/revisions/sha256/abcdef0123456789/signatures",
},
{
spec: manifestTagsPathSpec{
name: "foo/bar",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags",
},
{
spec: manifestTagPathSpec{
name: "foo/bar",
tag: "thetag",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags/thetag",
},
{
spec: manifestTagCurrentPathSpec{
name: "foo/bar",
tag: "thetag",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags/thetag/current/link",
},
{
spec: manifestTagIndexPathSpec{
name: "foo/bar",
tag: "thetag",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags/thetag/index",
},
{
spec: manifestTagIndexEntryPathSpec{
name: "foo/bar",
tag: "thetag",
revision: "sha256:abcdef0123456789",
},
expected: "/pathmapper-test/repositories/foo/bar/_manifests/tags/thetag/index/sha256/abcdef0123456789/link",
},
{
spec: layerLinkPathSpec{
name: "foo/bar",
digest: "tarsum.v1+test:abcdef",
},
expected: "/pathmapper-test/repositories/foo/bar/_layers/tarsum/v1/test/abcdef/link",
},
{
spec: blobDataPathSpec{
digest: digest.Digest("tarsum.dev+sha512:abcdefabcdefabcdef908909909"),
},
expected: "/pathmapper-test/blobs/tarsum/dev/sha512/ab/abcdefabcdefabcdef908909909/data",
},
{
spec: blobDataPathSpec{
digest: digest.Digest("tarsum.v1+sha256:abcdefabcdefabcdef908909909"),
},
expected: "/pathmapper-test/blobs/tarsum/v1/sha256/ab/abcdefabcdefabcdef908909909/data",
},
{
spec: uploadDataPathSpec{
name: "foo/bar",
uuid: "asdf-asdf-asdf-adsf",
},
expected: "/pathmapper-test/repositories/foo/bar/_uploads/asdf-asdf-asdf-adsf/data",
},
{
spec: uploadStartedAtPathSpec{
name: "foo/bar",
uuid: "asdf-asdf-asdf-adsf",
},
expected: "/pathmapper-test/repositories/foo/bar/_uploads/asdf-asdf-asdf-adsf/startedat",
},
} {
p, err := pm.path(testcase.spec)
if err != nil {
t.Fatalf("unexpected generating path (%T): %v", testcase.spec, err)
}
if p != testcase.expected {
t.Fatalf("unexpected path generated (%T): %q != %q", testcase.spec, p, testcase.expected)
}
}
// Add a few test cases to ensure we cover some errors
// Specify a path that requires a revision and get a digest validation error.
badpath, err := pm.path(manifestSignaturesPathSpec{
name: "foo/bar",
})
if err == nil {
t.Fatalf("expected an error when mapping an invalid revision: %s", badpath)
}
}

80
docs/storage/registry.go Normal file
View file

@@ -0,0 +1,80 @@
package storage
import (
"github.com/docker/distribution/storagedriver"
"golang.org/x/net/context"
)
// registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object.
type registry struct {
driver storagedriver.StorageDriver
pm *pathMapper
blobStore *blobStore
}
// NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate.
func NewRegistryWithDriver(driver storagedriver.StorageDriver) Registry {
bs := &blobStore{}
reg := &registry{
driver: driver,
blobStore: bs,
// TODO(sday): This should be configurable.
pm: defaultPathMapper,
}
reg.blobStore.registry = reg
return reg
}
// Repository returns an instance of the repository tied to the registry.
// Instances should not be shared between goroutines but are cheap to
// allocate. In general, they should be request scoped.
func (reg *registry) Repository(ctx context.Context, name string) Repository {
return &repository{
ctx: ctx,
registry: reg,
name: name,
}
}
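// Usage sketch (assuming the inmemory storage driver from
// github.com/docker/distribution/storagedriver/inmemory, as used by the
// tests, and a ctx obtained from the caller):
//
//	reg := NewRegistryWithDriver(inmemory.New())
//	repo := reg.Repository(ctx, "library/ubuntu")
//	manifests := repo.Manifests()
//	layers := repo.Layers()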
// repository provides name-scoped access to various services.
type repository struct {
*registry
ctx context.Context
name string
}
// Name returns the name of the repository.
func (repo *repository) Name() string {
return repo.name
}
// Manifests returns an instance of ManifestService. Instantiation is cheap
// and may be context sensitive in the future. The instance should be used
// like a request-local value.
func (repo *repository) Manifests() ManifestService {
return &manifestStore{
repository: repo,
revisionStore: &revisionStore{
repository: repo,
},
tagStore: &tagStore{
repository: repo,
},
}
}
// Layers returns an instance of the LayerService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used like a
// request-local value.
func (repo *repository) Layers() LayerService {
return &layerStore{
repository: repo,
}
}

207
docs/storage/revisionstore.go Normal file
View file

@@ -0,0 +1,207 @@
package storage
import (
"encoding/json"
"path"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/libtrust"
)
// revisionStore supports storing and managing manifest revisions.
type revisionStore struct {
*repository
}
// exists returns true if the revision is available in the named repository.
func (rs *revisionStore) exists(revision digest.Digest) (bool, error) {
revpath, err := rs.pm.path(manifestRevisionPathSpec{
name: rs.Name(),
revision: revision,
})
if err != nil {
return false, err
}
exists, err := exists(rs.driver, revpath)
if err != nil {
return false, err
}
return exists, nil
}
// get retrieves the manifest, keyed by revision digest.
func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest, error) {
// Ensure that this revision is available in this repository.
if exists, err := rs.exists(revision); err != nil {
return nil, err
} else if !exists {
return nil, ErrUnknownManifestRevision{
Name: rs.Name(),
Revision: revision,
}
}
content, err := rs.blobStore.get(revision)
if err != nil {
return nil, err
}
// Fetch the signatures for the manifest
signatures, err := rs.getSignatures(revision)
if err != nil {
return nil, err
}
jsig, err := libtrust.NewJSONSignature(content, signatures...)
if err != nil {
return nil, err
}
// Extract the pretty JWS
raw, err := jsig.PrettySignature("signatures")
if err != nil {
return nil, err
}
var sm manifest.SignedManifest
if err := json.Unmarshal(raw, &sm); err != nil {
return nil, err
}
return &sm, nil
}
// put stores the manifest in the repository, if not already present. Any
// updated signatures will be stored, as well.
func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) {
// Resolve the payload in the manifest.
payload, err := sm.Payload()
if err != nil {
return "", err
}
// Digest and store the manifest payload in the blob store.
revision, err := rs.blobStore.put(payload)
if err != nil {
logrus.Errorf("error putting payload into blobstore: %v", err)
return "", err
}
// Link the revision into the repository.
if err := rs.link(revision); err != nil {
return "", err
}
// Grab each JSON signature and store it.
signatures, err := sm.Signatures()
if err != nil {
return "", err
}
for _, signature := range signatures {
if err := rs.putSignature(revision, signature); err != nil {
return "", err
}
}
return revision, nil
}
// link links the revision into the repository.
func (rs *revisionStore) link(revision digest.Digest) error {
revisionPath, err := rs.pm.path(manifestRevisionLinkPathSpec{
name: rs.Name(),
revision: revision,
})
if err != nil {
return err
}
if exists, err := exists(rs.driver, revisionPath); err != nil {
return err
} else if exists {
// Revision has already been linked!
return nil
}
return rs.blobStore.link(revisionPath, revision)
}
// delete removes the specified manifest revision from storage.
func (rs *revisionStore) delete(revision digest.Digest) error {
revisionPath, err := rs.pm.path(manifestRevisionPathSpec{
name: rs.Name(),
revision: revision,
})
if err != nil {
return err
}
return rs.driver.Delete(revisionPath)
}
// getSignatures retrieves all of the signature blobs for the specified
// manifest revision.
func (rs *revisionStore) getSignatures(revision digest.Digest) ([][]byte, error) {
signaturesPath, err := rs.pm.path(manifestSignaturesPathSpec{
name: rs.Name(),
revision: revision,
})
if err != nil {
return nil, err
}
// Need to append signature digest algorithm to path to get all items.
// Perhaps this should be in the pathMapper but it feels awkward. This
// can be eliminated by implementing listAll on drivers.
signaturesPath = path.Join(signaturesPath, "sha256")
signaturePaths, err := rs.driver.List(signaturesPath)
if err != nil {
return nil, err
}
var signatures [][]byte
for _, sigPath := range signaturePaths {
// Append the link portion
sigPath = path.Join(sigPath, "link")
// TODO(stevvooe): These fetches should be parallelized for performance.
p, err := rs.blobStore.linked(sigPath)
if err != nil {
return nil, err
}
signatures = append(signatures, p)
}
return signatures, nil
}
// putSignature stores the signature for the provided manifest revision.
func (rs *revisionStore) putSignature(revision digest.Digest, signature []byte) error {
signatureDigest, err := rs.blobStore.put(signature)
if err != nil {
return err
}
signaturePath, err := rs.pm.path(manifestSignatureLinkPathSpec{
name: rs.Name(),
revision: revision,
signature: signatureDigest,
})
if err != nil {
return err
}
return rs.blobStore.link(signaturePath, signatureDigest)
}
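// Round-trip sketch: put stores the manifest payload and each signature as
// separate blobs, linked under the revision; get re-assembles the signed
// manifest by joining the stored payload with all linked signatures via
// libtrust.NewJSONSignature.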

84
docs/storage/services.go Normal file
View file

@@ -0,0 +1,84 @@
package storage
import (
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"golang.org/x/net/context"
)
// TODO(stevvooe): These types need to be moved out of the storage package.
// Registry represents a collection of repositories, addressable by name.
type Registry interface {
// Repository should return a reference to the named repository. The
// registry may or may not have the repository but should always return a
// reference.
Repository(ctx context.Context, name string) Repository
}
// Repository is a named collection of manifests and layers.
type Repository interface {
// Name returns the name of the repository.
Name() string
// Manifests returns a reference to this repository's manifest service.
Manifests() ManifestService
// Layers returns a reference to this repository's layers service.
Layers() LayerService
}
// ManifestService provides operations on image manifests.
type ManifestService interface {
// Tags lists the tags under the named repository.
Tags() ([]string, error)
// Exists returns true if the manifest exists.
Exists(tag string) (bool, error)
// Get retrieves the named manifest, if it exists.
Get(tag string) (*manifest.SignedManifest, error)
// Put creates or updates the named manifest.
// Put(tag string, manifest *manifest.SignedManifest) (digest.Digest, error)
Put(tag string, manifest *manifest.SignedManifest) error
// Delete removes the named manifest, if it exists.
Delete(tag string) error
// TODO(stevvooe): There are several changes that need to be done to this
// interface:
//
// 1. Get(tag string) should be GetByTag(tag string)
// 2. Put(tag string, manifest *manifest.SignedManifest) should be
// Put(manifest *manifest.SignedManifest). The method can read the
// tag on manifest to automatically tag it in the repository.
// 3. Need a GetByDigest(dgst digest.Digest) method.
// 4. Allow explicit tagging with Tag(digest digest.Digest, tag string)
// 5. Support reading tags with a re-entrant reader to avoid large
// allocations in the registry.
// 6. Long-term: Provide All() method that lets one scroll through all of
// the manifest entries.
// 7. Long-term: break out concept of signing from manifests. This is
// really a part of the distribution sprint.
// 8. Long-term: Manifest should be an interface. This code shouldn't
// really be concerned with the storage format.
}
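// Usage sketch (tag name hypothetical; repo obtained from Registry.Repository):
//
//	ms := repo.Manifests()
//	if ok, err := ms.Exists("latest"); err == nil && ok {
//		sm, _ := ms.Get("latest")
//		_ = sm // consume the signed manifest
//	}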
// LayerService provides operations on layer files in a backend storage.
type LayerService interface {
// Exists returns true if the layer exists.
Exists(digest digest.Digest) (bool, error)
// Fetch the layer identified by TarSum.
Fetch(digest digest.Digest) (Layer, error)
// Upload begins a layer upload to repository identified by name,
// returning a handle.
Upload() (LayerUpload, error)
// Resume continues an in-progress layer upload, returning a handle to the
// upload. The caller should seek to the latest desired upload location
// before proceeding.
Resume(uuid string) (LayerUpload, error)
}
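// Flow sketch: begin an upload, then resume it later by uuid. The write and
// commit methods on LayerUpload, and the UUID accessor used below, are
// defined elsewhere and assumed here.
//
//	upload, _ := layers.Upload()
//	uuid := upload.UUID() // accessor assumed
//	// ... write layer data to upload ...
//	resumed, _ := layers.Resume(uuid)
//	_ = resumed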

157
docs/storage/tagstore.go Normal file
View file

@@ -0,0 +1,157 @@
package storage
import (
"path"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/storagedriver"
)
// tagStore provides methods to manage manifest tags in a backend storage driver.
type tagStore struct {
*repository
}
// tags lists the manifest tags for the specified repository.
func (ts *tagStore) tags() ([]string, error) {
p, err := ts.pm.path(manifestTagsPathSpec{
name: ts.name,
})
if err != nil {
return nil, err
}
var tags []string
entries, err := ts.driver.List(p)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
return nil, ErrUnknownRepository{Name: ts.name}
default:
return nil, err
}
}
for _, entry := range entries {
_, filename := path.Split(entry)
tags = append(tags, filename)
}
return tags, nil
}
// exists returns true if the specified manifest tag exists in the repository.
func (ts *tagStore) exists(tag string) (bool, error) {
tagPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: ts.Name(),
tag: tag,
})
if err != nil {
return false, err
}
exists, err := exists(ts.driver, tagPath)
if err != nil {
return false, err
}
return exists, nil
}
// tag tags the digest with the given tag, updating the store so that the tag
// points at the given revision. The digest must point to a manifest.
func (ts *tagStore) tag(tag string, revision digest.Digest) error {
indexEntryPath, err := ts.pm.path(manifestTagIndexEntryPathSpec{
name: ts.Name(),
tag: tag,
revision: revision,
})
if err != nil {
return err
}
currentPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: ts.Name(),
tag: tag,
})
if err != nil {
return err
}
// Link into the index
if err := ts.blobStore.link(indexEntryPath, revision); err != nil {
return err
}
// Overwrite the current link
return ts.blobStore.link(currentPath, revision)
}
// resolve returns the current revision for the given tag.
func (ts *tagStore) resolve(tag string) (digest.Digest, error) {
currentPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: ts.Name(),
tag: tag,
})
if err != nil {
return "", err
}
if exists, err := exists(ts.driver, currentPath); err != nil {
return "", err
} else if !exists {
return "", ErrUnknownManifest{Name: ts.Name(), Tag: tag}
}
revision, err := ts.blobStore.readlink(currentPath)
if err != nil {
return "", err
}
return revision, nil
}
// revisions returns all revisions with the specified name and tag.
func (ts *tagStore) revisions(tag string) ([]digest.Digest, error) {
manifestTagIndexPath, err := ts.pm.path(manifestTagIndexPathSpec{
name: ts.Name(),
tag: tag,
})
if err != nil {
return nil, err
}
// TODO(stevvooe): Need to append digest alg to get listing of revisions.
manifestTagIndexPath = path.Join(manifestTagIndexPath, "sha256")
entries, err := ts.driver.List(manifestTagIndexPath)
if err != nil {
return nil, err
}
var revisions []digest.Digest
for _, entry := range entries {
revisions = append(revisions, digest.NewDigestFromHex("sha256", path.Base(entry)))
}
return revisions, nil
}
// delete removes the tag from the repository, including the history of all
// revisions that have the specified tag.
func (ts *tagStore) delete(tag string) error {
tagPath, err := ts.pm.path(manifestTagPathSpec{
name: ts.Name(),
tag: tag,
})
if err != nil {
return err
}
return ts.driver.Delete(tagPath)
}