content: refactor content store for API

After iterating on the GRPC API, the changes required for the actual
implementation are now included in the content store. The big change
is the move to a single, atomic `Ingester.Writer` method for locking
content ingestion on a key. From this come several new interface
definitions.
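
A minimal sketch of the resulting flow, assuming the `Ingester` and
`Writer` interfaces defined in content/content.go below (the helper
name, the ref, and the zero-value `Commit` arguments are illustrative,
not part of this commit):

    import (
        "context"
        "io"

        "github.com/docker/containerd/content"
        digest "github.com/opencontainers/go-digest"
    )

    // ingest copies r into the store under ref. Writer atomically
    // locks the ref, so a second caller with the same ref fails until
    // this writer is closed or committed.
    func ingest(ctx context.Context, ing content.Ingester, r io.Reader) (digest.Digest, error) {
        w, err := ing.Writer(ctx, "example-ref")
        if err != nil {
            return "", err // e.g. the ref is already locked
        }
        defer w.Close() // releases the lock; Store.Abort removes partial data

        if _, err := io.Copy(w, r); err != nil {
            return "", err
        }
        // Zero values appear to skip size/digest validation here; pass
        // both when known ahead of time, as WriteBlob does.
        if err := w.Commit(0, ""); err != nil {
            return "", err
        }
        return w.Digest(), nil
    }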

The main benefit here is the clarification between `Status` and `Info`
that came out of the GRPC API. `Status` tells the status of a write,
whereas `Info` is for querying metadata about various blobs.
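
Concretely, a hedged sketch contrasting the two (the function and the
output formatting are illustrative; the fields are those defined in
this commit):

    import (
        "fmt"

        "github.com/docker/containerd/content"
        digest "github.com/opencontainers/go-digest"
    )

    // checkProgress reports on one ingest and one blob: Status
    // describes an in-flight write keyed by ref, while Info describes
    // a committed blob keyed by digest.
    func checkProgress(s *content.Store, ref string, dgst digest.Digest) error {
        st, err := s.Status(ref) // in-flight write: Offset, StartedAt, UpdatedAt
        if err != nil {
            return err
        }
        fmt.Printf("ref %s: %d bytes written, updated %s\n", st.Ref, st.Offset, st.UpdatedAt)

        info, err := s.Info(dgst) // committed blob: Size, CommittedAt
        if err != nil {
            if content.IsNotFound(err) {
                fmt.Println("blob not committed yet")
                return nil
            }
            return err
        }
        fmt.Printf("blob %s: %d bytes, committed %s\n", info.Digest, info.Size, info.CommittedAt)
        return nil
    }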

Signed-off-by: Stephen J Day <stephen.day@docker.com>
Stephen J Day 2017-02-17 00:07:02 -08:00
parent baaf7543dc
commit 621164bc84
14 changed files with 573 additions and 611 deletions

cmd/dist/active.go vendored (23 changes)

@ -3,11 +3,9 @@ package main
import (
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
"github.com/docker/containerd/content"
units "github.com/docker/go-units"
"github.com/urfave/cli"
)
@ -26,24 +24,11 @@ var activeCommand = cli.Command{
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
Value: "/tmp/content", // TODO(stevvooe): for now, just use the PWD/.content
},
},
Action: func(context *cli.Context) error {
var (
// ctx = contextpkg.Background()
root = context.String("root")
)
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
cs, err := resolveContentStore(context)
if err != nil {
return err
}
@ -58,8 +43,8 @@ var activeCommand = cli.Command{
for _, active := range active {
fmt.Fprintf(tw, "%s\t%s\t%s\n",
active.Ref,
units.HumanSize(float64(active.Size)),
units.HumanDuration(time.Since(active.ModTime)))
units.HumanSize(float64(active.Offset)),
units.HumanDuration(time.Since(active.StartedAt)))
}
tw.Flush()

cmd/dist/common.go vendored (new file, 34 changes)

@ -0,0 +1,34 @@
package main
import (
"net"
"path/filepath"
"time"
"github.com/docker/containerd/content"
"github.com/urfave/cli"
"google.golang.org/grpc"
)
func resolveContentStore(context *cli.Context) (*content.Store, error) {
root := context.GlobalString("root")
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return nil, err
}
}
return content.NewStore(root)
}
func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) {
socket := context.GlobalString("socket")
return grpc.Dial(socket,
grpc.WithBlock(),
grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", socket, timeout)
}),
)
}

cmd/dist/delete.go vendored (21 changes)

@ -3,9 +3,7 @@ package main
import (
contextpkg "context"
"fmt"
"path/filepath"
"github.com/docker/containerd/content"
"github.com/docker/containerd/log"
digest "github.com/opencontainers/go-digest"
"github.com/urfave/cli"
@ -18,30 +16,15 @@ var deleteCommand = cli.Command{
ArgsUsage: "[flags] [<digest>, ...]",
Description: `Delete one or more blobs permanently. Successfully deleted
blobs are printed to stdout.`,
Flags: []cli.Flag{
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
},
},
Flags: []cli.Flag{},
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
root = context.String("root")
args = []string(context.Args())
exitError error
)
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
cs, err := resolveContentStore(context)
if err != nil {
return err
}

cmd/dist/get.go vendored (new file, 39 changes)

@ -0,0 +1,39 @@
package main
import (
"io"
"os"
digest "github.com/opencontainers/go-digest"
"github.com/urfave/cli"
)
var getCommand = cli.Command{
Name: "get",
Usage: "get the data for an object",
ArgsUsage: "[flags] [<digest>, ...]",
Description: `Get the data for one or more blobs and write it to stdout.`,
Flags: []cli.Flag{},
Action: func(context *cli.Context) error {
cs, err := resolveContentStore(context)
if err != nil {
return err
}
dgst, err := digest.Parse(context.Args().First())
if err != nil {
return err
}
rc, err := cs.Open(dgst)
if err != nil {
return err
}
defer rc.Close()
_, err = io.Copy(os.Stdout, rc)
return err
},
}

cmd/dist/ingest.go vendored (50 changes)

@ -4,8 +4,6 @@ import (
contextpkg "context"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/docker/containerd/content"
"github.com/opencontainers/go-digest"
@ -18,17 +16,6 @@ var ingestCommand = cli.Command{
ArgsUsage: "[flags] <key>",
Description: `Ingest objects into the local content store.`,
Flags: []cli.Flag{
cli.DurationFlag{
Name: "timeout",
Usage: "total timeout for fetch",
EnvVar: "CONTAINERD_FETCH_TIMEOUT",
},
cli.StringFlag{
Name: "path, p",
Usage: "path to content store",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
},
cli.Int64Flag{
Name: "expected-size",
Usage: "validate against provided size",
@ -40,57 +27,32 @@ var ingestCommand = cli.Command{
},
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
timeout = context.Duration("timeout")
root = context.String("path")
ctx = background
cancel func()
ref = context.Args().First()
expectedSize = context.Int64("expected-size")
expectedDigest = digest.Digest(context.String("expected-digest"))
)
if timeout > 0 {
var cancel func()
ctx, cancel = contextpkg.WithTimeout(ctx, timeout)
ctx, cancel = contextpkg.WithCancel(ctx)
defer cancel()
}
if err := expectedDigest.Validate(); expectedDigest != "" && err != nil {
return err
}
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
cs, err := resolveContentStore(context)
if err != nil {
return err
}
}
cs, err := content.Open(root)
if err != nil {
return err
}
if expectedDigest != "" {
if ok, err := cs.Exists(expectedDigest); err != nil {
return err
} else if ok {
fmt.Fprintf(os.Stderr, "content with digest %v already exists\n", expectedDigest)
return nil
}
}
if ref == "" {
if expectedDigest == "" {
return fmt.Errorf("must specify a transaction reference or expected digest")
}
ref = strings.Replace(expectedDigest.String(), ":", "-", -1)
return fmt.Errorf("must specify a transaction reference")
}
// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
// all data to be written in a single invocation. Allow multiple writes
// to the same transaction key followed by a commit.
return content.WriteBlob(cs, os.Stdin, ref, expectedSize, expectedDigest)
return content.WriteBlob(ctx, cs, os.Stdin, ref, expectedSize, expectedDigest)
},
}

cmd/dist/list.go vendored (17 changes)

@ -4,7 +4,6 @@ import (
contextpkg "context"
"fmt"
"os"
"path/filepath"
"text/tabwriter"
"time"
@ -22,11 +21,6 @@ var listCommand = cli.Command{
ArgsUsage: "[flags] [<prefix>, ...]",
Description: `List blobs in the content store.`,
Flags: []cli.Flag{
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
},
cli.BoolFlag{
Name: "quiet, q",
Usage: "print only the blob digest",
@ -35,20 +29,11 @@ var listCommand = cli.Command{
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
root = context.String("root")
quiet = context.Bool("quiet")
args = []string(context.Args())
)
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
cs, err := resolveContentStore(context)
if err != nil {
return err
}

cmd/dist/main.go vendored (27 changes)

@ -1,6 +1,7 @@
package main
import (
contextpkg "context"
"fmt"
"os"
@ -9,6 +10,10 @@ import (
"github.com/urfave/cli"
)
var (
background = contextpkg.Background()
)
func main() {
app := cli.NewApp()
app.Name = "dist"
@ -27,20 +32,38 @@ distribution tool
Name: "debug",
Usage: "enable debug output in logs",
},
cli.DurationFlag{
Name: "timeout",
Usage: "total timeout for fetch",
EnvVar: "CONTAINERD_FETCH_TIMEOUT",
},
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: "/tmp/content", // TODO(stevvooe): for now, just use the PWD/.content
},
}
app.Commands = []cli.Command{
fetchCommand,
ingestCommand,
activeCommand,
pathCommand,
getCommand,
deleteCommand,
listCommand,
applyCommand,
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
var (
debug = context.GlobalBool("debug")
timeout = context.GlobalDuration("timeout")
)
if debug {
logrus.SetLevel(logrus.DebugLevel)
}
if timeout > 0 {
background, _ = contextpkg.WithTimeout(background, timeout)
}
return nil
}
if err := app.Run(os.Args); err != nil {

cmd/dist/path.go vendored (89 changes)

@ -1,89 +0,0 @@
package main
import (
contextpkg "context"
"fmt"
"path/filepath"
"github.com/docker/containerd/content"
"github.com/docker/containerd/log"
digest "github.com/opencontainers/go-digest"
"github.com/urfave/cli"
)
var pathCommand = cli.Command{
Name: "path",
Usage: "print the path to one or more blobs",
ArgsUsage: "[flags] [<digest>, ...]",
Description: `Display the paths to one or more blobs.
Output paths can be used to directly access blobs on disk.`,
Flags: []cli.Flag{
cli.StringFlag{
Name: "root",
Usage: "path to content store root",
Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content
EnvVar: "CONTAINERD_DIST_CONTENT_STORE",
},
cli.BoolFlag{
Name: "quiet, q",
Usage: "elide digests in output",
},
},
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
root = context.String("root")
args = []string(context.Args())
quiet = context.Bool("quiet")
exitError error
)
if !filepath.IsAbs(root) {
var err error
root, err = filepath.Abs(root)
if err != nil {
return err
}
}
cs, err := content.Open(root)
if err != nil {
return err
}
// TODO(stevvooe): Take the set of paths from stdin.
if len(args) < 1 {
return fmt.Errorf("please specify a blob digest")
}
for _, arg := range args {
dgst, err := digest.Parse(arg)
if err != nil {
log.G(ctx).WithError(err).Errorf("parsing %q as digest failed", arg)
if exitError == nil {
exitError = err
}
continue
}
p, err := cs.GetPath(dgst)
if err != nil {
log.G(ctx).WithError(err).Errorf("getting path for %q failed", dgst)
if exitError == nil {
exitError = err
}
continue
}
if !quiet {
fmt.Println(dgst, p)
} else {
fmt.Println(p)
}
}
return exitError
},
}

content/content.go

@ -1,364 +1,53 @@
package content
import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
"github.com/docker/containerd/log"
"github.com/nightlyone/lockfile"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
var (
ErrBlobNotFound = errors.New("blob not found")
errNotFound = errors.New("content: not found")
bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 32<<10)
return make([]byte, 1<<20)
},
}
)
// Store is digest-keyed store for content. All data written into the store is
// stored under a verifiable digest.
//
// Store can generally support multi-reader, single-writer ingest of data,
// including resumable ingest.
type Store struct {
root string
type Info struct {
Digest digest.Digest
Size int64
CommittedAt time.Time
}
func Open(root string) (*Store, error) {
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
return nil, err
}
return &Store{
root: root,
}, nil
type Provider interface {
Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
}
type Status struct {
Ref string
Size int64
ModTime time.Time
Meta interface{}
Offset int64
StartedAt time.Time
UpdatedAt time.Time
}
func (cs *Store) Exists(dgst digest.Digest) (bool, error) {
if _, err := os.Stat(cs.blobPath(dgst)); err != nil {
if !os.IsNotExist(err) {
return false, err
type Writer interface {
io.WriteCloser
Status() (Status, error)
Digest() digest.Digest
Commit(size int64, expected digest.Digest) error
}
return false, nil
type Ingester interface {
Writer(ctx context.Context, ref string) (Writer, error)
}
return true, nil
}
func (cs *Store) GetPath(dgst digest.Digest) (string, error) {
p := cs.blobPath(dgst)
if _, err := os.Stat(p); err != nil {
if os.IsNotExist(err) {
return "", ErrBlobNotFound
}
return "", err
}
return p, nil
}
// Delete removes a blob by its digest.
//
// While this is safe to do concurrently, safe exist-removal logic must hold
// some global lock on the store.
func (cs *Store) Delete(dgst digest.Digest) error {
if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
if !os.IsNotExist(err) {
return err
}
return nil
}
return nil
}
func (cs *Store) blobPath(dgst digest.Digest) string {
return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
}
// Stat returns the current status of a blob by the ingest ref.
func (cs *Store) Stat(ref string) (Status, error) {
dp := filepath.Join(cs.ingestRoot(ref), "data")
return cs.stat(dp)
}
// stat works like stat above except uses the path to the ingest.
func (cs *Store) stat(ingestPath string) (Status, error) {
dp := filepath.Join(ingestPath, "data")
dfi, err := os.Stat(dp)
if err != nil {
return Status{}, err
}
ref, err := readFileString(filepath.Join(ingestPath, "ref"))
if err != nil {
return Status{}, err
}
return Status{
Ref: ref,
Size: dfi.Size(),
ModTime: dfi.ModTime(),
}, nil
}
func (cs *Store) Active() ([]Status, error) {
ip := filepath.Join(cs.root, "ingest")
fp, err := os.Open(ip)
if err != nil {
return nil, err
}
fis, err := fp.Readdir(-1)
if err != nil {
return nil, err
}
var active []Status
for _, fi := range fis {
p := filepath.Join(ip, fi.Name())
stat, err := cs.stat(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
// TODO(stevvooe): This is a common error if uploads are being
// completed while making this listing. Need to consider taking a
// lock on the whole store to coordinate this aspect.
//
// Another option is to cleanup downloads asynchronously and
// coordinate this method with the cleanup process.
//
// For now, we just skip them, as they really don't exist.
continue
}
active = append(active, stat)
}
return active, nil
}
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
// WalkFunc defines the callback for a blob walk.
//
// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps,
// not a huge deal, considering we have a path, but let's not just let this one
// go without scrutiny.
type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error
func (cs *Store) Walk(fn WalkFunc) error {
root := filepath.Join(cs.root, "blobs")
var alg digest.Algorithm
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if !fi.IsDir() && !alg.Available() {
return nil
}
// TODO(stevvooe): There are a few more cases with subdirs that should be
// handled in case the layout gets corrupted. This isn't strict enough
// and may spew bad data.
if path == root {
return nil
}
if filepath.Dir(path) == root {
alg = digest.Algorithm(filepath.Base(path))
if !alg.Available() {
alg = ""
return filepath.SkipDir
}
// descending into a hash directory
return nil
}
dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
if err := dgst.Validate(); err != nil {
// log error but don't report
log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
// if we see this, it could mean some sort of corruption of the
// store or extra paths not expected previously.
}
return fn(path, fi, dgst)
})
}
// Begin starts a new write transaction against the blob store.
//
// The argument `ref` is used to identify the transaction. It must be a valid
// path component, meaning it has no `/` characters and no `:` (we'll ban
// others fs characters, as needed).
func (cs *Store) Begin(ref string) (*Writer, error) {
path, refp, data, lock, err := cs.ingestPaths(ref)
if err != nil {
return nil, err
}
// use single path mkdir for this to ensure ref is only base path, in
// addition to validation above.
if err := os.Mkdir(path, 0755); err != nil {
return nil, err
}
if err := tryLock(lock); err != nil {
return nil, err
}
// write the ref to a file for later use
if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
return nil, err
}
fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
return nil, errors.Wrap(err, "failed to open data file")
}
defer fp.Close()
// re-open the file in append mode
fp, err = os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, errors.Wrap(err, "error opening for append")
}
return &Writer{
cs: cs,
fp: fp,
lock: lock,
path: path,
digester: digest.Canonical.Digester(),
}, nil
}
func (cs *Store) Resume(ref string) (*Writer, error) {
path, refp, data, lock, err := cs.ingestPaths(ref)
if err != nil {
return nil, err
}
if err := tryLock(lock); err != nil {
return nil, err
}
refraw, err := readFileString(refp)
if err != nil {
return nil, errors.Wrap(err, "could not read ref")
}
if ref != refraw {
// NOTE(stevvooe): This is fairly catastrophic. Either we have some
// layout corruption or a hash collision for the ref key.
return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, refraw)
}
digester := digest.Canonical.Digester()
// slow slow slow!!, send to goroutine or use resumable hashes
fp, err := os.Open(data)
if err != nil {
return nil, err
}
defer fp.Close()
p := bufPool.Get().([]byte)
defer bufPool.Put(p)
offset, err := io.CopyBuffer(digester.Hash(), fp, p)
if err != nil {
return nil, err
}
fp1, err := os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
if os.IsNotExist(err) {
return nil, errors.Wrap(err, "ingest does not exist")
}
return nil, errors.Wrap(err, "error opening for append")
}
return &Writer{
cs: cs,
fp: fp1,
lock: lock,
ref: ref,
path: path,
offset: offset,
digester: digester,
}, nil
}
// Remove an active transaction keyed by ref.
func (cs *Store) Remove(ref string) error {
root := cs.ingestRoot(ref)
if err := os.RemoveAll(root); err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
return nil
}
func (cs *Store) ingestRoot(ref string) string {
dgst := digest.FromString(ref)
return filepath.Join(cs.root, "ingest", dgst.Hex())
}
// ingestPaths are returned, including the lockfile. The paths are the following:
//
// - root: entire ingest directory
// - ref: name of the starting ref, must be unique
// - data: file where data is written
// - lock: lock file location
//
func (cs *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
var (
fp = cs.ingestRoot(ref)
rp = filepath.Join(fp, "ref")
lp = filepath.Join(fp, "lock")
dp = filepath.Join(fp, "data")
)
lock, err := lockfile.New(lp)
if err != nil {
return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp)
}
return fp, rp, dp, lock, nil
}
func readFileString(path string) (string, error) {
p, err := ioutil.ReadFile(path)
return string(p), err
func IsNotFound(err error) bool {
return errors.Cause(err) == errNotFound
}

content/content_test.go

@ -3,6 +3,7 @@ package content
import (
"bufio"
"bytes"
"context"
"crypto/rand"
_ "crypto/sha256" // required for digest package
"fmt"
@ -21,7 +22,7 @@ import (
)
func TestContentWriter(t *testing.T) {
tmpdir, cs, cleanup := contentStoreEnv(t)
ctx, tmpdir, cs, cleanup := contentStoreEnv(t)
defer cleanup()
defer testutil.DumpDir(t, tmpdir)
@ -29,7 +30,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal("ingest dir should be created", err)
}
cw, err := cs.Begin("myref")
cw, err := cs.Writer(ctx, "myref")
if err != nil {
t.Fatal(err)
}
@ -37,20 +38,14 @@ func TestContentWriter(t *testing.T) {
t.Fatal(err)
}
// try to begin again with same ref, should fail
cw, err = cs.Begin("myref")
if err == nil {
t.Fatal("expected error on repeated begin")
}
// reopen, so we can test things
cw, err = cs.Resume("myref")
cw, err = cs.Writer(ctx, "myref")
if err != nil {
t.Fatal(err)
}
// make sure that second resume also fails
if _, err = cs.Resume("myref"); err == nil {
if _, err = cs.Writer(ctx, "myref"); err == nil {
// TODO(stevvooe): This also works across processes. Need to find a way
// to test that, as well.
t.Fatal("no error on second resume")
@ -64,14 +59,14 @@ func TestContentWriter(t *testing.T) {
// clear out the time and meta cause we don't care for this test
for i := range ingestions {
ingestions[i].Meta = nil
ingestions[i].ModTime = time.Time{}
ingestions[i].UpdatedAt = time.Time{}
ingestions[i].StartedAt = time.Time{}
}
if !reflect.DeepEqual(ingestions, []Status{
{
Ref: "myref",
Size: 0,
Offset: 0,
},
}) {
t.Fatalf("unexpected ingestion set: %v", ingestions)
@ -93,7 +88,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal(err)
}
cw, err = cs.Begin("aref")
cw, err = cs.Writer(ctx, "aref")
if err != nil {
t.Fatal(err)
}
@ -119,7 +114,7 @@ func TestContentWriter(t *testing.T) {
}
func TestWalkBlobs(t *testing.T) {
_, cs, cleanup := contentStoreEnv(t)
ctx, _, cs, cleanup := contentStoreEnv(t)
defer cleanup()
const (
@ -128,7 +123,7 @@ func TestWalkBlobs(t *testing.T) {
)
var (
blobs = populateBlobStore(t, cs, nblobs, maxsize)
blobs = populateBlobStore(t, ctx, cs, nblobs, maxsize)
expected = map[digest.Digest]struct{}{}
found = map[digest.Digest]struct{}{}
)
@ -158,7 +153,7 @@ func TestWalkBlobs(t *testing.T) {
// for blobs. This seems to be due to the number of syscalls and file io we do
// coordinating the ingestion.
func BenchmarkIngests(b *testing.B) {
_, cs, cleanup := contentStoreEnv(b)
ctx, _, cs, cleanup := contentStoreEnv(b)
defer cleanup()
for _, size := range []int64{
@ -181,7 +176,7 @@ func BenchmarkIngests(b *testing.B) {
b.StartTimer()
for dgst, p := range blobs {
checkWrite(b, cs, dgst, p)
checkWrite(b, ctx, cs, dgst, p)
}
})
}
@ -208,17 +203,17 @@ func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte {
return blobs
}
func populateBlobStore(t checker, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte {
func populateBlobStore(t checker, ctx context.Context, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte {
blobs := generateBlobs(t, nblobs, maxsize)
for dgst, p := range blobs {
checkWrite(t, cs, dgst, p)
checkWrite(t, ctx, cs, dgst, p)
}
return blobs
}
func contentStoreEnv(t checker) (string, *Store, func()) {
func contentStoreEnv(t checker) (context.Context, string, *Store, func()) {
pc, _, _, ok := runtime.Caller(1)
if !ok {
t.Fatal("failed to resolve caller")
@ -230,13 +225,15 @@ func contentStoreEnv(t checker) (string, *Store, func()) {
t.Fatal(err)
}
cs, err := Open(tmpdir)
cs, err := NewStore(tmpdir)
if err != nil {
os.RemoveAll(tmpdir)
t.Fatal(err)
}
return tmpdir, cs, func() {
ctx, cancel := context.WithCancel(context.Background())
return ctx, tmpdir, cs, func() {
cancel()
os.RemoveAll(tmpdir)
}
}
@ -253,10 +250,8 @@ func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) {
}
func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
path, err := cs.GetPath(dgst)
if err != nil {
t.Fatal(err, dgst)
}
path := cs.blobPath(dgst)
if path != filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) {
t.Fatalf("unexpected path: %q", path)
}
@ -273,8 +268,8 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
return path
}
func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
if err := WriteBlob(cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
func checkWrite(t checker, ctx context.Context, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
if err := WriteBlob(ctx, cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
t.Fatal(err)
}

content/helpers.go

@ -1,37 +1,14 @@
package content
import (
"context"
"io"
"os"
"io/ioutil"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// Provider gives access to blob content by paths.
//
// Typically, this is implemented by `*Store`.
type Provider interface {
GetPath(dgst digest.Digest) (string, error)
}
// OpenBlob opens the blob for reading identified by dgst.
//
// The opened blob may also implement seek. Callers can detect with io.Seeker.
func OpenBlob(provider Provider, dgst digest.Digest) (io.ReadCloser, error) {
path, err := provider.GetPath(dgst)
if err != nil {
return nil, err
}
fp, err := os.Open(path)
return fp, err
}
type Ingester interface {
Begin(key string) (*Writer, error)
}
// WriteBlob writes data with the expected digest into the content store. If
// expected already exists, the method returns immediately and the reader will
// not be consumed.
@ -39,11 +16,23 @@ type Ingester interface {
// This is useful when the digest and size are known beforehand.
//
// Copy is buffered, so no need to wrap reader in buffered io.
func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
cw, err := cs.Begin(ref)
func WriteBlob(ctx context.Context, cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
cw, err := cs.Writer(ctx, ref)
if err != nil {
return err
}
ws, err := cw.Status()
if err != nil {
return err
}
if ws.Offset > 0 {
// Arbitrary limitation for now. We can detect io.Seeker on r and
// resume.
return errors.Errorf("cannot resume already started write")
}
buf := bufPool.Get().([]byte)
defer bufPool.Put(buf)
@ -62,3 +51,8 @@ func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected digest
return nil
}
func readFileString(path string) (string, error) {
p, err := ioutil.ReadFile(path)
return string(p), err
}

content/locks.go

@ -1,10 +1,10 @@
package content
import (
"errors"
"sync"
"github.com/nightlyone/lockfile"
"github.com/pkg/errors"
)
// In addition to providing inter-process locks for content ingest, we also
@ -16,6 +16,8 @@ import (
// error reporting.
var (
errLocked = errors.New("key is locked")
// locks lets us lock in process, as well as output of process.
locks = map[lockfile.Lockfile]struct{}{}
locksMu sync.Mutex
@ -26,11 +28,15 @@ func tryLock(lock lockfile.Lockfile) error {
defer locksMu.Unlock()
if _, ok := locks[lock]; ok {
return errors.New("file in use")
return errLocked
}
if err := lock.TryLock(); err != nil {
return err
if errors.Cause(err) == lockfile.ErrBusy {
return errLocked
}
return errors.Wrapf(err, "lock.TryLock() encountered an error")
}
locks[lock] = struct{}{}

content/store.go (new file, 355 changes)

@ -0,0 +1,355 @@
package content
import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"syscall"
"time"
"github.com/docker/containerd/log"
"github.com/nightlyone/lockfile"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// Store is digest-keyed store for content. All data written into the store is
// stored under a verifiable digest.
//
// Store can generally support multi-reader, single-writer ingest of data,
// including resumable ingest.
type Store struct {
root string
}
func NewStore(root string) (*Store, error) {
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
return nil, err
}
return &Store{
root: root,
}, nil
}
func (s *Store) Info(dgst digest.Digest) (Info, error) {
p := s.blobPath(dgst)
fi, err := os.Stat(p)
if err != nil {
if os.IsNotExist(err) {
err = errNotFound
}
return Info{}, err
}
return Info{
Digest: dgst,
Size: fi.Size(),
CommittedAt: fi.ModTime(),
}, nil
}
// Reader returns an io.ReadCloser for the blob.
//
// TODO(stevvooe): This would work much better as an io.ReaderAt in practice.
// Right now, we are doing type assertion to tease that out, but it won't scale
// well.
func (s *Store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
fp, err := os.Open(s.blobPath(dgst))
if err != nil {
if os.IsNotExist(err) {
err = errNotFound
}
return nil, err
}
return fp, nil
}
// Delete removes a blob by its digest.
//
// While this is safe to do concurrently, safe exist-removal logic must hold
// some global lock on the store.
func (cs *Store) Delete(dgst digest.Digest) error {
if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
if !os.IsNotExist(err) {
return err
}
return nil
}
return nil
}
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
// WalkFunc defines the callback for a blob walk.
//
// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps,
// not a huge deal, considering we have a path, but let's not just let this one
// go without scrutiny.
type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error
func (cs *Store) Walk(fn WalkFunc) error {
root := filepath.Join(cs.root, "blobs")
var alg digest.Algorithm
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if !fi.IsDir() && !alg.Available() {
return nil
}
// TODO(stevvooe): There are a few more cases with subdirs that should be
// handled in case the layout gets corrupted. This isn't strict enough
// and may spew bad data.
if path == root {
return nil
}
if filepath.Dir(path) == root {
alg = digest.Algorithm(filepath.Base(path))
if !alg.Available() {
alg = ""
return filepath.SkipDir
}
// descending into a hash directory
return nil
}
dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
if err := dgst.Validate(); err != nil {
// log error but don't report
log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
// if we see this, it could mean some sort of corruption of the
// store or extra paths not expected previously.
}
return fn(path, fi, dgst)
})
}
// Status returns the current status of a write by the ingest ref.
func (s *Store) Status(ref string) (Status, error) {
dp := filepath.Join(s.ingestRoot(ref), "data")
return s.status(dp)
}
// status works like Status above, except it takes the path to the ingest directly.
func (s *Store) status(ingestPath string) (Status, error) {
dp := filepath.Join(ingestPath, "data")
fi, err := os.Stat(dp)
if err != nil {
return Status{}, err
}
ref, err := readFileString(filepath.Join(ingestPath, "ref"))
if err != nil {
return Status{}, err
}
var startedAt time.Time
if st, ok := fi.Sys().(*syscall.Stat_t); ok {
startedAt = time.Unix(st.Ctim.Sec, st.Ctim.Nsec)
} else {
startedAt = fi.ModTime()
}
return Status{
Ref: ref,
Offset: fi.Size(),
UpdatedAt: fi.ModTime(),
StartedAt: startedAt,
}, nil
}
// Writer begins or resumes the active writer identified by ref. If the writer
// is already in use, an error is returned. Only one writer may be in use per
// ref at a time.
//
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
path, refp, data, lock, err := s.ingestPaths(ref)
if err != nil {
return nil, err
}
if err := tryLock(lock); err != nil {
if !os.IsNotExist(errors.Cause(err)) {
return nil, errors.Wrapf(err, "locking %v failed", ref)
}
// if it doesn't exist, we'll make it so below!
}
var (
digester = digest.Canonical.Digester()
offset int64
startedAt time.Time
updatedAt time.Time
)
// ensure that the ingest path has been created.
if err := os.Mkdir(path, 0755); err != nil {
if !os.IsExist(err) {
return nil, err
}
// validate that we have no collision for the ref.
refraw, err := readFileString(refp)
if err != nil {
return nil, errors.Wrap(err, "could not read ref")
}
if ref != refraw {
// NOTE(stevvooe): This is fairly catastrophic. Either we have some
// layout corruption or a hash collision for the ref key.
return nil, errors.Errorf("ref key does not match: %v != %v", ref, refraw)
}
// slow slow slow!!, send to goroutine or use resumable hashes
fp, err := os.Open(data)
if err != nil {
return nil, err
}
defer fp.Close()
p := bufPool.Get().([]byte)
defer bufPool.Put(p)
offset, err = io.CopyBuffer(digester.Hash(), fp, p)
if err != nil {
return nil, err
}
fi, err := os.Stat(data)
if err != nil {
return nil, err
}
updatedAt = fi.ModTime()
if st, ok := fi.Sys().(*syscall.Stat_t); ok {
startedAt = time.Unix(st.Ctim.Sec, st.Ctim.Nsec)
} else {
startedAt = updatedAt
}
} else {
// the ingest is new, we need to setup the target location.
// write the ref to a file for later use
if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
return nil, err
}
startedAt = time.Now()
updatedAt = startedAt
}
fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return nil, errors.Wrap(err, "failed to open data file")
}
return &writer{
s: s,
fp: fp,
lock: lock,
ref: ref,
path: path,
offset: offset,
digester: digester,
startedAt: startedAt,
updatedAt: updatedAt,
}, nil
}
// Abort an active transaction keyed by ref. If the ingest is active, it will
// be cancelled. Any resources associated with the ingest will be cleaned up.
func (s *Store) Abort(ref string) error {
root := s.ingestRoot(ref)
if err := os.RemoveAll(root); err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
return nil
}
func (s *Store) Active() ([]Status, error) {
fp, err := os.Open(filepath.Join(s.root, "ingest"))
if err != nil {
return nil, err
}
fis, err := fp.Readdir(-1)
if err != nil {
return nil, err
}
var active []Status
for _, fi := range fis {
p := filepath.Join(s.root, "ingest", fi.Name())
stat, err := s.status(p)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
// TODO(stevvooe): This is a common error if uploads are being
// completed while making this listing. Need to consider taking a
// lock on the whole store to coordinate this aspect.
//
// Another option is to cleanup downloads asynchronously and
// coordinate this method with the cleanup process.
//
// For now, we just skip them, as they really don't exist.
continue
}
active = append(active, stat)
}
return active, nil
}
func (cs *Store) blobPath(dgst digest.Digest) string {
return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
}
func (s *Store) ingestRoot(ref string) string {
dgst := digest.FromString(ref)
return filepath.Join(s.root, "ingest", dgst.Hex())
}
// ingestPaths are returned, including the lockfile. The paths are the following:
//
// - root: entire ingest directory
// - ref: name of the starting ref, must be unique
// - data: file where data is written
// - lock: lock file location
//
func (s *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
var (
fp = s.ingestRoot(ref)
rp = filepath.Join(fp, "ref")
lp = filepath.Join(fp, "lock")
dp = filepath.Join(fp, "data")
)
lock, err := lockfile.New(lp)
if err != nil {
return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp)
}
return fp, rp, dp, lock, nil
}

content/writer.go

@ -4,54 +4,55 @@ import (
"log"
"os"
"path/filepath"
"time"
"github.com/nightlyone/lockfile"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// Writer represents a write transaction against the blob store.
type Writer struct {
cs *Store
// writer represents a write transaction against the blob store.
type writer struct {
s *Store
fp *os.File // opened data file
lock lockfile.Lockfile
path string // path to writer dir
ref string // ref key
offset int64
digester digest.Digester
startedAt time.Time
updatedAt time.Time
}
func (cw *Writer) Ref() string {
return cw.ref
}
// Size returns the current size written.
//
// Cannot be called concurrently with `Write`. If you need concurrent
// status, query it with `Store.Stat`.
func (cw *Writer) Size() int64 {
return cw.offset
func (w *writer) Status() (Status, error) {
return Status{
Ref: w.ref,
Offset: w.offset,
StartedAt: w.startedAt,
UpdatedAt: w.updatedAt,
}, nil
}
// Digest returns the current digest of the content, up to the current write.
//
// Cannot be called concurrently with `Write`.
func (cw *Writer) Digest() digest.Digest {
return cw.digester.Digest()
func (w *writer) Digest() digest.Digest {
return w.digester.Digest()
}
// Write p to the transaction.
//
// Note that writes are unbuffered to the backing file. When writing, it is
// recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer.
func (cw *Writer) Write(p []byte) (n int, err error) {
n, err = cw.fp.Write(p)
cw.digester.Hash().Write(p[:n])
cw.offset += int64(len(p))
func (w *writer) Write(p []byte) (n int, err error) {
n, err = w.fp.Write(p)
w.digester.Hash().Write(p[:n])
w.offset += int64(len(p))
w.updatedAt = time.Now()
return n, err
}
func (cw *Writer) Commit(size int64, expected digest.Digest) error {
func (cw *writer) Commit(size int64, expected digest.Digest) error {
if err := cw.fp.Sync(); err != nil {
return errors.Wrap(err, "sync failed")
}
@ -85,7 +86,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
var (
ingest = filepath.Join(cw.path, "data")
target = cw.cs.blobPath(dgst)
target = cw.s.blobPath(dgst)
)
// make sure parent directories of blob exist
@ -118,7 +119,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error {
//
// To abandon a transaction completely, first call Close, then `Store.Abort` to
// clean up the associated resources.
func (cw *Writer) Close() (err error) {
func (cw *writer) Close() (err error) {
if err := unlock(cw.lock); err != nil {
log.Printf("unlock failed: %v", err)
}