dist: expand functionality of the dist tool
With this change, we add the following commands to the dist tool: - `ingest`: verify and accept content into storage - `active`: display active ingest processes - `list`: list content in storage - `path`: provide a path to a blob by digest - `delete`: remove a piece of content from storage We demonstrate the utility with the following shell pipeline: ``` $ ./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json | \ jq -r '.layers[] | "./dist fetch docker.io/library/redis "+.digest + "| ./dist ingest --expected-digest "+.digest+" --expected-size "+(.size | tostring) +" docker.io/library/redis@"+.digest' | xargs -I{} -P10 -n1 sh -c "{}" ``` The above fetches a manifest, pipes it to jq, which assembles a shell pipeline to ingest each layer into the content store. Because the transactions are keyed by their digest, concurrent downloads and downloads of repeated content are ignored. Each process is then executed in parallel using xargs. Put shortly, this is a parallel layer download. In a separate shell session, you could monitor the active downloads with the following: ``` $ watch -n0.2 ./dist active ``` For now, the content is downloaded into `.content` in the current working directory. To watch the contents of this directory, you can use the following: ``` $ watch -n0.2 tree .content ``` This will help to understand what is going on internally. To get access to the layers, you can use the path command: ``` $./dist path sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa /home/sjd/go/src/github.com/docker/containerd/.content/blobs/sha256/010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa ``` When you are done, you can clear out the content with the classic xargs pipeline: ``` $ ./dist list -q | xargs ./dist delete ``` Note that this is mostly a POC.
Things like failed downloads and abandoned download cleanup aren't quite handled. We'll probably make adjustments around how content store transactions are handled to address this. From here, we'll build out full image pull and create tooling to get runtime bundles from the fetched content. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
3c44ec5dbc
commit
f9cd9be61a
10 changed files with 532 additions and 36 deletions
68
cmd/dist/active.go
vendored
Normal file
68
cmd/dist/active.go
vendored
Normal file
|
@ -0,0 +1,68 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/docker/containerd/content"
|
||||
units "github.com/docker/go-units"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
var activeCommand = cli.Command{
|
||||
Name: "active",
|
||||
Usage: "display active transfers.",
|
||||
ArgsUsage: "[flags] [<key>, ...]",
|
||||
Description: `Display the ongoing transfers.`,
|
||||
Flags: []cli.Flag{
|
||||
cli.DurationFlag{
|
||||
Name: "timeout, t",
|
||||
Usage: "total timeout for fetch",
|
||||
EnvVar: "CONTAINERD_FETCH_TIMEOUT",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "root",
|
||||
Usage: "path to content store root",
|
||||
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
|
||||
},
|
||||
},
|
||||
Action: func(context *cli.Context) error {
|
||||
var (
|
||||
// ctx = contextpkg.Background()
|
||||
root = context.String("root")
|
||||
)
|
||||
|
||||
if !filepath.IsAbs(root) {
|
||||
var err error
|
||||
root, err = filepath.Abs(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cs, err := content.Open(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
active, err := cs.Active()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0)
|
||||
fmt.Fprintf(tw, "REF\tSIZE\tAGE\n")
|
||||
for _, active := range active {
|
||||
fmt.Fprintf(tw, "%s\t%s\t%s\n",
|
||||
active.Ref,
|
||||
units.HumanSize(float64(active.Size)),
|
||||
units.HumanDuration(time.Since(active.ModTime)))
|
||||
}
|
||||
tw.Flush()
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
72
cmd/dist/delete.go
vendored
Normal file
72
cmd/dist/delete.go
vendored
Normal file
|
@ -0,0 +1,72 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
contextpkg "context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/docker/containerd/content"
|
||||
"github.com/docker/containerd/log"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
var deleteCommand = cli.Command{
|
||||
Name: "delete",
|
||||
Aliases: []string{"del"},
|
||||
Usage: "permanently delete one or more blobs.",
|
||||
ArgsUsage: "[flags] [<digest>, ...]",
|
||||
Description: `Delete one or more blobs permanently. Successfully deleted
|
||||
blobs are printed to stdout.`,
|
||||
Flags: []cli.Flag{
|
||||
cli.StringFlag{
|
||||
Name: "root",
|
||||
Usage: "path to content store root",
|
||||
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
|
||||
},
|
||||
},
|
||||
Action: func(context *cli.Context) error {
|
||||
var (
|
||||
ctx = contextpkg.Background()
|
||||
root = context.String("root")
|
||||
args = []string(context.Args())
|
||||
exitError error
|
||||
)
|
||||
|
||||
if !filepath.IsAbs(root) {
|
||||
var err error
|
||||
root, err = filepath.Abs(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cs, err := content.Open(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, arg := range args {
|
||||
dgst, err := digest.Parse(arg)
|
||||
if err != nil {
|
||||
if exitError == nil {
|
||||
exitError = err
|
||||
}
|
||||
log.G(ctx).WithError(err).Errorf("could not delete %v", dgst)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := cs.Delete(dgst); err != nil {
|
||||
if exitError == nil {
|
||||
exitError = err
|
||||
}
|
||||
log.G(ctx).WithError(err).Errorf("could not delete %v", dgst)
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Println(dgst)
|
||||
}
|
||||
|
||||
return exitError
|
||||
},
|
||||
}
|
96
cmd/dist/ingest.go
vendored
Normal file
96
cmd/dist/ingest.go
vendored
Normal file
|
@ -0,0 +1,96 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
contextpkg "context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/containerd/content"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
var ingestCommand = cli.Command{
|
||||
Name: "ingest",
|
||||
Usage: "accept content into the store",
|
||||
ArgsUsage: "[flags] <key>",
|
||||
Description: `Ingest objects into the local content store.`,
|
||||
Flags: []cli.Flag{
|
||||
cli.DurationFlag{
|
||||
Name: "timeout",
|
||||
Usage: "total timeout for fetch",
|
||||
EnvVar: "CONTAINERD_FETCH_TIMEOUT",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "path, p",
|
||||
Usage: "path to content store",
|
||||
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
|
||||
EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
|
||||
},
|
||||
cli.Int64Flag{
|
||||
Name: "expected-size",
|
||||
Usage: "validate against provided size",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "expected-digest",
|
||||
Usage: "verify content against expected digest",
|
||||
},
|
||||
},
|
||||
Action: func(context *cli.Context) error {
|
||||
var (
|
||||
ctx = contextpkg.Background()
|
||||
timeout = context.Duration("timeout")
|
||||
root = context.String("path")
|
||||
ref = context.Args().First()
|
||||
expectedSize = context.Int64("expected-size")
|
||||
expectedDigest = digest.Digest(context.String("expected-digest"))
|
||||
)
|
||||
|
||||
if timeout > 0 {
|
||||
var cancel func()
|
||||
ctx, cancel = contextpkg.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
if err := expectedDigest.Validate(); expectedDigest != "" && err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !filepath.IsAbs(root) {
|
||||
var err error
|
||||
root, err = filepath.Abs(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cs, err := content.Open(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if expectedDigest != "" {
|
||||
if ok, err := cs.Exists(expectedDigest); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
fmt.Fprintf(os.Stderr, "content with digest %v already exists\n", expectedDigest)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if ref == "" {
|
||||
if expectedDigest == "" {
|
||||
return fmt.Errorf("must specify a transaction reference or expected digest")
|
||||
}
|
||||
|
||||
ref = strings.Replace(expectedDigest.String(), ":", "-", -1)
|
||||
}
|
||||
|
||||
// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
|
||||
// all data to be written in a single invocation. Allow multiple writes
|
||||
// to the same transaction key followed by a commit.
|
||||
return content.WriteBlob(cs, os.Stdin, ref, expectedSize, expectedDigest)
|
||||
},
|
||||
}
|
85
cmd/dist/list.go
vendored
Normal file
85
cmd/dist/list.go
vendored
Normal file
|
@ -0,0 +1,85 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
contextpkg "context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/docker/containerd/content"
|
||||
"github.com/docker/containerd/log"
|
||||
units "github.com/docker/go-units"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
var listCommand = cli.Command{
|
||||
Name: "list",
|
||||
Aliases: []string{"ls"},
|
||||
Usage: "list all blobs in the store.",
|
||||
ArgsUsage: "[flags] [<prefix>, ...]",
|
||||
Description: `List blobs in the content store.`,
|
||||
Flags: []cli.Flag{
|
||||
cli.StringFlag{
|
||||
Name: "root",
|
||||
Usage: "path to content store root",
|
||||
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
|
||||
},
|
||||
cli.BoolFlag{
|
||||
Name: "quiet, q",
|
||||
Usage: "print only the blob digest",
|
||||
},
|
||||
},
|
||||
Action: func(context *cli.Context) error {
|
||||
var (
|
||||
ctx = contextpkg.Background()
|
||||
root = context.String("root")
|
||||
quiet = context.Bool("quiet")
|
||||
args = []string(context.Args())
|
||||
)
|
||||
|
||||
if !filepath.IsAbs(root) {
|
||||
var err error
|
||||
root, err = filepath.Abs(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cs, err := content.Open(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(args) > 0 {
|
||||
// TODO(stevvooe): Implement selection of a few blobs. Not sure
|
||||
// what kind of efficiency gains we can actually get here.
|
||||
log.G(ctx).Warnf("args ignored; need to implement matchers")
|
||||
}
|
||||
|
||||
var walkFn content.WalkFunc
|
||||
if quiet {
|
||||
walkFn = func(path string, fi os.FileInfo, dgst digest.Digest) error {
|
||||
fmt.Println(dgst)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0)
|
||||
defer tw.Flush()
|
||||
|
||||
fmt.Fprintf(tw, "DIGEST\tSIZE\tAGE\n")
|
||||
walkFn = func(path string, fi os.FileInfo, dgst digest.Digest) error {
|
||||
fmt.Fprintf(tw, "%s\t%s\t%s\n",
|
||||
dgst,
|
||||
units.HumanSize(float64(fi.Size())),
|
||||
units.HumanDuration(time.Since(fi.ModTime())))
|
||||
return nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return cs.Walk(walkFn)
|
||||
},
|
||||
}
|
5
cmd/dist/main.go
vendored
5
cmd/dist/main.go
vendored
|
@ -30,6 +30,11 @@ distribution tool
|
|||
}
|
||||
app.Commands = []cli.Command{
|
||||
fetchCommand,
|
||||
ingestCommand,
|
||||
activeCommand,
|
||||
pathCommand,
|
||||
deleteCommand,
|
||||
listCommand,
|
||||
}
|
||||
app.Before = func(context *cli.Context) error {
|
||||
if context.GlobalBool("debug") {
|
||||
|
|
89
cmd/dist/path.go
vendored
Normal file
89
cmd/dist/path.go
vendored
Normal file
|
@ -0,0 +1,89 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
contextpkg "context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/docker/containerd/content"
|
||||
"github.com/docker/containerd/log"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
var pathCommand = cli.Command{
|
||||
Name: "path",
|
||||
Usage: "print the path to one or more blobs",
|
||||
ArgsUsage: "[flags] [<digest>, ...]",
|
||||
Description: `Display the paths to one or more blobs.
|
||||
|
||||
Output paths can be used to directly access blobs on disk.`,
|
||||
Flags: []cli.Flag{
|
||||
cli.StringFlag{
|
||||
Name: "root",
|
||||
Usage: "path to content store root",
|
||||
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
|
||||
EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
|
||||
},
|
||||
cli.BoolFlag{
|
||||
Name: "quiet, q",
|
||||
Usage: "elide digests in output",
|
||||
},
|
||||
},
|
||||
Action: func(context *cli.Context) error {
|
||||
var (
|
||||
ctx = contextpkg.Background()
|
||||
root = context.String("root")
|
||||
args = []string(context.Args())
|
||||
quiet = context.Bool("quiet")
|
||||
exitError error
|
||||
)
|
||||
|
||||
if !filepath.IsAbs(root) {
|
||||
var err error
|
||||
root, err = filepath.Abs(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cs, err := content.Open(root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(stevvooe): Take the set of paths from stdin.
|
||||
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("please specify a blob digest")
|
||||
}
|
||||
|
||||
for _, arg := range args {
|
||||
dgst, err := digest.Parse(arg)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("parsing %q as digest failed", arg)
|
||||
if exitError == nil {
|
||||
exitError = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
p, err := cs.GetPath(dgst)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("getting path for %q failed", dgst)
|
||||
if exitError == nil {
|
||||
exitError = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !quiet {
|
||||
fmt.Println(dgst, p)
|
||||
} else {
|
||||
fmt.Println(p)
|
||||
}
|
||||
}
|
||||
|
||||
return exitError
|
||||
},
|
||||
}
|
|
@ -6,6 +6,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/containerd/log"
|
||||
"github.com/nightlyone/lockfile"
|
||||
|
@ -45,9 +46,56 @@ func Open(root string) (*Store, error) {
|
|||
type Status struct {
|
||||
Ref string
|
||||
Size int64
|
||||
ModTime time.Time
|
||||
Meta interface{}
|
||||
}
|
||||
|
||||
func (cs *Store) Exists(dgst digest.Digest) (bool, error) {
|
||||
if _, err := os.Stat(cs.blobPath(dgst)); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (cs *Store) GetPath(dgst digest.Digest) (string, error) {
|
||||
p := cs.blobPath(dgst)
|
||||
if _, err := os.Stat(p); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return "", ErrBlobNotFound
|
||||
}
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Delete removes a blob by its digest.
|
||||
//
|
||||
// While this is safe to do concurrently, safe exist-removal logic must hold
|
||||
// some global lock on the store.
|
||||
func (cs *Store) Delete(dgst digest.Digest) error {
|
||||
if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *Store) blobPath(dgst digest.Digest) string {
|
||||
return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
|
||||
}
|
||||
|
||||
// Stat returns the current status of a blob by the ingest ref.
|
||||
func (cs *Store) Stat(ref string) (Status, error) {
|
||||
dp := filepath.Join(cs.ingestRoot(ref), "data")
|
||||
return cs.stat(dp)
|
||||
|
@ -69,8 +117,8 @@ func (cs *Store) stat(ingestPath string) (Status, error) {
|
|||
return Status{
|
||||
Ref: ref,
|
||||
Size: dfi.Size(),
|
||||
ModTime: dfi.ModTime(),
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
func (cs *Store) Active() ([]Status, error) {
|
||||
|
@ -114,7 +162,14 @@ func (cs *Store) Active() ([]Status, error) {
|
|||
|
||||
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
|
||||
|
||||
func (cs *Store) Walk(fn func(path string, dgst digest.Digest) error) error {
|
||||
// WalkFunc defines the callback for a blob walk.
|
||||
//
|
||||
// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps,
|
||||
// not a huge deal, considering we have a path, but let's not just let this one
|
||||
// go without scrunity.
|
||||
type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error
|
||||
|
||||
func (cs *Store) Walk(fn WalkFunc) error {
|
||||
root := filepath.Join(cs.root, "blobs")
|
||||
var alg digest.Algorithm
|
||||
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
|
||||
|
@ -148,23 +203,10 @@ func (cs *Store) Walk(fn func(path string, dgst digest.Digest) error) error {
|
|||
// store or extra paths not expected previously.
|
||||
}
|
||||
|
||||
return fn(path, dgst)
|
||||
return fn(path, fi, dgst)
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *Store) GetPath(dgst digest.Digest) (string, error) {
|
||||
p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
|
||||
if _, err := os.Stat(p); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return "", ErrBlobNotFound
|
||||
}
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Begin starts a new write transaction against the blob store.
|
||||
//
|
||||
// The argument `ref` is used to identify the transaction. It must be a valid
|
||||
|
@ -267,6 +309,20 @@ func (cs *Store) Resume(ref string) (*Writer, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Remove an active transaction keyed by ref.
|
||||
func (cs *Store) Remove(ref string) error {
|
||||
root := cs.ingestRoot(ref)
|
||||
if err := os.RemoveAll(root); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *Store) ingestRoot(ref string) string {
|
||||
dgst := digest.FromString(ref)
|
||||
return filepath.Join(cs.root, "ingest", dgst.Hex())
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
@ -59,6 +60,12 @@ func TestContentWriter(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// clear out the time and meta cause we don't care for this test
|
||||
for i := range ingestions {
|
||||
ingestions[i].Meta = nil
|
||||
ingestions[i].ModTime = time.Time{}
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(ingestions, []Status{
|
||||
{
|
||||
Ref: "myref",
|
||||
|
@ -129,7 +136,7 @@ func TestWalkBlobs(t *testing.T) {
|
|||
expected[dgst] = struct{}{}
|
||||
}
|
||||
|
||||
if err := cs.Walk(func(path string, dgst digest.Digest) error {
|
||||
if err := cs.Walk(func(path string, fi os.FileInfo, dgst digest.Digest) error {
|
||||
found[dgst] = struct{}{}
|
||||
if checked := checkBlobPath(t, cs, dgst); checked != path {
|
||||
t.Fatalf("blob path did not match: %v != %v", path, checked)
|
||||
|
@ -266,7 +273,7 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
|
|||
}
|
||||
|
||||
func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
|
||||
if err := WriteBlob(cs, bytes.NewReader(p), int64(len(p)), dgst); err != nil {
|
||||
if err := WriteBlob(cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -39,8 +39,8 @@ type Ingester interface {
|
|||
// This is useful when the digest and size are known beforehand.
|
||||
//
|
||||
// Copy is buffered, so no need to wrap reader in buffered io.
|
||||
func WriteBlob(cs Ingester, r io.Reader, size int64, expected digest.Digest) error {
|
||||
cw, err := cs.Begin(expected.Hex())
|
||||
func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
|
||||
cw, err := cs.Begin(ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ func WriteBlob(cs Ingester, r io.Reader, size int64, expected digest.Digest) err
|
|||
return err
|
||||
}
|
||||
|
||||
if nn != size {
|
||||
if size > 0 && nn != size {
|
||||
return errors.Errorf("failed size verification: %v != %v", nn, size)
|
||||
}
|
||||
|
||||
|
|
|
@ -25,6 +25,21 @@ func (cw *Writer) Ref() string {
|
|||
return cw.ref
|
||||
}
|
||||
|
||||
// Size returns the current size written.
|
||||
//
|
||||
// Cannot be called concurrently with `Write`. If you need need concurrent
|
||||
// status, query it with `Store.Stat`.
|
||||
func (cw *Writer) Size() int64 {
|
||||
return cw.offset
|
||||
}
|
||||
|
||||
// Digest returns the current digest of the content, up to the current write.
|
||||
//
|
||||
// Cannot be called concurrently with `Write`.
|
||||
func (cw *Writer) Digest() digest.Digest {
|
||||
return cw.digester.Digest()
|
||||
}
|
||||
|
||||
// Write p to the transaction.
|
||||
//
|
||||
// Note that writes are unbuffered to the backing file. When writing, it is
|
||||
|
@ -32,6 +47,7 @@ func (cw *Writer) Ref() string {
|
|||
func (cw *Writer) Write(p []byte) (n int, err error) {
|
||||
n, err = cw.fp.Write(p)
|
||||
cw.digester.Hash().Write(p[:n])
|
||||
cw.offset += int64(len(p))
|
||||
return n, err
|
||||
}
|
||||
|
||||
|
@ -54,7 +70,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
|
|||
return errors.Wrap(err, "failed to change ingest file permissions")
|
||||
}
|
||||
|
||||
if size != fi.Size() {
|
||||
if size > 0 && size != fi.Size() {
|
||||
return errors.Errorf("failed size validation: %v != %v", fi.Size(), size)
|
||||
}
|
||||
|
||||
|
@ -63,24 +79,23 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
|
|||
}
|
||||
|
||||
dgst := cw.digester.Digest()
|
||||
// TODO(stevvooe): Correctly handle missing expected digest or allow no
|
||||
// expected digest at commit time.
|
||||
if expected != "" && expected != dgst {
|
||||
return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
|
||||
}
|
||||
|
||||
apath := filepath.Join(cw.cs.root, "blobs", dgst.Algorithm().String())
|
||||
if err := os.MkdirAll(apath, 0755); err != nil {
|
||||
var (
|
||||
ingest = filepath.Join(cw.path, "data")
|
||||
target = cw.cs.blobPath(dgst)
|
||||
)
|
||||
|
||||
// make sure parent directories of blob exist
|
||||
if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
ingest = filepath.Join(cw.path, "data")
|
||||
target = filepath.Join(apath, dgst.Hex())
|
||||
)
|
||||
|
||||
// clean up!!
|
||||
defer os.RemoveAll(cw.path)
|
||||
|
||||
if err := os.Rename(ingest, target); err != nil {
|
||||
if os.IsExist(err) {
|
||||
// collision with the target file!
|
||||
|
@ -100,6 +115,9 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
|
|||
// If one needs to resume the transaction, a new writer can be obtained from
|
||||
// `ContentStore.Resume` using the same key. The write can then be continued
|
||||
// from where it was left off.
|
||||
//
|
||||
// To abandon a transaction completely, first call close then `Store.Remove` to
|
||||
// clean up the associated resources.
|
||||
func (cw *Writer) Close() (err error) {
|
||||
if err := unlock(cw.lock); err != nil {
|
||||
log.Printf("unlock failed: %v", err)
|
||||
|
|
Loading…
Reference in a new issue