diff --git a/content/content.go b/content/content.go index ecb8b9a..ab09a9f 100644 --- a/content/content.go +++ b/content/content.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/docker/distribution/digest" + "github.com/nightlyone/lockfile" "github.com/pkg/errors" ) @@ -40,6 +41,9 @@ func OpenContentStore(root string) (*ContentStore, error) { }, nil } +// TODO(stevvooe): Work out how we can export the status of an ongoing download. +// TODO(stevvooe): Allow querying the set of blobs in the blob store. + func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) { p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) if _, err := os.Stat(p); err != nil { @@ -61,7 +65,7 @@ func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) { // // TODO(stevvooe): Figure out minimum common set of characters, basically common func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) { - path, data, err := cs.ingestPaths(ref) + path, data, lock, err := cs.ingestPaths(ref) if err != nil { return nil, err } @@ -72,6 +76,10 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) { return nil, err } + if err := tryLock(lock); err != nil { + return nil, err + } + fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666) if err != nil { return nil, errors.Wrap(err, "failed to open data file") @@ -87,27 +95,36 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) { return &ContentWriter{ cs: cs, fp: fp, + lock: lock, path: path, digester: digest.Canonical.New(), }, nil } func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) { - path, data, err := cs.ingestPaths(ref) + path, data, lock, err := cs.ingestPaths(ref) if err != nil { return nil, err } + if err := tryLock(lock); err != nil { + return nil, err + } + digester := digest.Canonical.New() - // slow slow slow!!, send to goroutine + // slow slow slow!!, send to goroutine or use resumable hashes fp, err := os.Open(data) - offset, err := io.Copy(digester.Hash(), fp) if err != nil { return nil, err } defer fp.Close() + offset, err := io.Copy(digester.Hash(), fp) + if err != nil { + return nil, err + } + fp1, err := os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666) if err != nil { if os.IsNotExist(err) { @@ -120,29 +137,35 @@ func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) { return &ContentWriter{ cs: cs, fp: fp1, + lock: lock, path: path, offset: offset, digester: digester, }, nil } -func (cs *ContentStore) ingestPaths(ref string) (string, string, error) { +func (cs *ContentStore) ingestPaths(ref string) (string, string, lockfile.Lockfile, error) { cref := filepath.Clean(ref) if cref != ref { - return "", "", errors.Errorf("invalid path after clean") + return "", "", "", errors.Errorf("invalid path after clean") } fp := filepath.Join(cs.root, "ingest", ref) // ensure we don't escape root if !strings.HasPrefix(fp, cs.root) { - return "", "", errors.Errorf("path %q escapes root", ref) + return "", "", "", errors.Errorf("path %q escapes root", ref) } // ensure we are just a single path component if ref != filepath.Base(fp) { - return "", "", errors.Errorf("ref must be a single path component") + return "", "", "", errors.Errorf("ref must be a single path component") } - return fp, filepath.Join(fp, "data"), nil + lock, err := lockfile.New(filepath.Join(fp, "lock")) + if err != nil { + return "", "", "", errors.Wrap(err, "error creating lockfile") + } + + return fp, filepath.Join(fp, "data"), lock, nil } diff --git a/content/content_test.go b/content/content_test.go index 4d7eeee..f6e3978 100644 --- a/content/content_test.go +++ b/content/content_test.go @@ -47,6 +47,13 @@ func TestContentWriter(t *testing.T) { t.Fatal(err) } + // make sure that second resume also fails + if _, err = cs.Resume("myref"); err == nil { + // TODO(stevvooe): This also works across processes. Need to find a way + // to test that, as well. + t.Fatal("no error on second resume") + } + p := make([]byte, 4<<20) if _, err := rand.Read(p); err != nil { t.Fatal(err) diff --git a/content/locks.go b/content/locks.go new file mode 100644 index 0000000..c7c5768 --- /dev/null +++ b/content/locks.go @@ -0,0 +1,50 @@ +package content + +import ( + "errors" + "sync" + + "github.com/nightlyone/lockfile" +) + +// In addition to providing inter-process locks for content ingest, we also +// define a global in process lock to prevent two goroutines writing to the +// same file. +// +// This is prety unsophisticated for now. In the future, we'd probably like to +// have more information about who is holding which locks, as well as better +// error reporting. + +var ( + // locks lets us lock in process, as well as output of process. + locks = map[lockfile.Lockfile]struct{}{} + locksMu sync.Mutex +) + +func tryLock(lock lockfile.Lockfile) error { + locksMu.Lock() + defer locksMu.Unlock() + + if _, ok := locks[lock]; ok { + return errors.New("file in use") + } + + if err := lock.TryLock(); err != nil { + return err + } + + locks[lock] = struct{}{} + return nil +} + +func unlock(lock lockfile.Lockfile) error { + locksMu.Lock() + defer locksMu.Unlock() + + if _, ok := locks[lock]; !ok { + return nil + } + + delete(locks, lock) + return lock.Unlock() +} diff --git a/content/writer.go b/content/writer.go index 83b2ed6..ea563f0 100644 --- a/content/writer.go +++ b/content/writer.go @@ -1,10 +1,12 @@ package content import ( + "log" "os" "path/filepath" "github.com/docker/distribution/digest" + "github.com/nightlyone/lockfile" "github.com/pkg/errors" ) @@ -14,7 +16,8 @@ import ( type ContentWriter struct { cs *ContentStore fp *os.File // opened data file - path string // path to writer dir + lock lockfile.Lockfile + path string // path to writer dir offset int64 digester digest.Digester } @@ -52,6 +55,10 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error { return errors.Errorf("failed size validation: %v != %v", fi.Size(), size) } + if err := cw.fp.Close(); err != nil { + return errors.Wrap(err, "failed closing ingest") + } + dgst := cw.digester.Digest() if expected != dgst { return errors.Errorf("unexpected digest: %v != %v", dgst, expected) @@ -77,6 +84,8 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error { return err } + unlock(cw.lock) + cw.fp = nil return nil } @@ -87,6 +96,14 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error { // `ContentStore.Resume` using the same key. The write can then be continued // from it was left off. func (cw *ContentWriter) Close() (err error) { - cw.fp.Sync() - return cw.fp.Close() + if err := unlock(cw.lock); err != nil { + log.Printf("unlock failed: %v", err) + } + + if cw.fp != nil { + cw.fp.Sync() + return cw.fp.Close() + } + + return nil } diff --git a/layers.go b/layers.go index 5b502c2..096485b 100644 --- a/layers.go +++ b/layers.go @@ -285,6 +285,9 @@ type Change struct { Path string } +// TODO(stevvooe): Make this change emit through a Walk-like interface. We can +// see this patten used in several tar'ing methods in pkg/archive. + // Changes returns the list of changes from the diff's parent. func (lm *LayerManipulator) Changes(diff string) ([]Change, error) { return nil, errNotImplemented