Merge pull request #472 from stevvooe/expanding-dist-tool

dist: expand functionality of the dist tool
This commit is contained in:
Stephen Day 2017-01-27 10:34:54 -08:00 committed by GitHub
commit 594dca9e31
10 changed files with 532 additions and 36 deletions

68
cmd/dist/active.go vendored Normal file
View file

@ -0,0 +1,68 @@
package main
import (
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/docker/containerd/content"
units "github.com/docker/go-units"
"github.com/urfave/cli"
)
var activeCommand = cli.Command{
Name: "active",
Usage: "display active transfers.",
ArgsUsage: "[flags] [<key>, ...]",
Description: `Display the ongoing transfers.`,
Flags: []cli.Flag{
cli.DurationFlag{
Name: "timeout, t",
Usage: "total timeout for fetch",
EnvVar: "CONTAINERD_FETCH_TIMEOUT",
},
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
},
},
Action: func(context *cli.Context) error {
var (
// ctx = contextpkg.Background()
root = context.String("root")
)
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
if err != nil {
return err
}
active, err := cs.Active()
if err != nil {
return err
}
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0)
fmt.Fprintf(tw, "REF\tSIZE\tAGE\n")
for _, active := range active {
fmt.Fprintf(tw, "%s\t%s\t%s\n",
active.Ref,
units.HumanSize(float64(active.Size)),
units.HumanDuration(time.Since(active.ModTime)))
}
tw.Flush()
return nil
},
}

72
cmd/dist/delete.go vendored Normal file
View file

@ -0,0 +1,72 @@
package main
import (
contextpkg "context"
"fmt"
"path/filepath"
"github.com/docker/containerd/content"
"github.com/docker/containerd/log"
digest "github.com/opencontainers/go-digest"
"github.com/urfave/cli"
)
var deleteCommand = cli.Command{
Name: "delete",
Aliases: []string{"del"},
Usage: "permanently delete one or more blobs.",
ArgsUsage: "[flags] [<digest>, ...]",
Description: `Delete one or more blobs permanently. Successfully deleted
blobs are printed to stdout.`,
Flags: []cli.Flag{
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
},
},
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
root = context.String("root")
args = []string(context.Args())
exitError error
)
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
if err != nil {
return err
}
for _, arg := range args {
dgst, err := digest.Parse(arg)
if err != nil {
if exitError == nil {
exitError = err
}
log.G(ctx).WithError(err).Errorf("could not delete %v", dgst)
continue
}
if err := cs.Delete(dgst); err != nil {
if exitError == nil {
exitError = err
}
log.G(ctx).WithError(err).Errorf("could not delete %v", dgst)
continue
}
fmt.Println(dgst)
}
return exitError
},
}

96
cmd/dist/ingest.go vendored Normal file
View file

@ -0,0 +1,96 @@
package main
import (
contextpkg "context"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/docker/containerd/content"
"github.com/opencontainers/go-digest"
"github.com/urfave/cli"
)
var ingestCommand = cli.Command{
Name: "ingest",
Usage: "accept content into the store",
ArgsUsage: "[flags] <key>",
Description: `Ingest objects into the local content store.`,
Flags: []cli.Flag{
cli.DurationFlag{
Name: "timeout",
Usage: "total timeout for fetch",
EnvVar: "CONTAINERD_FETCH_TIMEOUT",
},
cli.StringFlag{
Name: "path, p",
Usage: "path to content store",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
},
cli.Int64Flag{
Name: "expected-size",
Usage: "validate against provided size",
},
cli.StringFlag{
Name: "expected-digest",
Usage: "verify content against expected digest",
},
},
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
timeout = context.Duration("timeout")
root = context.String("path")
ref = context.Args().First()
expectedSize = context.Int64("expected-size")
expectedDigest = digest.Digest(context.String("expected-digest"))
)
if timeout > 0 {
var cancel func()
ctx, cancel = contextpkg.WithTimeout(ctx, timeout)
defer cancel()
}
if err := expectedDigest.Validate(); expectedDigest != "" && err != nil {
return err
}
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
if err != nil {
return err
}
if expectedDigest != "" {
if ok, err := cs.Exists(expectedDigest); err != nil {
return err
} else if ok {
fmt.Fprintf(os.Stderr, "content with digest %v already exists\n", expectedDigest)
return nil
}
}
if ref == "" {
if expectedDigest == "" {
return fmt.Errorf("must specify a transaction reference or expected digest")
}
ref = strings.Replace(expectedDigest.String(), ":", "-", -1)
}
// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
// all data to be written in a single invocation. Allow multiple writes
// to the same transaction key followed by a commit.
return content.WriteBlob(cs, os.Stdin, ref, expectedSize, expectedDigest)
},
}

85
cmd/dist/list.go vendored Normal file
View file

@ -0,0 +1,85 @@
package main
import (
contextpkg "context"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/docker/containerd/content"
"github.com/docker/containerd/log"
units "github.com/docker/go-units"
digest "github.com/opencontainers/go-digest"
"github.com/urfave/cli"
)
var listCommand = cli.Command{
Name: "list",
Aliases: []string{"ls"},
Usage: "list all blobs in the store.",
ArgsUsage: "[flags] [<prefix>, ...]",
Description: `List blobs in the content store.`,
Flags: []cli.Flag{
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
},
cli.BoolFlag{
Name: "quiet, q",
Usage: "print only the blob digest",
},
},
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
root = context.String("root")
quiet = context.Bool("quiet")
args = []string(context.Args())
)
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
if err != nil {
return err
}
if len(args) > 0 {
// TODO(stevvooe): Implement selection of a few blobs. Not sure
// what kind of efficiency gains we can actually get here.
log.G(ctx).Warnf("args ignored; need to implement matchers")
}
var walkFn content.WalkFunc
if quiet {
walkFn = func(path string, fi os.FileInfo, dgst digest.Digest) error {
fmt.Println(dgst)
return nil
}
} else {
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0)
defer tw.Flush()
fmt.Fprintf(tw, "DIGEST\tSIZE\tAGE\n")
walkFn = func(path string, fi os.FileInfo, dgst digest.Digest) error {
fmt.Fprintf(tw, "%s\t%s\t%s\n",
dgst,
units.HumanSize(float64(fi.Size())),
units.HumanDuration(time.Since(fi.ModTime())))
return nil
}
}
return cs.Walk(walkFn)
},
}

5
cmd/dist/main.go vendored
View file

@ -30,6 +30,11 @@ distribution tool
} }
app.Commands = []cli.Command{ app.Commands = []cli.Command{
fetchCommand, fetchCommand,
ingestCommand,
activeCommand,
pathCommand,
deleteCommand,
listCommand,
} }
app.Before = func(context *cli.Context) error { app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") { if context.GlobalBool("debug") {

89
cmd/dist/path.go vendored Normal file
View file

@ -0,0 +1,89 @@
package main
import (
contextpkg "context"
"fmt"
"path/filepath"
"github.com/docker/containerd/content"
"github.com/docker/containerd/log"
digest "github.com/opencontainers/go-digest"
"github.com/urfave/cli"
)
var pathCommand = cli.Command{
Name: "path",
Usage: "print the path to one or more blobs",
ArgsUsage: "[flags] [<digest>, ...]",
Description: `Display the paths to one or more blobs.
Output paths can be used to directly access blobs on disk.`,
Flags: []cli.Flag{
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
},
cli.BoolFlag{
Name: "quiet, q",
Usage: "elide digests in output",
},
},
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
root = context.String("root")
args = []string(context.Args())
quiet = context.Bool("quiet")
exitError error
)
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
if err != nil {
return err
}
// TODO(stevvooe): Take the set of paths from stdin.
if len(args) < 1 {
return fmt.Errorf("please specify a blob digest")
}
for _, arg := range args {
dgst, err := digest.Parse(arg)
if err != nil {
log.G(ctx).WithError(err).Errorf("parsing %q as digest failed", arg)
if exitError == nil {
exitError = err
}
continue
}
p, err := cs.GetPath(dgst)
if err != nil {
log.G(ctx).WithError(err).Errorf("getting path for %q failed", dgst)
if exitError == nil {
exitError = err
}
continue
}
if !quiet {
fmt.Println(dgst, p)
} else {
fmt.Println(p)
}
}
return exitError
},
}

View file

@ -6,6 +6,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/docker/containerd/log" "github.com/docker/containerd/log"
"github.com/nightlyone/lockfile" "github.com/nightlyone/lockfile"
@ -43,11 +44,58 @@ func Open(root string) (*Store, error) {
} }
type Status struct { type Status struct {
Ref string Ref string
Size int64 Size int64
Meta interface{} ModTime time.Time
Meta interface{}
} }
func (cs *Store) Exists(dgst digest.Digest) (bool, error) {
if _, err := os.Stat(cs.blobPath(dgst)); err != nil {
if !os.IsNotExist(err) {
return false, err
}
return false, nil
}
return true, nil
}
func (cs *Store) GetPath(dgst digest.Digest) (string, error) {
p := cs.blobPath(dgst)
if _, err := os.Stat(p); err != nil {
if os.IsNotExist(err) {
return "", ErrBlobNotFound
}
return "", err
}
return p, nil
}
// Delete removes a blob by its digest.
//
// While this is safe to do concurrently, safe exist-removal logic must hold
// some global lock on the store.
func (cs *Store) Delete(dgst digest.Digest) error {
if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
if !os.IsNotExist(err) {
return err
}
return nil
}
return nil
}
func (cs *Store) blobPath(dgst digest.Digest) string {
return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
}
// Stat returns the current status of a blob by the ingest ref.
func (cs *Store) Stat(ref string) (Status, error) { func (cs *Store) Stat(ref string) (Status, error) {
dp := filepath.Join(cs.ingestRoot(ref), "data") dp := filepath.Join(cs.ingestRoot(ref), "data")
return cs.stat(dp) return cs.stat(dp)
@ -67,10 +115,10 @@ func (cs *Store) stat(ingestPath string) (Status, error) {
} }
return Status{ return Status{
Ref: ref, Ref: ref,
Size: dfi.Size(), Size: dfi.Size(),
ModTime: dfi.ModTime(),
}, nil }, nil
} }
func (cs *Store) Active() ([]Status, error) { func (cs *Store) Active() ([]Status, error) {
@ -114,7 +162,14 @@ func (cs *Store) Active() ([]Status, error) {
// TODO(stevvooe): Allow querying the set of blobs in the blob store. // TODO(stevvooe): Allow querying the set of blobs in the blob store.
func (cs *Store) Walk(fn func(path string, dgst digest.Digest) error) error { // WalkFunc defines the callback for a blob walk.
//
// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps,
// not a huge deal, considering we have a path, but let's not just let this one
// go without scrunity.
type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error
func (cs *Store) Walk(fn WalkFunc) error {
root := filepath.Join(cs.root, "blobs") root := filepath.Join(cs.root, "blobs")
var alg digest.Algorithm var alg digest.Algorithm
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
@ -148,23 +203,10 @@ func (cs *Store) Walk(fn func(path string, dgst digest.Digest) error) error {
// store or extra paths not expected previously. // store or extra paths not expected previously.
} }
return fn(path, dgst) return fn(path, fi, dgst)
}) })
} }
func (cs *Store) GetPath(dgst digest.Digest) (string, error) {
p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
if _, err := os.Stat(p); err != nil {
if os.IsNotExist(err) {
return "", ErrBlobNotFound
}
return "", err
}
return p, nil
}
// Begin starts a new write transaction against the blob store. // Begin starts a new write transaction against the blob store.
// //
// The argument `ref` is used to identify the transaction. It must be a valid // The argument `ref` is used to identify the transaction. It must be a valid
@ -267,6 +309,20 @@ func (cs *Store) Resume(ref string) (*Writer, error) {
}, nil }, nil
} }
// Remove an active transaction keyed by ref.
func (cs *Store) Remove(ref string) error {
root := cs.ingestRoot(ref)
if err := os.RemoveAll(root); err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
return nil
}
func (cs *Store) ingestRoot(ref string) string { func (cs *Store) ingestRoot(ref string) string {
dgst := digest.FromString(ref) dgst := digest.FromString(ref)
return filepath.Join(cs.root, "ingest", dgst.Hex()) return filepath.Join(cs.root, "ingest", dgst.Hex())

View file

@ -14,6 +14,7 @@ import (
"reflect" "reflect"
"runtime" "runtime"
"testing" "testing"
"time"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
) )
@ -59,6 +60,12 @@ func TestContentWriter(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// clear out the time and meta cause we don't care for this test
for i := range ingestions {
ingestions[i].Meta = nil
ingestions[i].ModTime = time.Time{}
}
if !reflect.DeepEqual(ingestions, []Status{ if !reflect.DeepEqual(ingestions, []Status{
{ {
Ref: "myref", Ref: "myref",
@ -129,7 +136,7 @@ func TestWalkBlobs(t *testing.T) {
expected[dgst] = struct{}{} expected[dgst] = struct{}{}
} }
if err := cs.Walk(func(path string, dgst digest.Digest) error { if err := cs.Walk(func(path string, fi os.FileInfo, dgst digest.Digest) error {
found[dgst] = struct{}{} found[dgst] = struct{}{}
if checked := checkBlobPath(t, cs, dgst); checked != path { if checked := checkBlobPath(t, cs, dgst); checked != path {
t.Fatalf("blob path did not match: %v != %v", path, checked) t.Fatalf("blob path did not match: %v != %v", path, checked)
@ -266,7 +273,7 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
} }
func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest { func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
if err := WriteBlob(cs, bytes.NewReader(p), int64(len(p)), dgst); err != nil { if err := WriteBlob(cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -39,8 +39,8 @@ type Ingester interface {
// This is useful when the digest and size are known beforehand. // This is useful when the digest and size are known beforehand.
// //
// Copy is buffered, so no need to wrap reader in buffered io. // Copy is buffered, so no need to wrap reader in buffered io.
func WriteBlob(cs Ingester, r io.Reader, size int64, expected digest.Digest) error { func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
cw, err := cs.Begin(expected.Hex()) cw, err := cs.Begin(ref)
if err != nil { if err != nil {
return err return err
} }
@ -52,7 +52,7 @@ func WriteBlob(cs Ingester, r io.Reader, size int64, expected digest.Digest) err
return err return err
} }
if nn != size { if size > 0 && nn != size {
return errors.Errorf("failed size verification: %v != %v", nn, size) return errors.Errorf("failed size verification: %v != %v", nn, size)
} }

View file

@ -25,6 +25,21 @@ func (cw *Writer) Ref() string {
return cw.ref return cw.ref
} }
// Size returns the current size written.
//
// Cannot be called concurrently with `Write`. If you need need concurrent
// status, query it with `Store.Stat`.
func (cw *Writer) Size() int64 {
return cw.offset
}
// Digest returns the current digest of the content, up to the current write.
//
// Cannot be called concurrently with `Write`.
func (cw *Writer) Digest() digest.Digest {
return cw.digester.Digest()
}
// Write p to the transaction. // Write p to the transaction.
// //
// Note that writes are unbuffered to the backing file. When writing, it is // Note that writes are unbuffered to the backing file. When writing, it is
@ -32,6 +47,7 @@ func (cw *Writer) Ref() string {
func (cw *Writer) Write(p []byte) (n int, err error) { func (cw *Writer) Write(p []byte) (n int, err error) {
n, err = cw.fp.Write(p) n, err = cw.fp.Write(p)
cw.digester.Hash().Write(p[:n]) cw.digester.Hash().Write(p[:n])
cw.offset += int64(len(p))
return n, err return n, err
} }
@ -54,7 +70,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
return errors.Wrap(err, "failed to change ingest file permissions") return errors.Wrap(err, "failed to change ingest file permissions")
} }
if size != fi.Size() { if size > 0 && size != fi.Size() {
return errors.Errorf("failed size validation: %v != %v", fi.Size(), size) return errors.Errorf("failed size validation: %v != %v", fi.Size(), size)
} }
@ -63,24 +79,23 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
} }
dgst := cw.digester.Digest() dgst := cw.digester.Digest()
// TODO(stevvooe): Correctly handle missing expected digest or allow no
// expected digest at commit time.
if expected != "" && expected != dgst { if expected != "" && expected != dgst {
return errors.Errorf("unexpected digest: %v != %v", dgst, expected) return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
} }
apath := filepath.Join(cw.cs.root, "blobs", dgst.Algorithm().String()) var (
if err := os.MkdirAll(apath, 0755); err != nil { ingest = filepath.Join(cw.path, "data")
target = cw.cs.blobPath(dgst)
)
// make sure parent directories of blob exist
if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
return err return err
} }
var (
ingest = filepath.Join(cw.path, "data")
target = filepath.Join(apath, dgst.Hex())
)
// clean up!! // clean up!!
defer os.RemoveAll(cw.path) defer os.RemoveAll(cw.path)
if err := os.Rename(ingest, target); err != nil { if err := os.Rename(ingest, target); err != nil {
if os.IsExist(err) { if os.IsExist(err) {
// collision with the target file! // collision with the target file!
@ -100,6 +115,9 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
// If one needs to resume the transaction, a new writer can be obtained from // If one needs to resume the transaction, a new writer can be obtained from
// `ContentStore.Resume` using the same key. The write can then be continued // `ContentStore.Resume` using the same key. The write can then be continued
// from it was left off. // from it was left off.
//
// To abandon a transaction completely, first call close then `Store.Remove` to
// clean up the associated resources.
func (cw *Writer) Close() (err error) { func (cw *Writer) Close() (err error) {
if err := unlock(cw.lock); err != nil { if err := unlock(cw.lock); err != nil {
log.Printf("unlock failed: %v", err) log.Printf("unlock failed: %v", err)