content: add cross-process ingest locking

Allow content stores to ingest content without coordination of a daemon
to manage locks. Supports coordinated ingest and cross-process ingest
status.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2016-11-15 19:46:24 -08:00
parent f832e757f8
commit b6e446e7be
No known key found for this signature in database
GPG key ID: FB5F6B2905D7ECF3
5 changed files with 112 additions and 12 deletions

View file

@ -8,6 +8,7 @@ import (
"sync" "sync"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/nightlyone/lockfile"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -40,6 +41,9 @@ func OpenContentStore(root string) (*ContentStore, error) {
}, nil }, nil
} }
// TODO(stevvooe): Work out how we can export the status of an ongoing download.
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) { func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) {
p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
if _, err := os.Stat(p); err != nil { if _, err := os.Stat(p); err != nil {
@ -61,7 +65,7 @@ func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) {
// //
// TODO(stevvooe): Figure out minimum common set of characters, basically common // TODO(stevvooe): Figure out minimum common set of characters, basically common
func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) { func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) {
path, data, err := cs.ingestPaths(ref) path, data, lock, err := cs.ingestPaths(ref)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -72,6 +76,10 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) {
return nil, err return nil, err
} }
if err := tryLock(lock); err != nil {
return nil, err
}
fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666) fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to open data file") return nil, errors.Wrap(err, "failed to open data file")
@ -87,27 +95,36 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) {
return &ContentWriter{ return &ContentWriter{
cs: cs, cs: cs,
fp: fp, fp: fp,
lock: lock,
path: path, path: path,
digester: digest.Canonical.New(), digester: digest.Canonical.New(),
}, nil }, nil
} }
func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) { func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) {
path, data, err := cs.ingestPaths(ref) path, data, lock, err := cs.ingestPaths(ref)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := tryLock(lock); err != nil {
return nil, err
}
digester := digest.Canonical.New() digester := digest.Canonical.New()
// slow slow slow!!, send to goroutine // slow slow slow!!, send to goroutine or use resumable hashes
fp, err := os.Open(data) fp, err := os.Open(data)
offset, err := io.Copy(digester.Hash(), fp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer fp.Close() defer fp.Close()
offset, err := io.Copy(digester.Hash(), fp)
if err != nil {
return nil, err
}
fp1, err := os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666) fp1, err := os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -120,29 +137,35 @@ func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) {
return &ContentWriter{ return &ContentWriter{
cs: cs, cs: cs,
fp: fp1, fp: fp1,
lock: lock,
path: path, path: path,
offset: offset, offset: offset,
digester: digester, digester: digester,
}, nil }, nil
} }
func (cs *ContentStore) ingestPaths(ref string) (string, string, error) { func (cs *ContentStore) ingestPaths(ref string) (string, string, lockfile.Lockfile, error) {
cref := filepath.Clean(ref) cref := filepath.Clean(ref)
if cref != ref { if cref != ref {
return "", "", errors.Errorf("invalid path after clean") return "", "", "", errors.Errorf("invalid path after clean")
} }
fp := filepath.Join(cs.root, "ingest", ref) fp := filepath.Join(cs.root, "ingest", ref)
// ensure we don't escape root // ensure we don't escape root
if !strings.HasPrefix(fp, cs.root) { if !strings.HasPrefix(fp, cs.root) {
return "", "", errors.Errorf("path %q escapes root", ref) return "", "", "", errors.Errorf("path %q escapes root", ref)
} }
// ensure we are just a single path component // ensure we are just a single path component
if ref != filepath.Base(fp) { if ref != filepath.Base(fp) {
return "", "", errors.Errorf("ref must be a single path component") return "", "", "", errors.Errorf("ref must be a single path component")
} }
return fp, filepath.Join(fp, "data"), nil lock, err := lockfile.New(filepath.Join(fp, "lock"))
if err != nil {
return "", "", "", errors.Wrap(err, "error creating lockfile")
}
return fp, filepath.Join(fp, "data"), lock, nil
} }

View file

@ -47,6 +47,13 @@ func TestContentWriter(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// make sure that second resume also fails
if _, err = cs.Resume("myref"); err == nil {
// TODO(stevvooe): This also works across processes. Need to find a way
// to test that, as well.
t.Fatal("no error on second resume")
}
p := make([]byte, 4<<20) p := make([]byte, 4<<20)
if _, err := rand.Read(p); err != nil { if _, err := rand.Read(p); err != nil {
t.Fatal(err) t.Fatal(err)

50
content/locks.go Normal file
View file

@ -0,0 +1,50 @@
package content
import (
"errors"
"sync"
"github.com/nightlyone/lockfile"
)
// In addition to providing inter-process locks for content ingest, we also
// define a global in process lock to prevent two goroutines writing to the
// same file.
//
// This is prety unsophisticated for now. In the future, we'd probably like to
// have more information about who is holding which locks, as well as better
// error reporting.
var (
// locks lets us lock in process, as well as output of process.
locks = map[lockfile.Lockfile]struct{}{}
locksMu sync.Mutex
)
func tryLock(lock lockfile.Lockfile) error {
locksMu.Lock()
defer locksMu.Unlock()
if _, ok := locks[lock]; ok {
return errors.New("file in use")
}
if err := lock.TryLock(); err != nil {
return err
}
locks[lock] = struct{}{}
return nil
}
func unlock(lock lockfile.Lockfile) error {
locksMu.Lock()
defer locksMu.Unlock()
if _, ok := locks[lock]; !ok {
return nil
}
delete(locks, lock)
return lock.Unlock()
}

View file

@ -1,10 +1,12 @@
package content package content
import ( import (
"log"
"os" "os"
"path/filepath" "path/filepath"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/nightlyone/lockfile"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -14,7 +16,8 @@ import (
type ContentWriter struct { type ContentWriter struct {
cs *ContentStore cs *ContentStore
fp *os.File // opened data file fp *os.File // opened data file
path string // path to writer dir lock lockfile.Lockfile
path string // path to writer dir
offset int64 offset int64
digester digest.Digester digester digest.Digester
} }
@ -52,6 +55,10 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error {
return errors.Errorf("failed size validation: %v != %v", fi.Size(), size) return errors.Errorf("failed size validation: %v != %v", fi.Size(), size)
} }
if err := cw.fp.Close(); err != nil {
return errors.Wrap(err, "failed closing ingest")
}
dgst := cw.digester.Digest() dgst := cw.digester.Digest()
if expected != dgst { if expected != dgst {
return errors.Errorf("unexpected digest: %v != %v", dgst, expected) return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
@ -77,6 +84,8 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error {
return err return err
} }
unlock(cw.lock)
cw.fp = nil
return nil return nil
} }
@ -87,6 +96,14 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error {
// `ContentStore.Resume` using the same key. The write can then be continued // `ContentStore.Resume` using the same key. The write can then be continued
// from it was left off. // from it was left off.
func (cw *ContentWriter) Close() (err error) { func (cw *ContentWriter) Close() (err error) {
cw.fp.Sync() if err := unlock(cw.lock); err != nil {
return cw.fp.Close() log.Printf("unlock failed: %v", err)
}
if cw.fp != nil {
cw.fp.Sync()
return cw.fp.Close()
}
return nil
} }

View file

@ -285,6 +285,9 @@ type Change struct {
Path string Path string
} }
// TODO(stevvooe): Make this change emit through a Walk-like interface. We can
// see this patten used in several tar'ing methods in pkg/archive.
// Changes returns the list of changes from the diff's parent. // Changes returns the list of changes from the diff's parent.
func (lm *LayerManipulator) Changes(diff string) ([]Change, error) { func (lm *LayerManipulator) Changes(diff string) ([]Change, error) {
return nil, errNotImplemented return nil, errNotImplemented