diff --git a/cmd/dist/active.go b/cmd/dist/active.go
index 12be2d3..24b39a6 100644
--- a/cmd/dist/active.go
+++ b/cmd/dist/active.go
@@ -3,11 +3,9 @@ package main
 import (
 	"fmt"
 	"os"
-	"path/filepath"
 	"text/tabwriter"
 	"time"
 
-	"github.com/docker/containerd/content"
 	units "github.com/docker/go-units"
 	"github.com/urfave/cli"
 )
@@ -26,24 +24,11 @@ var activeCommand = cli.Command{
 		cli.StringFlag{
 			Name:  "root",
 			Usage: "path to content store root",
-			Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
+			Value: "/tmp/content", // TODO(stevvooe): for now, just default to /tmp/content
 		},
 	},
 	Action: func(context *cli.Context) error {
-		var (
-			// ctx = contextpkg.Background()
-			root = context.String("root")
-		)
-
-		if !filepath.IsAbs(root) {
-			var err error
-			root, err = filepath.Abs(root)
-			if err != nil {
-				return err
-			}
-		}
-
-		cs, err := content.Open(root)
+		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
@@ -58,8 +43,8 @@ var activeCommand = cli.Command{
 		for _, active := range active {
 			fmt.Fprintf(tw, "%s\t%s\t%s\n",
 				active.Ref,
-				units.HumanSize(float64(active.Size)),
-				units.HumanDuration(time.Since(active.ModTime)))
+				units.HumanSize(float64(active.Offset)),
+				units.HumanDuration(time.Since(active.StartedAt)))
 		}
 
 		tw.Flush()
diff --git a/cmd/dist/common.go b/cmd/dist/common.go
new file mode 100644
index 0000000..404aa76
--- /dev/null
+++ b/cmd/dist/common.go
@@ -0,0 +1,34 @@
+package main
+
+import (
+	"net"
+	"path/filepath"
+	"time"
+
+	"github.com/docker/containerd/content"
+	"github.com/urfave/cli"
+	"google.golang.org/grpc"
+)
+
+func resolveContentStore(context *cli.Context) (*content.Store, error) {
+	root := context.GlobalString("root")
+	if !filepath.IsAbs(root) {
+		var err error
+		root, err = filepath.Abs(root)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return content.NewStore(root)
+}
+
+func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) {
+	socket := context.GlobalString("socket")
+	return grpc.Dial(socket,
+		grpc.WithBlock(),
+		grpc.WithInsecure(),
+		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
+			return net.DialTimeout("unix", socket, timeout)
+		}),
+	)
+}
diff --git a/cmd/dist/delete.go b/cmd/dist/delete.go
index 9cce3b5..6657286 100644
--- a/cmd/dist/delete.go
+++ b/cmd/dist/delete.go
@@ -3,9 +3,7 @@ package main
 import (
 	contextpkg "context"
 	"fmt"
-	"path/filepath"
 
-	"github.com/docker/containerd/content"
 	"github.com/docker/containerd/log"
 	digest "github.com/opencontainers/go-digest"
 	"github.com/urfave/cli"
@@ -18,30 +16,15 @@ var deleteCommand = cli.Command{
 	ArgsUsage: "[flags] [<digest>, ...]",
 	Description: `Delete one or more blobs permanently.
 
 Successfully deleted blobs are printed to stdout.`,
-	Flags: []cli.Flag{
-		cli.StringFlag{
-			Name:  "root",
-			Usage: "path to content store root",
-			Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
-		},
-	},
+	Flags: []cli.Flag{},
 	Action: func(context *cli.Context) error {
 		var (
 			ctx       = contextpkg.Background()
-			root      = context.String("root")
 			args      = []string(context.Args())
 			exitError error
 		)
 
-		if !filepath.IsAbs(root) {
-			var err error
-			root, err = filepath.Abs(root)
-			if err != nil {
-				return err
-			}
-		}
-
-		cs, err := content.Open(root)
+		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
diff --git a/cmd/dist/get.go b/cmd/dist/get.go
new file mode 100644
index 0000000..bf35933
--- /dev/null
+++ b/cmd/dist/get.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+	"io"
+	"os"
+
+	digest "github.com/opencontainers/go-digest"
+	"github.com/urfave/cli"
+)
+
+var getCommand = cli.Command{
+	Name:      "get",
+	Usage:     "get the data for an object",
+	ArgsUsage: "[flags] [<digest>, ...]",
+	Description: `Display the data for one or more blobs.
+
+Blob data is written directly to stdout.`,
+	Flags: []cli.Flag{},
+	Action: func(context *cli.Context) error {
+		cs, err := resolveContentStore(context)
+		if err != nil {
+			return err
+		}
+
+		dgst, err := digest.Parse(context.Args().First())
+		if err != nil {
+			return err
+		}
+
+		rc, err := cs.Reader(background, dgst)
+		if err != nil {
+			return err
+		}
+		defer rc.Close()
+
+		_, err = io.Copy(os.Stdout, rc)
+		return err
+	},
+}
diff --git a/cmd/dist/ingest.go b/cmd/dist/ingest.go
index 206c186..e3353cc 100644
--- a/cmd/dist/ingest.go
+++ b/cmd/dist/ingest.go
@@ -4,8 +4,6 @@ import (
 	contextpkg "context"
 	"fmt"
 	"os"
-	"path/filepath"
-	"strings"
 
 	"github.com/docker/containerd/content"
 	"github.com/opencontainers/go-digest"
@@ -18,17 +16,6 @@ var ingestCommand = cli.Command{
 	ArgsUsage: "[flags] <ref>",
 	Description: `Ingest objects into the local content store.`,
 	Flags: []cli.Flag{
-		cli.DurationFlag{
-			Name:   "timeout",
-			Usage:  "total timeout for fetch",
-			EnvVar: "CONTAINERD_FETCH_TIMEOUT",
-		},
-		cli.StringFlag{
-			Name:   "path, p",
-			Usage:  "path to content store",
-			Value:  ".content", // TODO(stevvooe): for now, just use the PWD/.content
-			EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
-		},
 		cli.Int64Flag{
 			Name:  "expected-size",
 			Usage: "validate against provided size",
 		},
 	},
 	Action: func(context *cli.Context) error {
 		var (
-			ctx            = contextpkg.Background()
-			timeout        = context.Duration("timeout")
-			root           = context.String("path")
+			ctx            = background
+			cancel         func()
 			ref            = context.Args().First()
 			expectedSize   = context.Int64("expected-size")
 			expectedDigest = digest.Digest(context.String("expected-digest"))
 		)
 
-		if timeout > 0 {
-			var cancel func()
-			ctx, cancel = contextpkg.WithTimeout(ctx, timeout)
-			defer cancel()
-		}
+		ctx, cancel = contextpkg.WithCancel(ctx)
+		defer cancel()
 
 		if err := expectedDigest.Validate(); expectedDigest != "" && err != nil {
 			return err
 		}
 
-		if !filepath.IsAbs(root) {
-			var err error
-			root, err = filepath.Abs(root)
-			if err != nil {
-				return err
-			}
-		}
-
-		cs, err := content.Open(root)
+		cs, err := resolveContentStore(context)
 		if err != nil {
 			return err
 		}
 
-		if expectedDigest != "" {
-			if ok, err := cs.Exists(expectedDigest); err != nil {
-				return err
-			} else if ok {
-				fmt.Fprintf(os.Stderr, "content with digest %v already exists\n", expectedDigest)
-				return nil
-			}
-		}
-
 		if ref == "" {
-			if expectedDigest == "" {
-				return fmt.Errorf("must specify a transaction reference or expected digest")
expected digest") - } - - ref = strings.Replace(expectedDigest.String(), ":", "-", -1) + return fmt.Errorf("must specify a transaction reference") } // TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect // all data to be written in a single invocation. Allow multiple writes // to the same transaction key followed by a commit. - return content.WriteBlob(cs, os.Stdin, ref, expectedSize, expectedDigest) + return content.WriteBlob(ctx, cs, os.Stdin, ref, expectedSize, expectedDigest) }, } diff --git a/cmd/dist/list.go b/cmd/dist/list.go index 11b5894..2ba1d88 100644 --- a/cmd/dist/list.go +++ b/cmd/dist/list.go @@ -4,7 +4,6 @@ import ( contextpkg "context" "fmt" "os" - "path/filepath" "text/tabwriter" "time" @@ -22,11 +21,6 @@ var listCommand = cli.Command{ ArgsUsage: "[flags] [, ...]", Description: `List blobs in the content store.`, Flags: []cli.Flag{ - cli.StringFlag{ - Name: "root", - Usage: "path to content store root", - Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content - }, cli.BoolFlag{ Name: "quiet, q", Usage: "print only the blob digest", @@ -35,20 +29,11 @@ var listCommand = cli.Command{ Action: func(context *cli.Context) error { var ( ctx = contextpkg.Background() - root = context.String("root") quiet = context.Bool("quiet") args = []string(context.Args()) ) - if !filepath.IsAbs(root) { - var err error - root, err = filepath.Abs(root) - if err != nil { - return err - } - } - - cs, err := content.Open(root) + cs, err := resolveContentStore(context) if err != nil { return err } diff --git a/cmd/dist/main.go b/cmd/dist/main.go index 5a19608..de4cf22 100644 --- a/cmd/dist/main.go +++ b/cmd/dist/main.go @@ -1,6 +1,7 @@ package main import ( + contextpkg "context" "fmt" "os" @@ -9,6 +10,10 @@ import ( "github.com/urfave/cli" ) +var ( + background = contextpkg.Background() +) + func main() { app := cli.NewApp() app.Name = "dist" @@ -27,20 +32,38 @@ distribution tool Name: "debug", Usage: "enable debug output in logs", }, + cli.DurationFlag{ + Name: "timeout", + Usage: "total timeout for fetch", + EnvVar: "CONTAINERD_FETCH_TIMEOUT", + }, + cli.StringFlag{ + Name: "root", + Usage: "path to content store root", + Value: "/tmp/content", // TODO(stevvooe): for now, just use the PWD/.content + }, } app.Commands = []cli.Command{ fetchCommand, ingestCommand, activeCommand, - pathCommand, + getCommand, deleteCommand, listCommand, applyCommand, } app.Before = func(context *cli.Context) error { - if context.GlobalBool("debug") { + var ( + debug = context.GlobalBool("debug") + timeout = context.GlobalDuration("timeout") + ) + if debug { logrus.SetLevel(logrus.DebugLevel) } + + if timeout > 0 { + background, _ = contextpkg.WithTimeout(background, timeout) + } return nil } if err := app.Run(os.Args); err != nil { diff --git a/cmd/dist/path.go b/cmd/dist/path.go deleted file mode 100644 index 02215a8..0000000 --- a/cmd/dist/path.go +++ /dev/null @@ -1,89 +0,0 @@ -package main - -import ( - contextpkg "context" - "fmt" - "path/filepath" - - "github.com/docker/containerd/content" - "github.com/docker/containerd/log" - digest "github.com/opencontainers/go-digest" - "github.com/urfave/cli" -) - -var pathCommand = cli.Command{ - Name: "path", - Usage: "print the path to one or more blobs", - ArgsUsage: "[flags] [, ...]", - Description: `Display the paths to one or more blobs. 
-
-Output paths can be used to directly access blobs on disk.`,
-	Flags: []cli.Flag{
-		cli.StringFlag{
-			Name:   "root",
-			Usage:  "path to content store root",
-			Value:  ".content", // TODO(stevvooe): for now, just use the PWD/.content
-			EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
-		},
-		cli.BoolFlag{
-			Name:  "quiet, q",
-			Usage: "elide digests in output",
-		},
-	},
-	Action: func(context *cli.Context) error {
-		var (
-			ctx       = contextpkg.Background()
-			root      = context.String("root")
-			args      = []string(context.Args())
-			quiet     = context.Bool("quiet")
-			exitError error
-		)
-
-		if !filepath.IsAbs(root) {
-			var err error
-			root, err = filepath.Abs(root)
-			if err != nil {
-				return err
-			}
-		}
-
-		cs, err := content.Open(root)
-		if err != nil {
-			return err
-		}
-
-		// TODO(stevvooe): Take the set of paths from stdin.
-
-		if len(args) < 1 {
-			return fmt.Errorf("please specify a blob digest")
-		}
-
-		for _, arg := range args {
-			dgst, err := digest.Parse(arg)
-			if err != nil {
-				log.G(ctx).WithError(err).Errorf("parsing %q as digest failed", arg)
-				if exitError == nil {
-					exitError = err
-				}
-				continue
-			}
-
-			p, err := cs.GetPath(dgst)
-			if err != nil {
-				log.G(ctx).WithError(err).Errorf("getting path for %q failed", dgst)
-				if exitError == nil {
-					exitError = err
-				}
-				continue
-			}
-
-			if !quiet {
-				fmt.Println(dgst, p)
-			} else {
-				fmt.Println(p)
-			}
-		}
-
-		return exitError
-	},
-}
diff --git a/content/content.go b/content/content.go
index d7fa750..b63ed1d 100644
--- a/content/content.go
+++ b/content/content.go
@@ -1,364 +1,53 @@
 package content
 
 import (
+	"context"
 	"io"
-	"io/ioutil"
-	"os"
-	"path/filepath"
 	"sync"
 	"time"
 
-	"github.com/docker/containerd/log"
-	"github.com/nightlyone/lockfile"
 	"github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 )
 
 var (
-	ErrBlobNotFound = errors.New("blob not found")
+	errNotFound = errors.New("content: not found")
 
 	bufPool = sync.Pool{
 		New: func() interface{} {
-			return make([]byte, 32<<10)
+			return make([]byte, 1<<20)
 		},
 	}
 )
 
-// Store is digest-keyed store for content. All data written into the store is
-// stored under a verifiable digest.
-//
-// Store can generally support multi-reader, single-writer ingest of data,
-// including resumable ingest.
-type Store struct {
-	root string
+type Info struct {
+	Digest      digest.Digest
+	Size        int64
+	CommittedAt time.Time
 }
 
-func Open(root string) (*Store, error) {
-	if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
-		return nil, err
-	}
-
-	return &Store{
-		root: root,
-	}, nil
+type Provider interface {
+	Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
 }
 
 type Status struct {
-	Ref     string
-	Size    int64
-	ModTime time.Time
-	Meta    interface{}
+	Ref       string
+	Offset    int64
+	StartedAt time.Time
+	UpdatedAt time.Time
 }
 
-func (cs *Store) Exists(dgst digest.Digest) (bool, error) {
-	if _, err := os.Stat(cs.blobPath(dgst)); err != nil {
-		if !os.IsNotExist(err) {
-			return false, err
-		}
-
-		return false, nil
-	}
-
-	return true, nil
+type Writer interface {
+	io.WriteCloser
+	Status() (Status, error)
+	Digest() digest.Digest
+	Commit(size int64, expected digest.Digest) error
 }
 
-func (cs *Store) GetPath(dgst digest.Digest) (string, error) {
-	p := cs.blobPath(dgst)
-	if _, err := os.Stat(p); err != nil {
-		if os.IsNotExist(err) {
-			return "", ErrBlobNotFound
-		}
-
-		return "", err
-	}
-
-	return p, nil
+type Ingester interface {
+	Writer(ctx context.Context, ref string) (Writer, error)
 }
 
-// Delete removes a blob by its digest.
-//
-// While this is safe to do concurrently, safe exist-removal logic must hold
-// some global lock on the store.
-func (cs *Store) Delete(dgst digest.Digest) error {
-	if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
-		if !os.IsNotExist(err) {
-			return err
-		}
-
-		return nil
-	}
-
-	return nil
-}
-
-func (cs *Store) blobPath(dgst digest.Digest) string {
-	return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
-}
-
-// Stat returns the current status of a blob by the ingest ref.
-func (cs *Store) Stat(ref string) (Status, error) {
-	dp := filepath.Join(cs.ingestRoot(ref), "data")
-	return cs.stat(dp)
-}
-
-// stat works like stat above except uses the path to the ingest.
-func (cs *Store) stat(ingestPath string) (Status, error) {
-	dp := filepath.Join(ingestPath, "data")
-	dfi, err := os.Stat(dp)
-	if err != nil {
-		return Status{}, err
-	}
-
-	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
-	if err != nil {
-		return Status{}, err
-	}
-
-	return Status{
-		Ref:     ref,
-		Size:    dfi.Size(),
-		ModTime: dfi.ModTime(),
-	}, nil
-}
-
-func (cs *Store) Active() ([]Status, error) {
-	ip := filepath.Join(cs.root, "ingest")
-
-	fp, err := os.Open(ip)
-	if err != nil {
-		return nil, err
-	}
-
-	fis, err := fp.Readdir(-1)
-	if err != nil {
-		return nil, err
-	}
-
-	var active []Status
-	for _, fi := range fis {
-		p := filepath.Join(ip, fi.Name())
-		stat, err := cs.stat(p)
-		if err != nil {
-			if !os.IsNotExist(err) {
-				return nil, err
-			}
-
-			// TODO(stevvooe): This is a common error if uploads are being
-			// completed while making this listing. Need to consider taking a
-			// lock on the whole store to coordinate this aspect.
-			//
-			// Another option is to cleanup downloads asynchronously and
-			// coordinate this method with the cleanup process.
-			//
-			// For now, we just skip them, as they really don't exist.
-			continue
-		}
-
-		active = append(active, stat)
-	}
-
-	return active, nil
-}
-
-// TODO(stevvooe): Allow querying the set of blobs in the blob store.
-
-// WalkFunc defines the callback for a blob walk.
-//
-// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps,
-// not a huge deal, considering we have a path, but let's not just let this one
-// go without scrutiny.
-type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error
-
-func (cs *Store) Walk(fn WalkFunc) error {
-	root := filepath.Join(cs.root, "blobs")
-	var alg digest.Algorithm
-	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-		if !fi.IsDir() && !alg.Available() {
-			return nil
-		}
-
-		// TODO(stevvooe): There are few more cases with subdirs that should be
-		// handled in case the layout gets corrupted. This isn't strict enough
-		// an may spew bad data.
-
-		if path == root {
-			return nil
-		}
-		if filepath.Dir(path) == root {
-			alg = digest.Algorithm(filepath.Base(path))
-
-			if !alg.Available() {
-				alg = ""
-				return filepath.SkipDir
-			}
-
-			// descending into a hash directory
-			return nil
-		}
-
-		dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
-		if err := dgst.Validate(); err != nil {
-			// log error but don't report
-			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
-			// if we see this, it could mean some sort of corruption of the
-			// store or extra paths not expected previously.
-		}
-
-		return fn(path, fi, dgst)
-	})
-}
-
-// Begin starts a new write transaction against the blob store.
-//
-// The argument `ref` is used to identify the transaction. It must be a valid
-// path component, meaning it has no `/` characters and no `:` (we'll ban
-// others fs characters, as needed).
-func (cs *Store) Begin(ref string) (*Writer, error) {
-	path, refp, data, lock, err := cs.ingestPaths(ref)
-	if err != nil {
-		return nil, err
-	}
-
-	// use single path mkdir for this to ensure ref is only base path, in
-	// addition to validation above.
-	if err := os.Mkdir(path, 0755); err != nil {
-		return nil, err
-	}
-
-	if err := tryLock(lock); err != nil {
-		return nil, err
-	}
-
-	// write the ref to a file for later use
-	if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
-		return nil, err
-	}
-
-	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to open data file")
-	}
-	defer fp.Close()
-
-	// re-open the file in append mode
-	fp, err = os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666)
-	if err != nil {
-		return nil, errors.Wrap(err, "error opening for append")
-	}
-
-	return &Writer{
-		cs:       cs,
-		fp:       fp,
-		lock:     lock,
-		path:     path,
-		digester: digest.Canonical.Digester(),
-	}, nil
-}
-
-func (cs *Store) Resume(ref string) (*Writer, error) {
-	path, refp, data, lock, err := cs.ingestPaths(ref)
-	if err != nil {
-		return nil, err
-	}
-
-	if err := tryLock(lock); err != nil {
-		return nil, err
-	}
-
-	refraw, err := readFileString(refp)
-	if err != nil {
-		return nil, errors.Wrap(err, "could not read ref")
-	}
-
-	if ref != refraw {
-		// NOTE(stevvooe): This is fairly catastrophic. Either we have some
-		// layout corruption or a hash collision for the ref key.
-		return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, refraw)
-	}
-
-	digester := digest.Canonical.Digester()
-
-	// slow slow slow!!, send to goroutine or use resumable hashes
-	fp, err := os.Open(data)
-	if err != nil {
-		return nil, err
-	}
-	defer fp.Close()
-
-	p := bufPool.Get().([]byte)
-	defer bufPool.Put(p)
-
-	offset, err := io.CopyBuffer(digester.Hash(), fp, p)
-	if err != nil {
-		return nil, err
-	}
-
-	fp1, err := os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666)
-	if err != nil {
-		if os.IsNotExist(err) {
-			return nil, errors.Wrap(err, "ingest does not exist")
-		}
-
-		return nil, errors.Wrap(err, "error opening for append")
-	}
-
-	return &Writer{
-		cs:       cs,
-		fp:       fp1,
-		lock:     lock,
-		ref:      ref,
-		path:     path,
-		offset:   offset,
-		digester: digester,
-	}, nil
-}
-
-// Remove an active transaction keyed by ref.
-func (cs *Store) Remove(ref string) error {
-	root := cs.ingestRoot(ref)
-	if err := os.RemoveAll(root); err != nil {
-		if os.IsNotExist(err) {
-			return nil
-		}
-
-		return err
-	}
-
-	return nil
-}
-
-func (cs *Store) ingestRoot(ref string) string {
-	dgst := digest.FromString(ref)
-	return filepath.Join(cs.root, "ingest", dgst.Hex())
-}
-
-// ingestPaths are returned, including the lockfile. The paths are the following:
-//
-// - root: entire ingest directory
-// - ref: name of the starting ref, must be unique
-// - data: file where data is written
-// - lock: lock file location
-//
-func (cs *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
-	var (
-		fp = cs.ingestRoot(ref)
-		rp = filepath.Join(fp, "ref")
-		lp = filepath.Join(fp, "lock")
-		dp = filepath.Join(fp, "data")
-	)
-
-	lock, err := lockfile.New(lp)
-	if err != nil {
-		return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp)
-	}
-
-	return fp, rp, dp, lock, nil
-}
-
-func readFileString(path string) (string, error) {
-	p, err := ioutil.ReadFile(path)
-	return string(p), err
+func IsNotFound(err error) bool {
+	return errors.Cause(err) == errNotFound
 }
diff --git a/content/content_test.go b/content/content_test.go
index a469630..b831a54 100644
--- a/content/content_test.go
+++ b/content/content_test.go
@@ -3,6 +3,7 @@ package content
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"crypto/rand"
 	_ "crypto/sha256" // required for digest package
 	"fmt"
@@ -21,7 +22,7 @@ import (
 )
 
 func TestContentWriter(t *testing.T) {
-	tmpdir, cs, cleanup := contentStoreEnv(t)
+	ctx, tmpdir, cs, cleanup := contentStoreEnv(t)
 	defer cleanup()
 	defer testutil.DumpDir(t, tmpdir)
 
@@ -29,7 +30,7 @@ func TestContentWriter(t *testing.T) {
 		t.Fatal("ingest dir should be created", err)
 	}
 
-	cw, err := cs.Begin("myref")
+	cw, err := cs.Writer(ctx, "myref")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -37,20 +38,14 @@ func TestContentWriter(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// try to begin again with same ref, should fail
-	cw, err = cs.Begin("myref")
-	if err == nil {
-		t.Fatal("expected error on repeated begin")
-	}
-
 	// reopen, so we can test things
-	cw, err = cs.Resume("myref")
+	cw, err = cs.Writer(ctx, "myref")
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// make sure that second resume also fails
-	if _, err = cs.Resume("myref"); err == nil {
+	if _, err = cs.Writer(ctx, "myref"); err == nil {
 		// TODO(stevvooe): This also works across processes. Need to find a way
 		// to test that, as well.
 		t.Fatal("no error on second resume")
@@ -64,14 +59,14 @@ func TestContentWriter(t *testing.T) {
 
 	// clear out the time and meta cause we don't care for this test
 	for i := range ingestions {
-		ingestions[i].Meta = nil
-		ingestions[i].ModTime = time.Time{}
+		ingestions[i].UpdatedAt = time.Time{}
+		ingestions[i].StartedAt = time.Time{}
 	}
 
 	if !reflect.DeepEqual(ingestions, []Status{
 		{
-			Ref:  "myref",
-			Size: 0,
+			Ref:    "myref",
+			Offset: 0,
 		},
 	}) {
 		t.Fatalf("unexpected ingestion set: %v", ingestions)
@@ -93,7 +88,7 @@ func TestContentWriter(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	cw, err = cs.Begin("aref")
+	cw, err = cs.Writer(ctx, "aref")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -119,7 +114,7 @@
 }
 
 func TestWalkBlobs(t *testing.T) {
-	_, cs, cleanup := contentStoreEnv(t)
+	ctx, _, cs, cleanup := contentStoreEnv(t)
 	defer cleanup()
 
 	const (
@@ -128,7 +123,7 @@
 	)
 
 	var (
-		blobs    = populateBlobStore(t, cs, nblobs, maxsize)
+		blobs    = populateBlobStore(t, ctx, cs, nblobs, maxsize)
 		expected = map[digest.Digest]struct{}{}
 		found    = map[digest.Digest]struct{}{}
 	)
@@ -158,7 +153,7 @@
 // for blobs. This seems to be due to the number of syscalls and file io we do
 // coordinating the ingestion.
 func BenchmarkIngests(b *testing.B) {
-	_, cs, cleanup := contentStoreEnv(b)
+	ctx, _, cs, cleanup := contentStoreEnv(b)
 	defer cleanup()
 
 	for _, size := range []int64{
@@ -181,7 +176,7 @@ func BenchmarkIngests(b *testing.B) {
 			b.StartTimer()
 
 			for dgst, p := range blobs {
-				checkWrite(b, cs, dgst, p)
+				checkWrite(b, ctx, cs, dgst, p)
 			}
 		})
 	}
@@ -208,17 +203,17 @@ func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte {
 	return blobs
 }
 
-func populateBlobStore(t checker, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte {
+func populateBlobStore(t checker, ctx context.Context, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte {
 	blobs := generateBlobs(t, nblobs, maxsize)
 
 	for dgst, p := range blobs {
-		checkWrite(t, cs, dgst, p)
+		checkWrite(t, ctx, cs, dgst, p)
 	}
 
 	return blobs
 }
 
-func contentStoreEnv(t checker) (string, *Store, func()) {
+func contentStoreEnv(t checker) (context.Context, string, *Store, func()) {
 	pc, _, _, ok := runtime.Caller(1)
 	if !ok {
 		t.Fatal("failed to resolve caller")
@@ -230,13 +225,15 @@ func contentStoreEnv(t checker) (string, *Store, func()) {
 		t.Fatal(err)
 	}
 
-	cs, err := Open(tmpdir)
+	cs, err := NewStore(tmpdir)
 	if err != nil {
 		os.RemoveAll(tmpdir)
 		t.Fatal(err)
 	}
 
-	return tmpdir, cs, func() {
+	ctx, cancel := context.WithCancel(context.Background())
+	return ctx, tmpdir, cs, func() {
+		cancel()
 		os.RemoveAll(tmpdir)
 	}
 }
@@ -253,10 +250,8 @@ func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) {
 }
 
 func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
-	path, err := cs.GetPath(dgst)
-	if err != nil {
-		t.Fatal(err, dgst)
-	}
+	path := cs.blobPath(dgst)
+
 	if path != filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) {
 		t.Fatalf("unexpected path: %q", path)
 	}
@@ -273,8 +268,8 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
 	return path
 }
 
-func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
-	if err := WriteBlob(cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
+func checkWrite(t checker, ctx context.Context, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
+	if err := WriteBlob(ctx, cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
 		t.Fatal(err)
 	}
diff --git a/content/helpers.go b/content/helpers.go
index 37cb71e..4209350 100644
--- a/content/helpers.go
+++ b/content/helpers.go
@@ -1,37 +1,14 @@
 package content
 
 import (
+	"context"
 	"io"
-	"os"
+	"io/ioutil"
 
 	"github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 )
 
-// Provider gives access to blob content by paths.
-//
-// Typically, this is implemented by `*Store`.
-type Provider interface {
-	GetPath(dgst digest.Digest) (string, error)
-}
-
-// OpenBlob opens the blob for reading identified by dgst.
-//
-// The opened blob may also implement seek. Callers can detect with io.Seeker.
-func OpenBlob(provider Provider, dgst digest.Digest) (io.ReadCloser, error) {
-	path, err := provider.GetPath(dgst)
-	if err != nil {
-		return nil, err
-	}
-
-	fp, err := os.Open(path)
-	return fp, err
-}
-
-type Ingester interface {
-	Begin(key string) (*Writer, error)
-}
-
 // WriteBlob writes data with the expected digest into the content store. If
 // expected already exists, the method returns immediately and the reader will
 // not be consumed.
@@ -39,11 +16,23 @@ type Ingester interface {
 // This is useful when the digest and size are known beforehand.
 //
 // Copy is buffered, so no need to wrap reader in buffered io.
-func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
-	cw, err := cs.Begin(ref)
+func WriteBlob(ctx context.Context, cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
+	cw, err := cs.Writer(ctx, ref)
 	if err != nil {
 		return err
 	}
+
+	ws, err := cw.Status()
+	if err != nil {
+		return err
+	}
+
+	if ws.Offset > 0 {
+		// Arbitrary limitation for now. We can detect io.Seeker on r and
+		// resume.
+		return errors.Errorf("cannot resume already started write")
+	}
+
 	buf := bufPool.Get().([]byte)
 	defer bufPool.Put(buf)
 
@@ -62,3 +51,8 @@ func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected diges
 
 	return nil
 }
+
+func readFileString(path string) (string, error) {
+	p, err := ioutil.ReadFile(path)
+	return string(p), err
+}
diff --git a/content/locks.go b/content/locks.go
index dd2cae9..400793d 100644
--- a/content/locks.go
+++ b/content/locks.go
@@ -1,10 +1,10 @@
 package content
 
 import (
-	"errors"
 	"sync"
 
 	"github.com/nightlyone/lockfile"
+	"github.com/pkg/errors"
 )
 
 // In addition to providing inter-process locks for content ingest, we also
@@ -16,6 +16,8 @@ import (
 // error reporting.
 
 var (
+	errLocked = errors.New("key is locked")
+
 	// locks lets us lock in process, as well as output of process.
 	locks   = map[lockfile.Lockfile]struct{}{}
 	locksMu sync.Mutex
@@ -26,11 +28,15 @@ func tryLock(lock lockfile.Lockfile) error {
 	defer locksMu.Unlock()
 
 	if _, ok := locks[lock]; ok {
-		return errors.New("file in use")
+		return errLocked
	}
 
 	if err := lock.TryLock(); err != nil {
-		return err
+		if errors.Cause(err) == lockfile.ErrBusy {
+			return errLocked
+		}
+
+		return errors.Wrapf(err, "lock.TryLock() encountered an error")
 	}
 
 	locks[lock] = struct{}{}
diff --git a/content/store.go b/content/store.go
new file mode 100644
index 0000000..dfe1993
--- /dev/null
+++ b/content/store.go
@@ -0,0 +1,355 @@
+package content
+
+import (
+	"context"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"syscall"
+	"time"
+
+	"github.com/docker/containerd/log"
+	"github.com/nightlyone/lockfile"
+	digest "github.com/opencontainers/go-digest"
+	"github.com/pkg/errors"
+)
+
+// Store is a digest-keyed store for content. All data written into the store
+// is stored under a verifiable digest.
+//
+// Store can generally support multi-reader, single-writer ingest of data,
+// including resumable ingest.
+type Store struct {
+	root string
+}
+
+func NewStore(root string) (*Store, error) {
+	if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
+		return nil, err
+	}
+
+	return &Store{
+		root: root,
+	}, nil
+}
+
+func (s *Store) Info(dgst digest.Digest) (Info, error) {
+	p := s.blobPath(dgst)
+	fi, err := os.Stat(p)
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = errNotFound
+		}
+
+		return Info{}, err
+	}
+
+	return Info{
+		Digest:      dgst,
+		Size:        fi.Size(),
+		CommittedAt: fi.ModTime(),
+	}, nil
+}
+
+// Reader returns an io.ReadCloser for the blob.
+//
+// TODO(stevvooe): This would work much better as an io.ReaderAt in practice.
+// Right now, we are doing type assertion to tease that out, but it won't scale
+// well.
+func (s *Store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
+	fp, err := os.Open(s.blobPath(dgst))
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = errNotFound
+		}
+		return nil, err
+	}
+
+	return fp, nil
+}
+
+// Delete removes a blob by its digest.
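+// Delete is idempotent: removing a blob that is already gone is not an
+// error.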
+//
+// While this is safe to do concurrently, safe exist-removal logic must hold
+// some global lock on the store.
+func (cs *Store) Delete(dgst digest.Digest) error {
+	if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
+		if !os.IsNotExist(err) {
+			return err
+		}
+
+		return nil
+	}
+
+	return nil
+}
+
+// TODO(stevvooe): Allow querying the set of blobs in the blob store.
+
+// WalkFunc defines the callback for a blob walk.
+//
+// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps,
+// not a huge deal, considering we have a path, but let's not just let this one
+// go without scrutiny.
+type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error
+
+func (cs *Store) Walk(fn WalkFunc) error {
+	root := filepath.Join(cs.root, "blobs")
+	var alg digest.Algorithm
+	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if !fi.IsDir() && !alg.Available() {
+			return nil
+		}
+
+		// TODO(stevvooe): There are a few more cases with subdirs that should
+		// be handled in case the layout gets corrupted. This isn't strict
+		// enough and may spew bad data.
+
+		if path == root {
+			return nil
+		}
+		if filepath.Dir(path) == root {
+			alg = digest.Algorithm(filepath.Base(path))
+
+			if !alg.Available() {
+				alg = ""
+				return filepath.SkipDir
+			}
+
+			// descending into a hash directory
+			return nil
+		}
+
+		dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
+		if err := dgst.Validate(); err != nil {
+			// log error but don't report
+			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
+			// if we see this, it could mean some sort of corruption of the
+			// store or extra paths not expected previously.
+		}
+
+		return fn(path, fi, dgst)
+	})
+}
+
+// Status returns the current status of the ingest identified by ref.
+func (s *Store) Status(ref string) (Status, error) {
+	ip := s.ingestRoot(ref)
+	return s.status(ip)
+}
+
+// status works like Status above, except it takes the ingest path directly.
+func (s *Store) status(ingestPath string) (Status, error) {
+	dp := filepath.Join(ingestPath, "data")
+	fi, err := os.Stat(dp)
+	if err != nil {
+		return Status{}, err
+	}
+
+	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
+	if err != nil {
+		return Status{}, err
+	}
+
+	var startedAt time.Time
+	if st, ok := fi.Sys().(*syscall.Stat_t); ok {
+		startedAt = time.Unix(st.Ctim.Sec, st.Ctim.Nsec)
+	} else {
+		startedAt = fi.ModTime()
+	}
+
+	return Status{
+		Ref:       ref,
+		Offset:    fi.Size(),
+		UpdatedAt: fi.ModTime(),
+		StartedAt: startedAt,
+	}, nil
+}
+
+// Writer begins or resumes the active writer identified by ref. If the writer
+// is already in use, an error is returned. Only one writer may be in use per
+// ref at a time.
+//
+// The argument `ref` is used to uniquely identify a long-lived writer transaction.
+func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
+	path, refp, data, lock, err := s.ingestPaths(ref)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := tryLock(lock); err != nil {
+		if !os.IsNotExist(errors.Cause(err)) {
+			return nil, errors.Wrapf(err, "locking %v failed", ref)
+		}
+
+		// if it doesn't exist, we'll make it so below!
+	}
+
+	var (
+		digester  = digest.Canonical.Digester()
+		offset    int64
+		startedAt time.Time
+		updatedAt time.Time
+	)
+
+	// ensure that the ingest path has been created.
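+	//
+	// If the directory already exists, an ingest for this ref is already in
+	// place: the stored ref file is checked for collisions and the existing
+	// data is replayed through the digester to recover the current offset.
+	// Otherwise the ingest is new and the ref file is written out for later
+	// validation.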
+	if err := os.Mkdir(path, 0755); err != nil {
+		if !os.IsExist(err) {
+			return nil, err
+		}
+
+		// validate that we have no collision for the ref.
+		refraw, err := readFileString(refp)
+		if err != nil {
+			return nil, errors.Wrap(err, "could not read ref")
+		}
+
+		if ref != refraw {
+			// NOTE(stevvooe): This is fairly catastrophic. Either we have some
+			// layout corruption or a hash collision for the ref key.
+			return nil, errors.Errorf("ref key does not match: %v != %v", ref, refraw)
+		}
+
+		// slow slow slow!!, send to goroutine or use resumable hashes
+		fp, err := os.Open(data)
+		if err != nil {
+			return nil, err
+		}
+		defer fp.Close()
+
+		p := bufPool.Get().([]byte)
+		defer bufPool.Put(p)
+
+		offset, err = io.CopyBuffer(digester.Hash(), fp, p)
+		if err != nil {
+			return nil, err
+		}
+
+		fi, err := os.Stat(data)
+		if err != nil {
+			return nil, err
+		}
+
+		updatedAt = fi.ModTime()
+
+		if st, ok := fi.Sys().(*syscall.Stat_t); ok {
+			startedAt = time.Unix(st.Ctim.Sec, st.Ctim.Nsec)
+		} else {
+			startedAt = updatedAt
+		}
+	} else {
+		// the ingest is new, we need to setup the target location.
+		// write the ref to a file for later use
+		if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
+			return nil, err
+		}
+
+		startedAt = time.Now()
+		updatedAt = startedAt
+	}
+
+	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to open data file")
+	}
+
+	return &writer{
+		s:         s,
+		fp:        fp,
+		lock:      lock,
+		ref:       ref,
+		path:      path,
+		offset:    offset,
+		digester:  digester,
+		startedAt: startedAt,
+		updatedAt: updatedAt,
+	}, nil
+}
+
+// Abort an active transaction keyed by ref. If the ingest is active, it will
+// be cancelled. Any resources associated with the ingest will be cleaned.
+func (s *Store) Abort(ref string) error {
+	root := s.ingestRoot(ref)
+	if err := os.RemoveAll(root); err != nil {
+		if os.IsNotExist(err) {
+			return nil
+		}
+
+		return err
+	}
+
+	return nil
+}
+
+func (s *Store) Active() ([]Status, error) {
+	fp, err := os.Open(filepath.Join(s.root, "ingest"))
+	if err != nil {
+		return nil, err
+	}
+
+	fis, err := fp.Readdir(-1)
+	if err != nil {
+		return nil, err
+	}
+
+	var active []Status
+	for _, fi := range fis {
+		p := filepath.Join(s.root, "ingest", fi.Name())
+		stat, err := s.status(p)
+		if err != nil {
+			if !os.IsNotExist(err) {
+				return nil, err
+			}
+
+			// TODO(stevvooe): This is a common error if uploads are being
+			// completed while making this listing. Need to consider taking a
+			// lock on the whole store to coordinate this aspect.
+			//
+			// Another option is to cleanup downloads asynchronously and
+			// coordinate this method with the cleanup process.
+			//
+			// For now, we just skip them, as they really don't exist.
+			continue
+		}
+
+		active = append(active, stat)
+	}
+
+	return active, nil
+}
+
+func (cs *Store) blobPath(dgst digest.Digest) string {
+	return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
+}
+
+func (s *Store) ingestRoot(ref string) string {
+	dgst := digest.FromString(ref)
+	return filepath.Join(s.root, "ingest", dgst.Hex())
+}
+
+// ingestPaths are returned, including the lockfile. The paths are the following:
+//
+// - root: entire ingest directory
+// - ref: name of the starting ref, must be unique
+// - data: file where data is written
+// - lock: lock file location
+//
+func (s *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
+	var (
+		fp = s.ingestRoot(ref)
+		rp = filepath.Join(fp, "ref")
+		lp = filepath.Join(fp, "lock")
+		dp = filepath.Join(fp, "data")
+	)
+
+	lock, err := lockfile.New(lp)
+	if err != nil {
+		return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp)
+	}
+
+	return fp, rp, dp, lock, nil
+}
diff --git a/content/writer.go b/content/writer.go
index a2ff020..df33234 100644
--- a/content/writer.go
+++ b/content/writer.go
@@ -4,54 +4,55 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"time"
 
 	"github.com/nightlyone/lockfile"
 	"github.com/opencontainers/go-digest"
 	"github.com/pkg/errors"
 )
 
-// Writer represents a write transaction against the blob store.
-type Writer struct {
-	cs       *Store
-	fp       *os.File // opened data file
-	lock     lockfile.Lockfile
-	path     string // path to writer dir
-	ref      string // ref key
-	offset   int64
-	digester digest.Digester
+// writer represents a write transaction against the blob store.
+type writer struct {
+	s         *Store
+	fp        *os.File // opened data file
+	lock      lockfile.Lockfile
+	path      string // path to writer dir
+	ref       string // ref key
+	offset    int64
+	digester  digest.Digester
+	startedAt time.Time
+	updatedAt time.Time
 }
 
-func (cw *Writer) Ref() string {
-	return cw.ref
-}
-
-// Size returns the current size written.
-//
-// Cannot be called concurrently with `Write`. If you need need concurrent
-// status, query it with `Store.Stat`.
-func (cw *Writer) Size() int64 {
-	return cw.offset
+func (w *writer) Status() (Status, error) {
+	return Status{
+		Ref:       w.ref,
+		Offset:    w.offset,
+		StartedAt: w.startedAt,
+		UpdatedAt: w.updatedAt,
+	}, nil
 }
 
 // Digest returns the current digest of the content, up to the current write.
 //
 // Cannot be called concurrently with `Write`.
-func (cw *Writer) Digest() digest.Digest {
-	return cw.digester.Digest()
+func (w *writer) Digest() digest.Digest {
+	return w.digester.Digest()
 }
 
 // Write p to the transaction.
 //
 // Note that writes are unbuffered to the backing file. When writing, it is
 // recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer.
-func (cw *Writer) Write(p []byte) (n int, err error) {
-	n, err = cw.fp.Write(p)
-	cw.digester.Hash().Write(p[:n])
-	cw.offset += int64(len(p))
+func (w *writer) Write(p []byte) (n int, err error) {
+	n, err = w.fp.Write(p)
+	w.digester.Hash().Write(p[:n])
+	w.offset += int64(n)
+	w.updatedAt = time.Now()
 	return n, err
 }
 
-func (cw *Writer) Commit(size int64, expected digest.Digest) error {
+func (cw *writer) Commit(size int64, expected digest.Digest) error {
 	if err := cw.fp.Sync(); err != nil {
 		return errors.Wrap(err, "sync failed")
 	}
@@ -85,7 +86,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
 
 	var (
 		ingest = filepath.Join(cw.path, "data")
-		target = cw.cs.blobPath(dgst)
+		target = cw.s.blobPath(dgst)
 	)
 
 	// make sure parent directories of blob exist
@@ -118,7 +119,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
 //
 // To abandon a transaction completely, first call close then `Store.Remove` to
 // clean up the associated resources.
-func (cw *Writer) Close() (err error) {
+func (cw *writer) Close() (err error) {
 	if err := unlock(cw.lock); err != nil {
 		log.Printf("unlock failed: %v", err)
 	}
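
Usage sketch (editor's addition, not part of the patch): with path-based access
gone, callers go through the Writer/Reader pair instead of GetPath/OpenBlob. A
minimal round-trip against the new surface might look like the following; the
store root, ref name, and payload are illustrative assumptions, not values
taken from this change.

	package main

	import (
		"context"
		"io"
		"os"
		"strings"

		"github.com/docker/containerd/content"
		digest "github.com/opencontainers/go-digest"
	)

	func main() {
		ctx := context.Background()

		// Illustrative root; any writable directory works.
		cs, err := content.NewStore("/tmp/content")
		if err != nil {
			panic(err)
		}

		data := "hello world"
		dgst := digest.FromString(data)

		// Single-shot ingest: WriteBlob obtains a Writer for the ref, copies
		// the reader through it, and commits against the expected size and
		// digest.
		if err := content.WriteBlob(ctx, cs, strings.NewReader(data), "example-ref", int64(len(data)), dgst); err != nil {
			panic(err)
		}

		// Committed content is read back by digest via the Provider side.
		rc, err := cs.Reader(ctx, dgst)
		if err != nil {
			panic(err)
		}
		defer rc.Close()

		if _, err := io.Copy(os.Stdout, rc); err != nil {
			panic(err)
		}
	}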