content: cleanup package exports
The package exports are now cleaned up to remove a lot of stuttering in the API. We also remove direct mapping of refs to the filesystem, opting for a hash-based approach. This *does* affect insertion performance, since it requires more individual file ios. A benchmark baseline has been added and we can fix this later. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
edcf44ea57
commit
f27d43285a
4 changed files with 179 additions and 77 deletions
|
@ -2,9 +2,9 @@ package content
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/docker/containerd/log"
|
"github.com/docker/containerd/log"
|
||||||
|
@ -23,21 +23,21 @@ var (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// ContentStore is digest-keyed store for content. All data written into the
|
// Store is digest-keyed store for content. All data written into the store is
|
||||||
// store is stored under a verifiable digest.
|
// stored under a verifiable digest.
|
||||||
//
|
//
|
||||||
// ContentStore can generally support multi-reader, single-writer ingest of
|
// Store can generally support multi-reader, single-writer ingest of data,
|
||||||
// data, including resumable ingest.
|
// including resumable ingest.
|
||||||
type ContentStore struct {
|
type Store struct {
|
||||||
root string
|
root string
|
||||||
}
|
}
|
||||||
|
|
||||||
func OpenContentStore(root string) (*ContentStore, error) {
|
func Open(root string) (*Store, error) {
|
||||||
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
|
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ContentStore{
|
return &Store{
|
||||||
root: root,
|
root: root,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -48,8 +48,20 @@ type Status struct {
|
||||||
Meta interface{}
|
Meta interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ContentStore) Stat(ref string) (Status, error) {
|
func (cs *Store) Stat(ref string) (Status, error) {
|
||||||
dfi, err := os.Stat(filepath.Join(cs.root, "ingest", ref, "data"))
|
dp := filepath.Join(cs.ingestRoot(ref), "data")
|
||||||
|
return cs.stat(dp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// stat works like stat above except uses the path to the ingest.
|
||||||
|
func (cs *Store) stat(ingestPath string) (Status, error) {
|
||||||
|
dp := filepath.Join(ingestPath, "data")
|
||||||
|
dfi, err := os.Stat(dp)
|
||||||
|
if err != nil {
|
||||||
|
return Status{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ref, err := readFileString(filepath.Join(ingestPath, "ref"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Status{}, err
|
return Status{}, err
|
||||||
}
|
}
|
||||||
|
@ -58,9 +70,10 @@ func (cs *ContentStore) Stat(ref string) (Status, error) {
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
Size: dfi.Size(),
|
Size: dfi.Size(),
|
||||||
}, nil
|
}, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ContentStore) Active() ([]Status, error) {
|
func (cs *Store) Active() ([]Status, error) {
|
||||||
ip := filepath.Join(cs.root, "ingest")
|
ip := filepath.Join(cs.root, "ingest")
|
||||||
|
|
||||||
fp, err := os.Open(ip)
|
fp, err := os.Open(ip)
|
||||||
|
@ -75,7 +88,8 @@ func (cs *ContentStore) Active() ([]Status, error) {
|
||||||
|
|
||||||
var active []Status
|
var active []Status
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
stat, err := cs.Stat(fi.Name())
|
p := filepath.Join(ip, fi.Name())
|
||||||
|
stat, err := cs.stat(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -100,7 +114,7 @@ func (cs *ContentStore) Active() ([]Status, error) {
|
||||||
|
|
||||||
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
|
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
|
||||||
|
|
||||||
func (cs *ContentStore) Walk(fn func(path string, dgst digest.Digest) error) error {
|
func (cs *Store) Walk(fn func(path string, dgst digest.Digest) error) error {
|
||||||
root := filepath.Join(cs.root, "blobs")
|
root := filepath.Join(cs.root, "blobs")
|
||||||
var alg digest.Algorithm
|
var alg digest.Algorithm
|
||||||
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
|
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
|
||||||
|
@ -138,7 +152,7 @@ func (cs *ContentStore) Walk(fn func(path string, dgst digest.Digest) error) err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) {
|
func (cs *Store) GetPath(dgst digest.Digest) (string, error) {
|
||||||
p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
|
p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
|
||||||
if _, err := os.Stat(p); err != nil {
|
if _, err := os.Stat(p); err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
|
@ -156,10 +170,8 @@ func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) {
|
||||||
// The argument `ref` is used to identify the transaction. It must be a valid
|
// The argument `ref` is used to identify the transaction. It must be a valid
|
||||||
// path component, meaning it has no `/` characters and no `:` (we'll ban
|
// path component, meaning it has no `/` characters and no `:` (we'll ban
|
||||||
// others fs characters, as needed).
|
// others fs characters, as needed).
|
||||||
//
|
func (cs *Store) Begin(ref string) (*Writer, error) {
|
||||||
// TODO(stevvooe): Figure out minimum common set of characters, basically common
|
path, refp, data, lock, err := cs.ingestPaths(ref)
|
||||||
func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) {
|
|
||||||
path, data, lock, err := cs.ingestPaths(ref)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -174,6 +186,11 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// write the ref to a file for later use
|
||||||
|
if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
|
fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "failed to open data file")
|
return nil, errors.Wrap(err, "failed to open data file")
|
||||||
|
@ -186,7 +203,7 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) {
|
||||||
return nil, errors.Wrap(err, "error opening for append")
|
return nil, errors.Wrap(err, "error opening for append")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ContentWriter{
|
return &Writer{
|
||||||
cs: cs,
|
cs: cs,
|
||||||
fp: fp,
|
fp: fp,
|
||||||
lock: lock,
|
lock: lock,
|
||||||
|
@ -195,8 +212,8 @@ func (cs *ContentStore) Begin(ref string) (*ContentWriter, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) {
|
func (cs *Store) Resume(ref string) (*Writer, error) {
|
||||||
path, data, lock, err := cs.ingestPaths(ref)
|
path, refp, data, lock, err := cs.ingestPaths(ref)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -205,6 +222,17 @@ func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
refraw, err := readFileString(refp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not read ref")
|
||||||
|
}
|
||||||
|
|
||||||
|
if ref != refraw {
|
||||||
|
// NOTE(stevvooe): This is fairly catastrophic. Either we have some
|
||||||
|
// layout corruption or a hash collision for the ref key.
|
||||||
|
return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, refraw)
|
||||||
|
}
|
||||||
|
|
||||||
digester := digest.Canonical.Digester()
|
digester := digest.Canonical.Digester()
|
||||||
|
|
||||||
// slow slow slow!!, send to goroutine or use resumable hashes
|
// slow slow slow!!, send to goroutine or use resumable hashes
|
||||||
|
@ -228,39 +256,46 @@ func (cs *ContentStore) Resume(ref string) (*ContentWriter, error) {
|
||||||
return nil, errors.Wrap(err, "error opening for append")
|
return nil, errors.Wrap(err, "error opening for append")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ContentWriter{
|
return &Writer{
|
||||||
cs: cs,
|
cs: cs,
|
||||||
fp: fp1,
|
fp: fp1,
|
||||||
lock: lock,
|
lock: lock,
|
||||||
|
ref: ref,
|
||||||
path: path,
|
path: path,
|
||||||
offset: offset,
|
offset: offset,
|
||||||
digester: digester,
|
digester: digester,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ContentStore) ingestPaths(ref string) (string, string, lockfile.Lockfile, error) {
|
func (cs *Store) ingestRoot(ref string) string {
|
||||||
cref := filepath.Clean(ref)
|
dgst := digest.FromString(ref)
|
||||||
if cref != ref {
|
return filepath.Join(cs.root, "ingest", dgst.Hex())
|
||||||
return "", "", "", errors.Errorf("invalid path after clean")
|
}
|
||||||
}
|
|
||||||
|
// ingestPaths are returned, including the lockfile. The paths are the following:
|
||||||
fp := filepath.Join(cs.root, "ingest", ref)
|
//
|
||||||
|
// - root: entire ingest directory
|
||||||
// ensure we don't escape root
|
// - ref: name of the starting ref, must be unique
|
||||||
if !strings.HasPrefix(fp, cs.root) {
|
// - data: file where data is written
|
||||||
return "", "", "", errors.Errorf("path %q escapes root", ref)
|
// - lock: lock file location
|
||||||
}
|
//
|
||||||
|
func (cs *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
|
||||||
// ensure we are just a single path component
|
var (
|
||||||
if ref != filepath.Base(fp) {
|
fp = cs.ingestRoot(ref)
|
||||||
return "", "", "", errors.Errorf("ref must be a single path component")
|
rp = filepath.Join(fp, "ref")
|
||||||
}
|
lp = filepath.Join(fp, "lock")
|
||||||
|
dp = filepath.Join(fp, "data")
|
||||||
lockfilePath := filepath.Join(fp, "lock")
|
)
|
||||||
lock, err := lockfile.New(lockfilePath)
|
|
||||||
if err != nil {
|
lock, err := lockfile.New(lp)
|
||||||
return "", "", "", errors.Wrapf(err, "error creating lockfile %v", lockfilePath)
|
if err != nil {
|
||||||
}
|
return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp)
|
||||||
|
}
|
||||||
return fp, filepath.Join(fp, "data"), lock, nil
|
|
||||||
|
return fp, rp, dp, lock, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readFileString(path string) (string, error) {
|
||||||
|
p, err := ioutil.ReadFile(path)
|
||||||
|
return string(p), err
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,20 +120,12 @@ func TestWalkBlobs(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
blobs = map[digest.Digest][]byte{}
|
blobs = populateBlobStore(t, cs, nblobs, maxsize)
|
||||||
expected = map[digest.Digest]struct{}{}
|
expected = map[digest.Digest]struct{}{}
|
||||||
found = map[digest.Digest]struct{}{}
|
found = map[digest.Digest]struct{}{}
|
||||||
)
|
)
|
||||||
|
|
||||||
for i := 0; i < nblobs; i++ {
|
for dgst := range blobs {
|
||||||
p := make([]byte, mrand.Intn(maxsize))
|
|
||||||
|
|
||||||
if _, err := rand.Read(p); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
dgst := checkWrite(t, cs, p)
|
|
||||||
blobs[dgst] = p
|
|
||||||
expected[dgst] = struct{}{}
|
expected[dgst] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,9 +144,73 @@ func TestWalkBlobs(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func contentStoreEnv(t interface {
|
// BenchmarkIngests checks the insertion time over varying blob sizes.
|
||||||
|
//
|
||||||
|
// Note that at the time of writing there is roughly a 4ms insertion overhead
|
||||||
|
// for blobs. This seems to be due to the number of syscalls and file io we do
|
||||||
|
// coordinating the ingestion.
|
||||||
|
func BenchmarkIngests(b *testing.B) {
|
||||||
|
_, cs, cleanup := contentStoreEnv(b)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
for _, size := range []int64{
|
||||||
|
1 << 10,
|
||||||
|
4 << 10,
|
||||||
|
512 << 10,
|
||||||
|
1 << 20,
|
||||||
|
} {
|
||||||
|
size := size
|
||||||
|
b.Run(fmt.Sprint(size), func(b *testing.B) {
|
||||||
|
b.StopTimer()
|
||||||
|
blobs := generateBlobs(b, int64(b.N), size)
|
||||||
|
|
||||||
|
var bytes int64
|
||||||
|
for _, blob := range blobs {
|
||||||
|
bytes += int64(len(blob))
|
||||||
|
}
|
||||||
|
b.SetBytes(bytes)
|
||||||
|
|
||||||
|
b.StartTimer()
|
||||||
|
|
||||||
|
for dgst, p := range blobs {
|
||||||
|
checkWrite(b, cs, dgst, p)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type checker interface {
|
||||||
Fatal(args ...interface{})
|
Fatal(args ...interface{})
|
||||||
}) (string, *ContentStore, func()) {
|
}
|
||||||
|
|
||||||
|
func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte {
|
||||||
|
blobs := map[digest.Digest][]byte{}
|
||||||
|
|
||||||
|
for i := int64(0); i < nblobs; i++ {
|
||||||
|
p := make([]byte, mrand.Int63n(maxsize))
|
||||||
|
|
||||||
|
if _, err := rand.Read(p); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dgst := digest.FromBytes(p)
|
||||||
|
blobs[dgst] = p
|
||||||
|
}
|
||||||
|
|
||||||
|
return blobs
|
||||||
|
}
|
||||||
|
|
||||||
|
func populateBlobStore(t checker, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte {
|
||||||
|
blobs := generateBlobs(t, nblobs, maxsize)
|
||||||
|
|
||||||
|
for dgst, p := range blobs {
|
||||||
|
checkWrite(t, cs, dgst, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return blobs
|
||||||
|
}
|
||||||
|
|
||||||
|
func contentStoreEnv(t checker) (string, *Store, func()) {
|
||||||
pc, _, _, ok := runtime.Caller(1)
|
pc, _, _, ok := runtime.Caller(1)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatal("failed to resolve caller")
|
t.Fatal("failed to resolve caller")
|
||||||
|
@ -166,7 +222,7 @@ func contentStoreEnv(t interface {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cs, err := OpenContentStore(tmpdir)
|
cs, err := Open(tmpdir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.RemoveAll(tmpdir)
|
os.RemoveAll(tmpdir)
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -177,9 +233,7 @@ func contentStoreEnv(t interface {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkCopy(t interface {
|
func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) {
|
||||||
Fatal(args ...interface{})
|
|
||||||
}, size int64, dst io.Writer, src io.Reader) {
|
|
||||||
nn, err := io.Copy(dst, src)
|
nn, err := io.Copy(dst, src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -190,7 +244,7 @@ func checkCopy(t interface {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkBlobPath(t *testing.T, cs *ContentStore, dgst digest.Digest) string {
|
func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
|
||||||
path, err := cs.GetPath(dgst)
|
path, err := cs.GetPath(dgst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err, dgst)
|
t.Fatal(err, dgst)
|
||||||
|
@ -211,8 +265,7 @@ func checkBlobPath(t *testing.T, cs *ContentStore, dgst digest.Digest) string {
|
||||||
return path
|
return path
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkWrite(t *testing.T, cs *ContentStore, p []byte) digest.Digest {
|
func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
|
||||||
dgst := digest.FromBytes(p)
|
|
||||||
if err := WriteBlob(cs, bytes.NewReader(p), int64(len(p)), dgst); err != nil {
|
if err := WriteBlob(cs, bytes.NewReader(p), int64(len(p)), dgst); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,11 +8,18 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Provider gives access to blob content by paths.
|
||||||
|
//
|
||||||
|
// Typically, this is implemented by `*Store`.
|
||||||
|
type Provider interface {
|
||||||
|
GetPath(dgst digest.Digest) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
// OpenBlob opens the blob for reading identified by dgst.
|
// OpenBlob opens the blob for reading identified by dgst.
|
||||||
//
|
//
|
||||||
// The opened blob may also implement seek. Callers can detect with io.Seeker.
|
// The opened blob may also implement seek. Callers can detect with io.Seeker.
|
||||||
func OpenBlob(cs *ContentStore, dgst digest.Digest) (io.ReadCloser, error) {
|
func OpenBlob(provider Provider, dgst digest.Digest) (io.ReadCloser, error) {
|
||||||
path, err := cs.GetPath(dgst)
|
path, err := provider.GetPath(dgst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -21,6 +28,10 @@ func OpenBlob(cs *ContentStore, dgst digest.Digest) (io.ReadCloser, error) {
|
||||||
return fp, err
|
return fp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Ingester interface {
|
||||||
|
Begin(key string) (*Writer, error)
|
||||||
|
}
|
||||||
|
|
||||||
// WriteBlob writes data with the expected digest into the content store. If
|
// WriteBlob writes data with the expected digest into the content store. If
|
||||||
// expected already exists, the method returns immediately and the reader will
|
// expected already exists, the method returns immediately and the reader will
|
||||||
// not be consumed.
|
// not be consumed.
|
||||||
|
@ -28,7 +39,7 @@ func OpenBlob(cs *ContentStore, dgst digest.Digest) (io.ReadCloser, error) {
|
||||||
// This is useful when the digest and size are known beforehand.
|
// This is useful when the digest and size are known beforehand.
|
||||||
//
|
//
|
||||||
// Copy is buffered, so no need to wrap reader in buffered io.
|
// Copy is buffered, so no need to wrap reader in buffered io.
|
||||||
func WriteBlob(cs *ContentStore, r io.Reader, size int64, expected digest.Digest) error {
|
func WriteBlob(cs Ingester, r io.Reader, size int64, expected digest.Digest) error {
|
||||||
cw, err := cs.Begin(expected.Hex())
|
cw, err := cs.Begin(expected.Hex())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -10,29 +10,32 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ContentWriter represents a write transaction against the blob store.
|
// Writer represents a write transaction against the blob store.
|
||||||
//
|
type Writer struct {
|
||||||
//
|
cs *Store
|
||||||
type ContentWriter struct {
|
|
||||||
cs *ContentStore
|
|
||||||
fp *os.File // opened data file
|
fp *os.File // opened data file
|
||||||
lock lockfile.Lockfile
|
lock lockfile.Lockfile
|
||||||
path string // path to writer dir
|
path string // path to writer dir
|
||||||
|
ref string // ref key
|
||||||
offset int64
|
offset int64
|
||||||
digester digest.Digester
|
digester digest.Digester
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cw *Writer) Ref() string {
|
||||||
|
return cw.ref
|
||||||
|
}
|
||||||
|
|
||||||
// Write p to the transaction.
|
// Write p to the transaction.
|
||||||
//
|
//
|
||||||
// Note that writes are unbuffered to the backing file. When writing, it is
|
// Note that writes are unbuffered to the backing file. When writing, it is
|
||||||
// recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer.
|
// recommended to wrap in a bufio.Writer or, preferably, use io.CopyBuffer.
|
||||||
func (cw *ContentWriter) Write(p []byte) (n int, err error) {
|
func (cw *Writer) Write(p []byte) (n int, err error) {
|
||||||
n, err = cw.fp.Write(p)
|
n, err = cw.fp.Write(p)
|
||||||
cw.digester.Hash().Write(p[:n])
|
cw.digester.Hash().Write(p[:n])
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error {
|
func (cw *Writer) Commit(size int64, expected digest.Digest) error {
|
||||||
if err := cw.fp.Sync(); err != nil {
|
if err := cw.fp.Sync(); err != nil {
|
||||||
return errors.Wrap(err, "sync failed")
|
return errors.Wrap(err, "sync failed")
|
||||||
}
|
}
|
||||||
|
@ -97,7 +100,7 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error {
|
||||||
// If one needs to resume the transaction, a new writer can be obtained from
|
// If one needs to resume the transaction, a new writer can be obtained from
|
||||||
// `ContentStore.Resume` using the same key. The write can then be continued
|
// `ContentStore.Resume` using the same key. The write can then be continued
|
||||||
// from it was left off.
|
// from it was left off.
|
||||||
func (cw *ContentWriter) Close() (err error) {
|
func (cw *Writer) Close() (err error) {
|
||||||
if err := unlock(cw.lock); err != nil {
|
if err := unlock(cw.lock); err != nil {
|
||||||
log.Printf("unlock failed: %v", err)
|
log.Printf("unlock failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue