From 621164bc84e1481b003480fce4821a62653f5bb3 Mon Sep 17 00:00:00 2001
From: Stephen J Day <stephen.day@docker.com>
Date: Fri, 17 Feb 2017 00:07:02 -0800
Subject: [PATCH] content: refactor content store for API

After iterating on the GRPC API, the changes required for the actual
implementation are now included in the content store. The big change is the
move to a single, atomic `Ingester.Writer` method for locking content
ingestion on a key. From this come several new interface definitions.

The main benefit here is the clarification between `Status` and `Info` that
came out of the GRPC API. `Status` tells the status of a write, whereas
`Info` is for querying metadata about various blobs.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
---
 cmd/dist/active.go      |  23 +--
 cmd/dist/common.go      |  34 ++++
 cmd/dist/delete.go      |  21 +--
 cmd/dist/get.go         |  39 +++++
 cmd/dist/ingest.go      |  52 +-----
 cmd/dist/list.go        |  17 +-
 cmd/dist/main.go        |  27 ++-
 cmd/dist/path.go        |  89 ----------
 content/content.go      | 355 +++------------------------------------
 content/content_test.go |  55 +++----
 content/helpers.go      |  48 +++---
 content/locks.go        |  12 +-
 content/store.go        | 355 ++++++++++++++++++++++++++++++++++++++++
 content/writer.go       |  57 +++----
 14 files changed, 573 insertions(+), 611 deletions(-)
 create mode 100644 cmd/dist/common.go
 create mode 100644 cmd/dist/get.go
 delete mode 100644 cmd/dist/path.go
 create mode 100644 content/store.go

diff --git a/cmd/dist/active.go b/cmd/dist/active.go
index 12be2d3..24b39a6 100644
--- a/cmd/dist/active.go
+++ b/cmd/dist/active.go
@@ -3,11 +3,9 @@ package main
 import (
 	"fmt"
 	"os"
-	"path/filepath"
 	"text/tabwriter"
 	"time"
 
-	"github.com/docker/containerd/content"
 	units "github.com/docker/go-units"
 	"github.com/urfave/cli"
 )
@@ -26,24 +24,11 @@ var activeCommand = cli.Command{
 		cli.StringFlag{
 			Name:  "root",
 			Usage: "path to content store root",
-			Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
+			Value: "/tmp/content", // TODO(stevvooe): for now, just use /tmp/content
 		},
 	},
 	Action: func(context *cli.Context) error {
-		var (
-			// ctx = contextpkg.Background()
-			root = context.String("root")
-		)
-
-		if !filepath.IsAbs(root) {
-			var err error
-			root, err = filepath.Abs(root)
-			if err != nil {
-				return err
-			}
-		}
-
-		cs, err := content.Open(root)
+		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
@@ -58,8 +43,8 @@ var activeCommand = cli.Command{
 		for _, active := range active {
 			fmt.Fprintf(tw, "%s\t%s\t%s\n",
 				active.Ref,
-				units.HumanSize(float64(active.Size)),
-				units.HumanDuration(time.Since(active.ModTime)))
+				units.HumanSize(float64(active.Offset)),
+				units.HumanDuration(time.Since(active.StartedAt)))
 		}
 
 		tw.Flush()
diff --git a/cmd/dist/common.go b/cmd/dist/common.go
new file mode 100644
index 0000000..404aa76
--- /dev/null
+++ b/cmd/dist/common.go
@@ -0,0 +1,34 @@
+package main
+
+import (
+	"net"
+	"path/filepath"
+	"time"
+
+	"github.com/docker/containerd/content"
+	"github.com/urfave/cli"
+	"google.golang.org/grpc"
+)
+
+func resolveContentStore(context *cli.Context) (*content.Store, error) {
+	root := context.GlobalString("root")
+	if !filepath.IsAbs(root) {
+		var err error
+		root, err = filepath.Abs(root)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return content.NewStore(root)
+}
+
+func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) {
+	socket := context.GlobalString("socket")
+	return grpc.Dial(socket,
+		grpc.WithBlock(),
+		grpc.WithInsecure(),
+		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
+			return net.DialTimeout("unix", socket, timeout)
+		}),
+	)
+}
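
These two helpers replace the per-command `--root` handling that the rest of this patch deletes from each command. As a rough sketch of the intended call pattern — `statusCommand` below is hypothetical, not part of this patch — a command now resolves the store from the global flag and goes straight to the API:

```go
package main

import (
	"fmt"

	"github.com/urfave/cli"
)

// statusCommand is a hypothetical command showing how the shared helpers
// above are meant to be used: the store root comes from the global --root
// flag via resolveContentStore, rather than from per-command flags.
var statusCommand = cli.Command{
	Name:  "status",
	Usage: "print the number of active ingests",
	Action: func(context *cli.Context) error {
		cs, err := resolveContentStore(context)
		if err != nil {
			return err
		}

		// Active is defined on *content.Store later in this patch.
		active, err := cs.Active()
		if err != nil {
			return err
		}

		fmt.Println(len(active))
		return nil
	},
}
```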
diff --git a/cmd/dist/delete.go b/cmd/dist/delete.go
index 9cce3b5..6657286 100644
--- a/cmd/dist/delete.go
+++ b/cmd/dist/delete.go
@@ -3,9 +3,7 @@ package main
 import (
 	contextpkg "context"
 	"fmt"
-	"path/filepath"
 
-	"github.com/docker/containerd/content"
 	"github.com/docker/containerd/log"
 	digest "github.com/opencontainers/go-digest"
 	"github.com/urfave/cli"
@@ -18,30 +16,15 @@ var deleteCommand = cli.Command{
 	ArgsUsage:   "[flags] [<digest>, ...]",
 	Description: `Delete one or more blobs permanently. Successfully deleted
 	blobs are printed to stdout.`,
-	Flags: []cli.Flag{
-		cli.StringFlag{
-			Name:  "root",
-			Usage: "path to content store root",
-			Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
-		},
-	},
+	Flags: []cli.Flag{},
 	Action: func(context *cli.Context) error {
 		var (
 			ctx       = contextpkg.Background()
-			root      = context.String("root")
 			args      = []string(context.Args())
 			exitError error
 		)
 
-		if !filepath.IsAbs(root) {
-			var err error
-			root, err = filepath.Abs(root)
-			if err != nil {
-				return err
-			}
-		}
-
-		cs, err := content.Open(root)
+		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
diff --git a/cmd/dist/get.go b/cmd/dist/get.go
new file mode 100644
index 0000000..bf35933
--- /dev/null
+++ b/cmd/dist/get.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+	"io"
+	"os"
+
+	digest "github.com/opencontainers/go-digest"
+	"github.com/urfave/cli"
+)
+
+var getCommand = cli.Command{
+	Name:        "get",
+	Usage:       "get the data for an object",
+	ArgsUsage:   "[flags] [<digest>, ...]",
+	Description: `Display the data for one or more blobs.
+
+Data is written directly to stdout.`,
+	Flags: []cli.Flag{},
+	Action: func(context *cli.Context) error {
+		cs, err := resolveContentStore(context)
+		if err != nil {
+			return err
+		}
+
+		dgst, err := digest.Parse(context.Args().First())
+		if err != nil {
+			return err
+		}
+
+		rc, err := cs.Reader(background, dgst)
+		if err != nil {
+			return err
+		}
+		defer rc.Close()
+
+		_, err = io.Copy(os.Stdout, rc)
+		return err
+	},
+}
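
The new `get` command is also the first consumer of the `content.Provider` interface introduced later in this patch. The same pattern works outside the CLI; `readBlob` below is an illustrative sketch, not part of the patch:

```go
package main

import (
	"context"
	"io"

	"github.com/docker/containerd/content"
	digest "github.com/opencontainers/go-digest"
)

// readBlob copies a blob, addressed by digest, to w. Any implementation of
// content.Provider will do; *content.Store is the one this patch supplies.
func readBlob(ctx context.Context, provider content.Provider, dgst digest.Digest, w io.Writer) error {
	rc, err := provider.Reader(ctx, dgst)
	if err != nil {
		// content.IsNotFound(err) reports whether the blob is absent.
		return err
	}
	defer rc.Close()

	_, err = io.Copy(w, rc)
	return err
}
```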
diff --git a/cmd/dist/ingest.go b/cmd/dist/ingest.go
index 206c186..e3353cc 100644
--- a/cmd/dist/ingest.go
+++ b/cmd/dist/ingest.go
@@ -4,8 +4,6 @@ import (
 	contextpkg "context"
 	"fmt"
 	"os"
-	"path/filepath"
-	"strings"
 
 	"github.com/docker/containerd/content"
 	"github.com/opencontainers/go-digest"
@@ -18,17 +16,6 @@ var ingestCommand = cli.Command{
 	ArgsUsage:   "[flags] <key>",
 	Description: `Ingest objects into the local content store.`,
 	Flags: []cli.Flag{
-		cli.DurationFlag{
-			Name:   "timeout",
-			Usage:  "total timeout for fetch",
-			EnvVar: "CONTAINERD_FETCH_TIMEOUT",
-		},
-		cli.StringFlag{
-			Name:   "path, p",
-			Usage:  "path to content store",
-			Value:  ".content", // TODO(stevvooe): for now, just use the PWD/.content
-			EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
-		},
 		cli.Int64Flag{
 			Name:  "expected-size",
 			Usage: "validate against provided size",
@@ -40,57 +27,32 @@ var ingestCommand = cli.Command{
 	},
 	Action: func(context *cli.Context) error {
 		var (
-			ctx            = contextpkg.Background()
-			timeout        = context.Duration("timeout")
-			root           = context.String("path")
+			ctx            = background
+			cancel         func()
 			ref            = context.Args().First()
 			expectedSize   = context.Int64("expected-size")
 			expectedDigest = digest.Digest(context.String("expected-digest"))
 		)
 
-		if timeout > 0 {
-			var cancel func()
-			ctx, cancel = contextpkg.WithTimeout(ctx, timeout)
-			defer cancel()
-		}
+		ctx, cancel = contextpkg.WithCancel(ctx)
+		defer cancel()
 
 		if err := expectedDigest.Validate(); expectedDigest != "" && err != nil {
 			return err
 		}
 
-		if !filepath.IsAbs(root) {
-			var err error
-			root, err = filepath.Abs(root)
-			if err != nil {
-				return err
-			}
-		}
-
-		cs, err := content.Open(root)
+		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
 
-		if expectedDigest != "" {
-			if ok, err := cs.Exists(expectedDigest); err != nil {
-				return err
-			} else if ok {
-				fmt.Fprintf(os.Stderr, "content with digest %v already exists\n", expectedDigest)
-				return nil
-			}
-		}
-
 		if ref == "" {
-			if expectedDigest == "" {
-				return fmt.Errorf("must specify a transaction reference or expected digest")
-			}
-
-			ref = strings.Replace(expectedDigest.String(), ":", "-", -1)
+			return fmt.Errorf("must specify a transaction reference")
 		}
 
 		// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
 		// all data to be written in a single invocation. Allow multiple writes
 		// to the same transaction key followed by a commit.
-		return content.WriteBlob(cs, os.Stdin, ref, expectedSize, expectedDigest)
+		return content.WriteBlob(ctx, cs, os.Stdin, ref, expectedSize, expectedDigest)
 	},
 }
diff --git a/cmd/dist/list.go b/cmd/dist/list.go
index 11b5894..2ba1d88 100644
--- a/cmd/dist/list.go
+++ b/cmd/dist/list.go
@@ -4,7 +4,6 @@ import (
 	contextpkg "context"
 	"fmt"
 	"os"
-	"path/filepath"
 	"text/tabwriter"
 	"time"
 
@@ -22,11 +21,6 @@ var listCommand = cli.Command{
 	ArgsUsage:   "[flags] [<digest>, ...]",
 	Description: `List blobs in the content store.`,
 	Flags: []cli.Flag{
-		cli.StringFlag{
-			Name:  "root",
-			Usage: "path to content store root",
-			Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
-		},
 		cli.BoolFlag{
 			Name:  "quiet, q",
 			Usage: "print only the blob digest",
@@ -35,20 +29,11 @@ var listCommand = cli.Command{
 	Action: func(context *cli.Context) error {
 		var (
 			ctx   = contextpkg.Background()
-			root  = context.String("root")
 			quiet = context.Bool("quiet")
 			args  = []string(context.Args())
 		)
 
-		if !filepath.IsAbs(root) {
-			var err error
-			root, err = filepath.Abs(root)
-			if err != nil {
-				return err
-			}
-		}
-
-		cs, err := content.Open(root)
+		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
diff --git a/cmd/dist/main.go b/cmd/dist/main.go
index 5a19608..de4cf22 100644
--- a/cmd/dist/main.go
+++ b/cmd/dist/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	contextpkg "context"
 	"fmt"
 	"os"
 
@@ -9,6 +10,10 @@ import (
 	"github.com/urfave/cli"
 )
 
+var (
+	background = contextpkg.Background()
+)
+
 func main() {
 	app := cli.NewApp()
 	app.Name = "dist"
@@ -27,20 +32,38 @@ distribution tool
 			Name:  "debug",
 			Usage: "enable debug output in logs",
 		},
+		cli.DurationFlag{
+			Name:   "timeout",
+			Usage:  "total timeout for fetch",
+			EnvVar: "CONTAINERD_FETCH_TIMEOUT",
+		},
+		cli.StringFlag{
+			Name:  "root",
+			Usage: "path to content store root",
+			Value: "/tmp/content", // TODO(stevvooe): for now, just use /tmp/content
+		},
 	}
 	app.Commands = []cli.Command{
 		fetchCommand,
 		ingestCommand,
 		activeCommand,
-		pathCommand,
+		getCommand,
 		deleteCommand,
 		listCommand,
 		applyCommand,
 	}
 	app.Before = func(context *cli.Context) error {
-		if context.GlobalBool("debug") {
+		var (
+			debug   = context.GlobalBool("debug")
+			timeout = context.GlobalDuration("timeout")
+		)
+		if debug {
 			logrus.SetLevel(logrus.DebugLevel)
 		}
+
+		if timeout > 0 {
+			background, _ = contextpkg.WithTimeout(background, timeout)
+		}
 		return nil
 	}
 	if err := app.Run(os.Args); err != nil {
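
Note that `app.Before` above discards the `context.CancelFunc` returned by `contextpkg.WithTimeout`; the deadline still fires, but `go vet` will flag the lost cancel. A standalone sketch of the same pattern with the cancel retained (illustrative only, not part of the patch):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Derive a deadline-bound context once, up front, the way app.Before
	// does above, but keep the CancelFunc so resources are released even
	// when the deadline never fires.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	select {
	case <-time.After(10 * time.Second):
		fmt.Println("work finished")
	case <-ctx.Done():
		fmt.Println("aborted:", ctx.Err()) // context.DeadlineExceeded after 5s
	}
}
```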
"github.com/docker/containerd/content" - "github.com/docker/containerd/log" - digest "github.com/opencontainers/go-digest" - "github.com/urfave/cli" -) - -var pathCommand = cli.Command{ - Name: "path", - Usage: "print the path to one or more blobs", - ArgsUsage: "[flags] [, ...]", - Description: `Display the paths to one or more blobs. - -Output paths can be used to directly access blobs on disk.`, - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "root", - Usage: "path to content store root", - Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content - EnvVar: "CONTAINERD_DIST_CONTENT_STORE", - }, - cli.BoolFlag{ - Name: "quiet, q", - Usage: "elide digests in output", - }, - }, - Action: func(context *cli.Context) error { - var ( - ctx = contextpkg.Background() - root = context.String("root") - args = []string(context.Args()) - quiet = context.Bool("quiet") - exitError error - ) - - if !filepath.IsAbs(root) { - var err error - root, err = filepath.Abs(root) - if err != nil { - return err - } - } - - cs, err := content.Open(root) - if err != nil { - return err - } - - // TODO(stevvooe): Take the set of paths from stdin. - - if len(args) < 1 { - return fmt.Errorf("please specify a blob digest") - } - - for _, arg := range args { - dgst, err := digest.Parse(arg) - if err != nil { - log.G(ctx).WithError(err).Errorf("parsing %q as digest failed", arg) - if exitError == nil { - exitError = err - } - continue - } - - p, err := cs.GetPath(dgst) - if err != nil { - log.G(ctx).WithError(err).Errorf("getting path for %q failed", dgst) - if exitError == nil { - exitError = err - } - continue - } - - if !quiet { - fmt.Println(dgst, p) - } else { - fmt.Println(p) - } - } - - return exitError - }, -} diff --git a/content/content.go b/content/content.go index d7fa750..b63ed1d 100644 --- a/content/content.go +++ b/content/content.go @@ -1,364 +1,53 @@ package content import ( + "context" "io" - "io/ioutil" - "os" - "path/filepath" "sync" "time" - "github.com/docker/containerd/log" - "github.com/nightlyone/lockfile" "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) var ( - ErrBlobNotFound = errors.New("blob not found") + errNotFound = errors.New("content: not found") bufPool = sync.Pool{ New: func() interface{} { - return make([]byte, 32<<10) + return make([]byte, 1<<20) }, } ) -// Store is digest-keyed store for content. All data written into the store is -// stored under a verifiable digest. -// -// Store can generally support multi-reader, single-writer ingest of data, -// including resumable ingest. 
-type Store struct { - root string +type Info struct { + Digest digest.Digest + Size int64 + CommittedAt time.Time } -func Open(root string) (*Store, error) { - if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) { - return nil, err - } - - return &Store{ - root: root, - }, nil +type Provider interface { + Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) } type Status struct { - Ref string - Size int64 - ModTime time.Time - Meta interface{} + Ref string + Offset int64 + StartedAt time.Time + UpdatedAt time.Time } -func (cs *Store) Exists(dgst digest.Digest) (bool, error) { - if _, err := os.Stat(cs.blobPath(dgst)); err != nil { - if !os.IsNotExist(err) { - return false, err - } - - return false, nil - } - - return true, nil +type Writer interface { + io.WriteCloser + Status() (Status, error) + Digest() digest.Digest + Commit(size int64, expected digest.Digest) error } -func (cs *Store) GetPath(dgst digest.Digest) (string, error) { - p := cs.blobPath(dgst) - if _, err := os.Stat(p); err != nil { - if os.IsNotExist(err) { - return "", ErrBlobNotFound - } - - return "", err - } - - return p, nil +type Ingester interface { + Writer(ctx context.Context, ref string) (Writer, error) } -// Delete removes a blob by its digest. -// -// While this is safe to do concurrently, safe exist-removal logic must hold -// some global lock on the store. -func (cs *Store) Delete(dgst digest.Digest) error { - if err := os.RemoveAll(cs.blobPath(dgst)); err != nil { - if !os.IsNotExist(err) { - return err - } - - return nil - } - - return nil -} - -func (cs *Store) blobPath(dgst digest.Digest) string { - return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) -} - -// Stat returns the current status of a blob by the ingest ref. -func (cs *Store) Stat(ref string) (Status, error) { - dp := filepath.Join(cs.ingestRoot(ref), "data") - return cs.stat(dp) -} - -// stat works like stat above except uses the path to the ingest. -func (cs *Store) stat(ingestPath string) (Status, error) { - dp := filepath.Join(ingestPath, "data") - dfi, err := os.Stat(dp) - if err != nil { - return Status{}, err - } - - ref, err := readFileString(filepath.Join(ingestPath, "ref")) - if err != nil { - return Status{}, err - } - - return Status{ - Ref: ref, - Size: dfi.Size(), - ModTime: dfi.ModTime(), - }, nil -} - -func (cs *Store) Active() ([]Status, error) { - ip := filepath.Join(cs.root, "ingest") - - fp, err := os.Open(ip) - if err != nil { - return nil, err - } - - fis, err := fp.Readdir(-1) - if err != nil { - return nil, err - } - - var active []Status - for _, fi := range fis { - p := filepath.Join(ip, fi.Name()) - stat, err := cs.stat(p) - if err != nil { - if !os.IsNotExist(err) { - return nil, err - } - - // TODO(stevvooe): This is a common error if uploads are being - // completed while making this listing. Need to consider taking a - // lock on the whole store to coordinate this aspect. - // - // Another option is to cleanup downloads asynchronously and - // coordinate this method with the cleanup process. - // - // For now, we just skip them, as they really don't exist. - continue - } - - active = append(active, stat) - } - - return active, nil -} - -// TODO(stevvooe): Allow querying the set of blobs in the blob store. - -// WalkFunc defines the callback for a blob walk. -// -// TODO(stevvooe): Remove the file info. Just need size and modtime. 
Perhaps, -// not a huge deal, considering we have a path, but let's not just let this one -// go without scrutiny. -type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error - -func (cs *Store) Walk(fn WalkFunc) error { - root := filepath.Join(cs.root, "blobs") - var alg digest.Algorithm - return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - if !fi.IsDir() && !alg.Available() { - return nil - } - - // TODO(stevvooe): There are few more cases with subdirs that should be - // handled in case the layout gets corrupted. This isn't strict enough - // an may spew bad data. - - if path == root { - return nil - } - if filepath.Dir(path) == root { - alg = digest.Algorithm(filepath.Base(path)) - - if !alg.Available() { - alg = "" - return filepath.SkipDir - } - - // descending into a hash directory - return nil - } - - dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path)) - if err := dgst.Validate(); err != nil { - // log error but don't report - log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path") - // if we see this, it could mean some sort of corruption of the - // store or extra paths not expected previously. - } - - return fn(path, fi, dgst) - }) -} - -// Begin starts a new write transaction against the blob store. -// -// The argument `ref` is used to identify the transaction. It must be a valid -// path component, meaning it has no `/` characters and no `:` (we'll ban -// others fs characters, as needed). -func (cs *Store) Begin(ref string) (*Writer, error) { - path, refp, data, lock, err := cs.ingestPaths(ref) - if err != nil { - return nil, err - } - - // use single path mkdir for this to ensure ref is only base path, in - // addition to validation above. - if err := os.Mkdir(path, 0755); err != nil { - return nil, err - } - - if err := tryLock(lock); err != nil { - return nil, err - } - - // write the ref to a file for later use - if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil { - return nil, err - } - - fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666) - if err != nil { - return nil, errors.Wrap(err, "failed to open data file") - } - defer fp.Close() - - // re-open the file in append mode - fp, err = os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - return nil, errors.Wrap(err, "error opening for append") - } - - return &Writer{ - cs: cs, - fp: fp, - lock: lock, - path: path, - digester: digest.Canonical.Digester(), - }, nil -} - -func (cs *Store) Resume(ref string) (*Writer, error) { - path, refp, data, lock, err := cs.ingestPaths(ref) - if err != nil { - return nil, err - } - - if err := tryLock(lock); err != nil { - return nil, err - } - - refraw, err := readFileString(refp) - if err != nil { - return nil, errors.Wrap(err, "could not read ref") - } - - if ref != refraw { - // NOTE(stevvooe): This is fairly catastrophic. Either we have some - // layout corruption or a hash collision for the ref key. 
- return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, refraw) - } - - digester := digest.Canonical.Digester() - - // slow slow slow!!, send to goroutine or use resumable hashes - fp, err := os.Open(data) - if err != nil { - return nil, err - } - defer fp.Close() - - p := bufPool.Get().([]byte) - defer bufPool.Put(p) - - offset, err := io.CopyBuffer(digester.Hash(), fp, p) - if err != nil { - return nil, err - } - - fp1, err := os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - if os.IsNotExist(err) { - return nil, errors.Wrap(err, "ingest does not exist") - } - - return nil, errors.Wrap(err, "error opening for append") - } - - return &Writer{ - cs: cs, - fp: fp1, - lock: lock, - ref: ref, - path: path, - offset: offset, - digester: digester, - }, nil -} - -// Remove an active transaction keyed by ref. -func (cs *Store) Remove(ref string) error { - root := cs.ingestRoot(ref) - if err := os.RemoveAll(root); err != nil { - if os.IsNotExist(err) { - return nil - } - - return err - } - - return nil -} - -func (cs *Store) ingestRoot(ref string) string { - dgst := digest.FromString(ref) - return filepath.Join(cs.root, "ingest", dgst.Hex()) -} - -// ingestPaths are returned, including the lockfile. The paths are the following: -// -// - root: entire ingest directory -// - ref: name of the starting ref, must be unique -// - data: file where data is written -// - lock: lock file location -// -func (cs *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) { - var ( - fp = cs.ingestRoot(ref) - rp = filepath.Join(fp, "ref") - lp = filepath.Join(fp, "lock") - dp = filepath.Join(fp, "data") - ) - - lock, err := lockfile.New(lp) - if err != nil { - return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp) - } - - return fp, rp, dp, lock, nil -} - -func readFileString(path string) (string, error) { - p, err := ioutil.ReadFile(path) - return string(p), err +func IsNotFound(err error) bool { + return errors.Cause(err) == errNotFound } diff --git a/content/content_test.go b/content/content_test.go index a469630..b831a54 100644 --- a/content/content_test.go +++ b/content/content_test.go @@ -3,6 +3,7 @@ package content import ( "bufio" "bytes" + "context" "crypto/rand" _ "crypto/sha256" // required for digest package "fmt" @@ -21,7 +22,7 @@ import ( ) func TestContentWriter(t *testing.T) { - tmpdir, cs, cleanup := contentStoreEnv(t) + ctx, tmpdir, cs, cleanup := contentStoreEnv(t) defer cleanup() defer testutil.DumpDir(t, tmpdir) @@ -29,7 +30,7 @@ func TestContentWriter(t *testing.T) { t.Fatal("ingest dir should be created", err) } - cw, err := cs.Begin("myref") + cw, err := cs.Writer(ctx, "myref") if err != nil { t.Fatal(err) } @@ -37,20 +38,14 @@ func TestContentWriter(t *testing.T) { t.Fatal(err) } - // try to begin again with same ref, should fail - cw, err = cs.Begin("myref") - if err == nil { - t.Fatal("expected error on repeated begin") - } - // reopen, so we can test things - cw, err = cs.Resume("myref") + cw, err = cs.Writer(ctx, "myref") if err != nil { t.Fatal(err) } // make sure that second resume also fails - if _, err = cs.Resume("myref"); err == nil { + if _, err = cs.Writer(ctx, "myref"); err == nil { // TODO(stevvooe): This also works across processes. Need to find a way // to test that, as well. 
t.Fatal("no error on second resume") @@ -64,14 +59,14 @@ func TestContentWriter(t *testing.T) { // clear out the time and meta cause we don't care for this test for i := range ingestions { - ingestions[i].Meta = nil - ingestions[i].ModTime = time.Time{} + ingestions[i].UpdatedAt = time.Time{} + ingestions[i].StartedAt = time.Time{} } if !reflect.DeepEqual(ingestions, []Status{ { - Ref: "myref", - Size: 0, + Ref: "myref", + Offset: 0, }, }) { t.Fatalf("unexpected ingestion set: %v", ingestions) @@ -93,7 +88,7 @@ func TestContentWriter(t *testing.T) { t.Fatal(err) } - cw, err = cs.Begin("aref") + cw, err = cs.Writer(ctx, "aref") if err != nil { t.Fatal(err) } @@ -119,7 +114,7 @@ func TestContentWriter(t *testing.T) { } func TestWalkBlobs(t *testing.T) { - _, cs, cleanup := contentStoreEnv(t) + ctx, _, cs, cleanup := contentStoreEnv(t) defer cleanup() const ( @@ -128,7 +123,7 @@ func TestWalkBlobs(t *testing.T) { ) var ( - blobs = populateBlobStore(t, cs, nblobs, maxsize) + blobs = populateBlobStore(t, ctx, cs, nblobs, maxsize) expected = map[digest.Digest]struct{}{} found = map[digest.Digest]struct{}{} ) @@ -158,7 +153,7 @@ func TestWalkBlobs(t *testing.T) { // for blobs. This seems to be due to the number of syscalls and file io we do // coordinating the ingestion. func BenchmarkIngests(b *testing.B) { - _, cs, cleanup := contentStoreEnv(b) + ctx, _, cs, cleanup := contentStoreEnv(b) defer cleanup() for _, size := range []int64{ @@ -181,7 +176,7 @@ func BenchmarkIngests(b *testing.B) { b.StartTimer() for dgst, p := range blobs { - checkWrite(b, cs, dgst, p) + checkWrite(b, ctx, cs, dgst, p) } }) } @@ -208,17 +203,17 @@ func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte { return blobs } -func populateBlobStore(t checker, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte { +func populateBlobStore(t checker, ctx context.Context, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte { blobs := generateBlobs(t, nblobs, maxsize) for dgst, p := range blobs { - checkWrite(t, cs, dgst, p) + checkWrite(t, ctx, cs, dgst, p) } return blobs } -func contentStoreEnv(t checker) (string, *Store, func()) { +func contentStoreEnv(t checker) (context.Context, string, *Store, func()) { pc, _, _, ok := runtime.Caller(1) if !ok { t.Fatal("failed to resolve caller") @@ -230,13 +225,15 @@ func contentStoreEnv(t checker) (string, *Store, func()) { t.Fatal(err) } - cs, err := Open(tmpdir) + cs, err := NewStore(tmpdir) if err != nil { os.RemoveAll(tmpdir) t.Fatal(err) } - return tmpdir, cs, func() { + ctx, cancel := context.WithCancel(context.Background()) + return ctx, tmpdir, cs, func() { + cancel() os.RemoveAll(tmpdir) } } @@ -253,10 +250,8 @@ func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) { } func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string { - path, err := cs.GetPath(dgst) - if err != nil { - t.Fatal(err, dgst) - } + path := cs.blobPath(dgst) + if path != filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) { t.Fatalf("unexpected path: %q", path) } @@ -273,8 +268,8 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string { return path } -func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest { - if err := WriteBlob(cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil { +func checkWrite(t checker, ctx context.Context, cs *Store, dgst digest.Digest, p []byte) digest.Digest { + if err := WriteBlob(ctx, cs, bytes.NewReader(p), dgst.String(), int64(len(p)), 
dgst); err != nil {
 		t.Fatal(err)
 	}
 
diff --git a/content/helpers.go b/content/helpers.go
index 37cb71e..4209350 100644
--- a/content/helpers.go
+++ b/content/helpers.go
@@ -1,37 +1,14 @@
 package content
 
 import (
+	"context"
 	"io"
-	"os"
+	"io/ioutil"
 
 	"github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 )
 
-// Provider gives access to blob content by paths.
-//
-// Typically, this is implemented by `*Store`.
-type Provider interface {
-	GetPath(dgst digest.Digest) (string, error)
-}
-
-// OpenBlob opens the blob for reading identified by dgst.
-//
-// The opened blob may also implement seek. Callers can detect with io.Seeker.
-func OpenBlob(provider Provider, dgst digest.Digest) (io.ReadCloser, error) {
-	path, err := provider.GetPath(dgst)
-	if err != nil {
-		return nil, err
-	}
-
-	fp, err := os.Open(path)
-	return fp, err
-}
-
-type Ingester interface {
-	Begin(key string) (*Writer, error)
-}
-
 // WriteBlob writes data with the expected digest into the content store. If
 // expected already exists, the method returns immediately and the reader will
 // not be consumed.
@@ -39,11 +16,23 @@ type Ingester interface {
 // This is useful when the digest and size are known beforehand.
 //
 // Copy is buffered, so no need to wrap reader in buffered io.
-func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
-	cw, err := cs.Begin(ref)
+func WriteBlob(ctx context.Context, cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
+	cw, err := cs.Writer(ctx, ref)
 	if err != nil {
 		return err
 	}
+
+	ws, err := cw.Status()
+	if err != nil {
+		return err
+	}
+
+	if ws.Offset > 0 {
+		// Arbitrary limitation for now. We can detect io.Seeker on r and
+		// resume.
+		return errors.Errorf("cannot resume already started write")
+	}
+
 	buf := bufPool.Get().([]byte)
 	defer bufPool.Put(buf)
@@ -62,3 +51,8 @@ func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
 
 	return nil
 }
+
+func readFileString(path string) (string, error) {
+	p, err := ioutil.ReadFile(path)
+	return string(p), err
+}
diff --git a/content/locks.go b/content/locks.go
index dd2cae9..400793d 100644
--- a/content/locks.go
+++ b/content/locks.go
@@ -1,10 +1,10 @@
 package content
 
 import (
-	"errors"
 	"sync"
 
 	"github.com/nightlyone/lockfile"
+	"github.com/pkg/errors"
 )
 
 // In addition to providing inter-process locks for content ingest, we also
@@ -16,6 +16,8 @@ import (
 // error reporting.
 
 var (
+	errLocked = errors.New("key is locked")
+
 	// locks lets us lock in process, as well as out of process.
 	locks   = map[lockfile.Lockfile]struct{}{}
 	locksMu sync.Mutex
@@ -26,11 +28,15 @@ func tryLock(lock lockfile.Lockfile) error {
 	defer locksMu.Unlock()
 
 	if _, ok := locks[lock]; ok {
-		return errors.New("file in use")
+		return errLocked
 	}
 
 	if err := lock.TryLock(); err != nil {
-		return err
+		if errors.Cause(err) == lockfile.ErrBusy {
+			return errLocked
+		}
+
+		return errors.Wrap(err, "lock.TryLock() encountered an error")
 	}
 
 	locks[lock] = struct{}{}
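
This lock table is what `Store.Writer` (below) uses to enforce a single writer per ref, both within the process and across processes. A hypothetical pairing of `tryLock` with `unlock`, assuming it sits inside the `content` package next to those unexported helpers:

```go
package content

import (
	"github.com/nightlyone/lockfile"
	"github.com/pkg/errors"
)

// withLock runs fn while holding both the in-process and on-disk lock for
// path. It is an illustrative sketch, not part of this patch.
func withLock(path string, fn func() error) error {
	lock, err := lockfile.New(path) // lockfile requires an absolute path
	if err != nil {
		return err
	}

	if err := tryLock(lock); err != nil {
		// In-process and cross-process contention both surface as errLocked.
		if err == errLocked {
			return errors.Wrapf(err, "%v is busy", path)
		}
		return err
	}
	defer unlock(lock)

	return fn()
}
```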
diff --git a/content/store.go b/content/store.go
new file mode 100644
index 0000000..dfe1993
--- /dev/null
+++ b/content/store.go
@@ -0,0 +1,355 @@
+package content
+
+import (
+	"context"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"syscall"
+	"time"
+
+	"github.com/docker/containerd/log"
+	"github.com/nightlyone/lockfile"
+	digest "github.com/opencontainers/go-digest"
+	"github.com/pkg/errors"
+)
+
+// Store is a digest-keyed store for content. All data written into the
+// store is stored under a verifiable digest.
+//
+// Store can generally support multi-reader, single-writer ingest of data,
+// including resumable ingest.
+type Store struct {
+	root string
+}
+
+func NewStore(root string) (*Store, error) {
+	if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
+		return nil, err
+	}
+
+	return &Store{
+		root: root,
+	}, nil
+}
+
+func (s *Store) Info(dgst digest.Digest) (Info, error) {
+	p := s.blobPath(dgst)
+	fi, err := os.Stat(p)
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = errNotFound
+		}
+
+		return Info{}, err
+	}
+
+	return Info{
+		Digest:      dgst,
+		Size:        fi.Size(),
+		CommittedAt: fi.ModTime(),
+	}, nil
+}
+
+// Reader returns an io.ReadCloser for the blob.
+//
+// TODO(stevvooe): This would work much better as an io.ReaderAt in practice.
+// Right now, we are doing type assertion to tease that out, but it won't scale
+// well.
+func (s *Store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
+	fp, err := os.Open(s.blobPath(dgst))
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = errNotFound
+		}
+		return nil, err
+	}
+
+	return fp, nil
+}
+
+// Delete removes a blob by its digest.
+//
+// While this is safe to do concurrently, safe exist-removal logic must hold
+// some global lock on the store.
+func (cs *Store) Delete(dgst digest.Digest) error {
+	if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
+		if !os.IsNotExist(err) {
+			return err
+		}
+
+		return nil
+	}
+
+	return nil
+}
+
+// TODO(stevvooe): Allow querying the set of blobs in the blob store.
+
+// WalkFunc defines the callback for a blob walk.
+//
+// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps,
+// not a huge deal, considering we have a path, but let's not just let this one
+// go without scrutiny.
+type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error
+
+func (cs *Store) Walk(fn WalkFunc) error {
+	root := filepath.Join(cs.root, "blobs")
+	var alg digest.Algorithm
+	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if !fi.IsDir() && !alg.Available() {
+			return nil
+		}
+
+		// TODO(stevvooe): There are a few more cases with subdirs that should
+		// be handled in case the layout gets corrupted. This isn't strict
+		// enough and may spew bad data.
+
+		if path == root {
+			return nil
+		}
+		if filepath.Dir(path) == root {
+			alg = digest.Algorithm(filepath.Base(path))
+
+			if !alg.Available() {
+				alg = ""
+				return filepath.SkipDir
+			}
+
+			// descending into a hash directory
+			return nil
+		}
+
+		dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
+		if err := dgst.Validate(); err != nil {
+			// log error but don't report
+			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
+			// if we see this, it could mean some sort of corruption of the
+			// store or extra paths not expected previously.
+		}
+
+		return fn(path, fi, dgst)
+	})
+}
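
The `Walk` method above is what backs `dist list`. A minimal external caller might look like the following sketch (`printBlobs` is illustrative, not part of the patch):

```go
package main

import (
	"fmt"
	"os"

	"github.com/docker/containerd/content"
	digest "github.com/opencontainers/go-digest"
)

// printBlobs prints each committed blob's digest and size, mirroring what
// `dist list` does with the same Walk API.
func printBlobs(cs *content.Store) error {
	return cs.Walk(func(path string, fi os.FileInfo, dgst digest.Digest) error {
		fmt.Println(dgst, fi.Size())
		return nil
	})
}
```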
+
+// Status returns the current status of the ingest identified by ref.
+func (s *Store) Status(ref string) (Status, error) {
+	dp := filepath.Join(s.ingestRoot(ref), "data")
+	return s.status(dp)
+}
+
+// status works like Status above, except it takes the ingest path directly.
+func (s *Store) status(ingestPath string) (Status, error) {
+	dp := filepath.Join(ingestPath, "data")
+	fi, err := os.Stat(dp)
+	if err != nil {
+		return Status{}, err
+	}
+
+	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
+	if err != nil {
+		return Status{}, err
+	}
+
+	var startedAt time.Time
+	if st, ok := fi.Sys().(*syscall.Stat_t); ok {
+		startedAt = time.Unix(st.Ctim.Sec, st.Ctim.Nsec)
+	} else {
+		startedAt = fi.ModTime()
+	}
+
+	return Status{
+		Ref:       ref,
+		Offset:    fi.Size(),
+		UpdatedAt: fi.ModTime(),
+		StartedAt: startedAt,
+	}, nil
+}
+
+// Writer begins or resumes the active writer identified by ref. If the writer
+// is already in use, an error is returned. Only one writer may be in use per
+// ref at a time.
+//
+// The argument `ref` is used to uniquely identify a long-lived writer transaction.
+func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
+	path, refp, data, lock, err := s.ingestPaths(ref)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := tryLock(lock); err != nil {
+		if !os.IsNotExist(errors.Cause(err)) {
+			return nil, errors.Wrapf(err, "locking %v failed", ref)
+		}
+
+		// if it doesn't exist, we'll make it so below!
+	}
+
+	var (
+		digester  = digest.Canonical.Digester()
+		offset    int64
+		startedAt time.Time
+		updatedAt time.Time
+	)
+
+	// ensure that the ingest path has been created.
+	if err := os.Mkdir(path, 0755); err != nil {
+		if !os.IsExist(err) {
+			return nil, err
+		}
+
+		// validate that we have no collision for the ref.
+		refraw, err := readFileString(refp)
+		if err != nil {
+			return nil, errors.Wrap(err, "could not read ref")
+		}
+
+		if ref != refraw {
+			// NOTE(stevvooe): This is fairly catastrophic. Either we have some
+			// layout corruption or a hash collision for the ref key.
+			return nil, errors.Errorf("ref key does not match: %v != %v", ref, refraw)
+		}
+
+		// slow slow slow!!, send to goroutine or use resumable hashes
+		fp, err := os.Open(data)
+		if err != nil {
+			return nil, err
+		}
+		defer fp.Close()
+
+		p := bufPool.Get().([]byte)
+		defer bufPool.Put(p)
+
+		offset, err = io.CopyBuffer(digester.Hash(), fp, p)
+		if err != nil {
+			return nil, err
+		}
+
+		fi, err := os.Stat(data)
+		if err != nil {
+			return nil, err
+		}
+
+		updatedAt = fi.ModTime()
+
+		if st, ok := fi.Sys().(*syscall.Stat_t); ok {
+			startedAt = time.Unix(st.Ctim.Sec, st.Ctim.Nsec)
+		} else {
+			startedAt = updatedAt
+		}
+	} else {
+		// the ingest is new, we need to setup the target location.
+		// write the ref to a file for later use
+		if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
+			return nil, err
+		}
+
+		startedAt = time.Now()
+		updatedAt = startedAt
+	}
+
+	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to open data file")
+	}
+
+	return &writer{
+		s:         s,
+		fp:        fp,
+		lock:      lock,
+		ref:       ref,
+		path:      path,
+		offset:    offset,
+		digester:  digester,
+		startedAt: startedAt,
+		updatedAt: updatedAt,
+	}, nil
+}
+
+// Abort an active transaction keyed by ref. If the ingest is active, it will
+// be cancelled. Any resources associated with the ingest will be cleaned up.
+func (s *Store) Abort(ref string) error { + root := s.ingestRoot(ref) + if err := os.RemoveAll(root); err != nil { + if os.IsNotExist(err) { + return nil + } + + return err + } + + return nil +} + +func (s *Store) Active() ([]Status, error) { + fp, err := os.Open(filepath.Join(s.root, "ingest")) + if err != nil { + return nil, err + } + + fis, err := fp.Readdir(-1) + if err != nil { + return nil, err + } + + var active []Status + for _, fi := range fis { + p := filepath.Join(s.root, "ingest", fi.Name()) + stat, err := s.status(p) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + + // TODO(stevvooe): This is a common error if uploads are being + // completed while making this listing. Need to consider taking a + // lock on the whole store to coordinate this aspect. + // + // Another option is to cleanup downloads asynchronously and + // coordinate this method with the cleanup process. + // + // For now, we just skip them, as they really don't exist. + continue + } + + active = append(active, stat) + } + + return active, nil +} + +func (cs *Store) blobPath(dgst digest.Digest) string { + return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) +} + +func (s *Store) ingestRoot(ref string) string { + dgst := digest.FromString(ref) + return filepath.Join(s.root, "ingest", dgst.Hex()) +} + +// ingestPaths are returned, including the lockfile. The paths are the following: +// +// - root: entire ingest directory +// - ref: name of the starting ref, must be unique +// - data: file where data is written +// - lock: lock file location +// +func (s *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) { + var ( + fp = s.ingestRoot(ref) + rp = filepath.Join(fp, "ref") + lp = filepath.Join(fp, "lock") + dp = filepath.Join(fp, "data") + ) + + lock, err := lockfile.New(lp) + if err != nil { + return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp) + } + + return fp, rp, dp, lock, nil +} diff --git a/content/writer.go b/content/writer.go index a2ff020..df33234 100644 --- a/content/writer.go +++ b/content/writer.go @@ -4,54 +4,55 @@ import ( "log" "os" "path/filepath" + "time" "github.com/nightlyone/lockfile" "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) -// Writer represents a write transaction against the blob store. -type Writer struct { - cs *Store - fp *os.File // opened data file - lock lockfile.Lockfile - path string // path to writer dir - ref string // ref key - offset int64 - digester digest.Digester +// writer represents a write transaction against the blob store. +type writer struct { + s *Store + fp *os.File // opened data file + lock lockfile.Lockfile + path string // path to writer dir + ref string // ref key + offset int64 + digester digest.Digester + startedAt time.Time + updatedAt time.Time } -func (cw *Writer) Ref() string { - return cw.ref -} - -// Size returns the current size written. -// -// Cannot be called concurrently with `Write`. If you need need concurrent -// status, query it with `Store.Stat`. -func (cw *Writer) Size() int64 { - return cw.offset +func (w *writer) Status() (Status, error) { + return Status{ + Ref: w.ref, + Offset: w.offset, + StartedAt: w.startedAt, + UpdatedAt: w.updatedAt, + }, nil } // Digest returns the current digest of the content, up to the current write. // // Cannot be called concurrently with `Write`. 
-func (cw *Writer) Digest() digest.Digest { - return cw.digester.Digest() +func (w *writer) Digest() digest.Digest { + return w.digester.Digest() } // Write p to the transaction. // // Note that writes are unbuffered to the backing file. When writing, it is // recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer. -func (cw *Writer) Write(p []byte) (n int, err error) { - n, err = cw.fp.Write(p) - cw.digester.Hash().Write(p[:n]) - cw.offset += int64(len(p)) +func (w *writer) Write(p []byte) (n int, err error) { + n, err = w.fp.Write(p) + w.digester.Hash().Write(p[:n]) + w.offset += int64(len(p)) + w.updatedAt = time.Now() return n, err } -func (cw *Writer) Commit(size int64, expected digest.Digest) error { +func (cw *writer) Commit(size int64, expected digest.Digest) error { if err := cw.fp.Sync(); err != nil { return errors.Wrap(err, "sync failed") } @@ -85,7 +86,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error { var ( ingest = filepath.Join(cw.path, "data") - target = cw.cs.blobPath(dgst) + target = cw.s.blobPath(dgst) ) // make sure parent directories of blob exist @@ -118,7 +119,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error { // // To abandon a transaction completely, first call close then `Store.Remove` to // clean up the associated resources. -func (cw *Writer) Close() (err error) { +func (cw *writer) Close() (err error) { if err := unlock(cw.lock); err != nil { log.Printf("unlock failed: %v", err) }
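
Taken together, the refactor reduces ingestion to a single call path: `Writer` acquires the per-ref lock and resumes any partial data, `Write` appends, and `Commit` validates size and digest before moving the blob into place. A hedged end-to-end sketch against the new API (`ingestString` is illustrative, not part of the patch):

```go
package main

import (
	"context"
	"io"
	"strings"

	"github.com/docker/containerd/content"
	digest "github.com/opencontainers/go-digest"
)

// ingestString writes data into the store under ref and commits it against
// the expected size and digest, returning the committed digest.
func ingestString(ctx context.Context, cs *content.Store, ref, data string) (digest.Digest, error) {
	cw, err := cs.Writer(ctx, ref) // single entry point; replaces Begin/Resume
	if err != nil {
		return "", err // e.g. another writer holds the lock for ref
	}
	defer cw.Close()

	if _, err := io.Copy(cw, strings.NewReader(data)); err != nil {
		return "", err
	}

	expected := digest.FromString(data)
	if err := cw.Commit(int64(len(data)), expected); err != nil {
		return "", err
	}

	return expected, nil
}
```

`content.WriteBlob` in helpers.go wraps this same sequence for the common case of a single, non-resumed write, and `Store.Abort` discards an ingest when the caller gives up before `Commit`.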