diff --git a/content/content.go b/content/content.go index c4318b3..1f2a2df 100644 --- a/content/content.go +++ b/content/content.go @@ -2,9 +2,9 @@ package content import ( "io" + "io/ioutil" "os" "path/filepath" - "strings" "sync" "github.com/docker/containerd/log" @@ -23,21 +23,21 @@ var ( } ) -// ContentStore is digest-keyed store for content. All data written into the -// store is stored under a verifiable digest. +// Store is digest-keyed store for content. All data written into the store is +// stored under a verifiable digest. // -// ContentStore can generally support multi-reader, single-writer ingest of -// data, including resumable ingest. -type ContentStore struct { +// Store can generally support multi-reader, single-writer ingest of data, +// including resumable ingest. +type Store struct { root string } -func OpenContentStore(root string) (*ContentStore, error) { +func Open(root string) (*Store, error) { if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) { return nil, err } - return &ContentStore{ + return &Store{ root: root, }, nil } @@ -48,8 +48,20 @@ type Status struct { Meta interface{} } -func (cs *ContentStore) Stat(ref string) (Status, error) { - dfi, err := os.Stat(filepath.Join(cs.root, "ingest", ref, "data")) +func (cs *Store) Stat(ref string) (Status, error) { + // stat joins "data" itself, so pass the ingest directory, not the data file. + return cs.stat(cs.ingestRoot(ref)) +} + +// stat works like stat above except uses the path to the ingest. 
+func (cs *Store) stat(ingestPath string) (Status, error) { + dp := filepath.Join(ingestPath, "data") + dfi, err := os.Stat(dp) + if err != nil { + return Status{}, err + } + + ref, err := readFileString(filepath.Join(ingestPath, "ref")) if err != nil { return Status{}, err } @@ -58,9 +70,10 @@ func (cs *ContentStore) Stat(ref string) (Status, error) { Ref: ref, Size: dfi.Size(), }, nil + } -func (cs *ContentStore) Active() ([]Status, error) { +func (cs *Store) Active() ([]Status, error) { ip := filepath.Join(cs.root, "ingest") fp, err := os.Open(ip) @@ -75,7 +88,8 @@ func (cs *ContentStore) Active() ([]Status, error) { var active []Status for _, fi := range fis { - stat, err := cs.Stat(fi.Name()) + p := filepath.Join(ip, fi.Name()) + stat, err := cs.stat(p) if err != nil { if !os.IsNotExist(err) { return nil, err @@ -100,7 +114,7 @@ func (cs *ContentStore) Active() ([]Status, error) { // TODO(stevvooe): Allow querying the set of blobs in the blob store. -func (cs *ContentStore) Walk(fn func(path string, dgst digest.Digest) error) error { +func (cs *Store) Walk(fn func(path string, dgst digest.Digest) error) error { root := filepath.Join(cs.root, "blobs") var alg digest.Algorithm return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { @@ -138,7 +152,7 @@ func (cs *ContentStore) Walk(fn func(path string, dgst digest.Digest) error) err }) } -func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) { +func (cs *Store) GetPath(dgst digest.Digest) (string, error) { p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) if _, err := os.Stat(p); err != nil { if os.IsNotExist(err) { @@ -156,10 +170,8 @@ func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) { // The argument `ref` is used to identify the transaction. It must be a valid // path component, meaning it has no `/` characters and no `:` (we'll ban // others fs characters, as needed). 
-// -// TODO(stevvooe): Figure out minimum common set of characters, basically common -func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) { - path, data, lock, err := cs.ingestPaths(ref) +func (cs *Store) Begin(ref string) (*Writer, error) { + path, refp, data, lock, err := cs.ingestPaths(ref) if err != nil { return nil, err } @@ -174,6 +186,11 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) { return nil, err } + // write the ref to a file for later use + if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil { + return nil, err + } + fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666) if err != nil { return nil, errors.Wrap(err, "failed to open data file") @@ -186,7 +203,7 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) { return nil, errors.Wrap(err, "error opening for append") } - return &ContentWriter{ + return &Writer{ cs: cs, fp: fp, lock: lock, @@ -195,8 +212,8 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) { }, nil } -func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) { - path, data, lock, err := cs.ingestPaths(ref) +func (cs *Store) Resume(ref string) (*Writer, error) { + path, refp, data, lock, err := cs.ingestPaths(ref) if err != nil { return nil, err } @@ -205,6 +222,17 @@ func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) { return nil, err } + refraw, err := readFileString(refp) + if err != nil { + return nil, errors.Wrap(err, "could not read ref") + } + + if ref != refraw { + // NOTE(stevvooe): This is fairly catastrophic. Either we have some + // layout corruption or a hash collision for the ref key. 
+ return nil, errors.Errorf("ref key does not match: %v != %v", ref, refraw) // err is nil here; Wrapf(nil, ...) would return a nil error + } + digester := digest.Canonical.Digester() // slow slow slow!!, send to goroutine or use resumable hashes @@ -228,39 +256,46 @@ func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) { return nil, errors.Wrap(err, "error opening for append") } - return &ContentWriter{ + return &Writer{ cs: cs, fp: fp1, lock: lock, + ref: ref, path: path, offset: offset, digester: digester, }, nil } -func (cs *ContentStore) ingestPaths(ref string) (string, string, lockfile.Lockfile, error) { - cref := filepath.Clean(ref) - if cref != ref { - return "", "", "", errors.Errorf("invalid path after clean") - } - - fp := filepath.Join(cs.root, "ingest", ref) - - // ensure we don't escape root - if !strings.HasPrefix(fp, cs.root) { - return "", "", "", errors.Errorf("path %q escapes root", ref) - } - - // ensure we are just a single path component - if ref != filepath.Base(fp) { - return "", "", "", errors.Errorf("ref must be a single path component") - } - - lockfilePath := filepath.Join(fp, "lock") - lock, err := lockfile.New(lockfilePath) - if err != nil { - return "", "", "", errors.Wrapf(err, "error creating lockfile %v", lockfilePath) - } - - return fp, filepath.Join(fp, "data"), lock, nil +func (cs *Store) ingestRoot(ref string) string { + dgst := digest.FromString(ref) + return filepath.Join(cs.root, "ingest", dgst.Hex()) +} + +// ingestPaths are returned, including the lockfile. 
The paths are the following: +// +// - root: entire ingest directory +// - ref: name of the starting ref, must be unique +// - data: file where data is written +// - lock: lock file location +// +func (cs *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) { + var ( + fp = cs.ingestRoot(ref) + rp = filepath.Join(fp, "ref") + lp = filepath.Join(fp, "lock") + dp = filepath.Join(fp, "data") + ) + + lock, err := lockfile.New(lp) + if err != nil { + return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp) + } + + return fp, rp, dp, lock, nil +} + +func readFileString(path string) (string, error) { + p, err := ioutil.ReadFile(path) + return string(p), err } diff --git a/content/content_test.go b/content/content_test.go index fe0e562..50bfff9 100644 --- a/content/content_test.go +++ b/content/content_test.go @@ -120,20 +120,12 @@ func TestWalkBlobs(t *testing.T) { ) var ( - blobs = map[digest.Digest][]byte{} + blobs = populateBlobStore(t, cs, nblobs, maxsize) expected = map[digest.Digest]struct{}{} found = map[digest.Digest]struct{}{} ) - for i := 0; i < nblobs; i++ { - p := make([]byte, mrand.Intn(maxsize)) - - if _, err := rand.Read(p); err != nil { - t.Fatal(err) - } - - dgst := checkWrite(t, cs, p) - blobs[dgst] = p + for dgst := range blobs { expected[dgst] = struct{}{} } @@ -152,9 +144,73 @@ func TestWalkBlobs(t *testing.T) { } } -func contentStoreEnv(t interface { +// BenchmarkIngests checks the insertion time over varying blob sizes. +// +// Note that at the time of writing there is roughly a 4ms insertion overhead +// for blobs. This seems to be due to the number of syscalls and file io we do +// coordinating the ingestion. 
+func BenchmarkIngests(b *testing.B) { + _, cs, cleanup := contentStoreEnv(b) + defer cleanup() + + for _, size := range []int64{ + 1 << 10, + 4 << 10, + 512 << 10, + 1 << 20, + } { + size := size + b.Run(fmt.Sprint(size), func(b *testing.B) { + b.StopTimer() + blobs := generateBlobs(b, int64(b.N), size) + + var total int64 + for _, blob := range blobs { + total += int64(len(blob)) + } + b.SetBytes(total / int64(len(blobs))) // SetBytes expects bytes per operation, not the total across all b.N blobs + + b.StartTimer() + + for dgst, p := range blobs { + checkWrite(b, cs, dgst, p) + } + }) + } +} + +type checker interface { Fatal(args ...interface{}) -}) (string, *ContentStore, func()) { +} + +func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte { + blobs := map[digest.Digest][]byte{} + + for i := int64(0); i < nblobs; i++ { + p := make([]byte, mrand.Int63n(maxsize)) + + if _, err := rand.Read(p); err != nil { + t.Fatal(err) + } + + dgst := digest.FromBytes(p) + blobs[dgst] = p + } + + return blobs +} + +func populateBlobStore(t checker, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte { + blobs := generateBlobs(t, nblobs, maxsize) + + for dgst, p := range blobs { + checkWrite(t, cs, dgst, p) + } + + return blobs +} + +func contentStoreEnv(t checker) (string, *Store, func()) { pc, _, _, ok := runtime.Caller(1) if !ok { t.Fatal("failed to resolve caller") @@ -166,7 +222,7 @@ func contentStoreEnv(t interface { t.Fatal(err) } - cs, err := OpenContentStore(tmpdir) + cs, err := Open(tmpdir) if err != nil { os.RemoveAll(tmpdir) t.Fatal(err) @@ -177,9 +233,7 @@ func contentStoreEnv(t interface { } } -func checkCopy(t interface { - Fatal(args ...interface{}) -}, size int64, dst io.Writer, src io.Reader) { +func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) { nn, err := io.Copy(dst, src) if err != nil { t.Fatal(err) @@ -190,7 +244,7 @@ func checkCopy(t interface { } } -func checkBlobPath(t *testing.T, cs *ContentStore, dgst digest.Digest) string 
{ path, err := cs.GetPath(dgst) if err != nil { t.Fatal(err, dgst) @@ -211,8 +265,7 @@ func checkBlobPath(t *testing.T, cs *ContentStore, dgst digest.Digest) string { return path } -func checkWrite(t *testing.T, cs *ContentStore, p []byte) digest.Digest { - dgst := digest.FromBytes(p) +func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest { if err := WriteBlob(cs, bytes.NewReader(p), int64(len(p)), dgst); err != nil { t.Fatal(err) } diff --git a/content/helpers.go b/content/helpers.go index a4e76c6..111dc55 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -8,11 +8,18 @@ import ( "github.com/pkg/errors" ) +// Provider gives access to blob content by paths. +// +// Typically, this is implemented by `*Store`. +type Provider interface { + GetPath(dgst digest.Digest) (string, error) +} + // OpenBlob opens the blob for reading identified by dgst. // // The opened blob may also implement seek. Callers can detect with io.Seeker. -func OpenBlob(cs *ContentStore, dgst digest.Digest) (io.ReadCloser, error) { - path, err := cs.GetPath(dgst) +func OpenBlob(provider Provider, dgst digest.Digest) (io.ReadCloser, error) { + path, err := provider.GetPath(dgst) if err != nil { return nil, err } @@ -21,6 +28,10 @@ func OpenBlob(cs *ContentStore, dgst digest.Digest) (io.ReadCloser, error) { return fp, err } +type Ingester interface { + Begin(key string) (*Writer, error) +} + // WriteBlob writes data with the expected digest into the content store. If // expected already exists, the method returns immediately and the reader will // not be consumed. @@ -28,7 +39,7 @@ func OpenBlob(cs *ContentStore, dgst digest.Digest) (io.ReadCloser, error) { // This is useful when the digest and size are known beforehand. // // Copy is buffered, so no need to wrap reader in buffered io. 
-func WriteBlob(cs *ContentStore, r io.Reader, size int64, expected digest.Digest) error { +func WriteBlob(cs Ingester, r io.Reader, size int64, expected digest.Digest) error { cw, err := cs.Begin(expected.Hex()) if err != nil { return err diff --git a/content/writer.go b/content/writer.go index 3a36636..231f68a 100644 --- a/content/writer.go +++ b/content/writer.go @@ -10,29 +10,32 @@ import ( "github.com/pkg/errors" ) -// ContentWriter represents a write transaction against the blob store. -// -// -type ContentWriter struct { - cs *ContentStore +// Writer represents a write transaction against the blob store. +type Writer struct { + cs *Store fp *os.File // opened data file lock lockfile.Lockfile path string // path to writer dir + ref string // ref key offset int64 digester digest.Digester } +func (cw *Writer) Ref() string { + return cw.ref +} + // Write p to the transaction. // // Note that writes are unbuffered to the backing file. When writing, it is // recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer. -func (cw *ContentWriter) Write(p []byte) (n int, err error) { +func (cw *Writer) Write(p []byte) (n int, err error) { n, err = cw.fp.Write(p) cw.digester.Hash().Write(p[:n]) return n, err } -func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error { +func (cw *Writer) Commit(size int64, expected digest.Digest) error { if err := cw.fp.Sync(); err != nil { return errors.Wrap(err, "sync failed") } @@ -97,7 +100,7 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error { // If one needs to resume the transaction, a new writer can be obtained from // `ContentStore.Resume` using the same key. The write can then be continued // from it was left off. -func (cw *ContentWriter) Close() (err error) { +func (cw *Writer) Close() (err error) { if err := unlock(cw.lock); err != nil { log.Printf("unlock failed: %v", err) }