content: cleanup service and interfaces
After implementing pull, a few changes are required to the content store interface to make sure that the implementation works smoothly. Specifically, we work to make sure the predeclaration path for digests works the same between remote and local writers. Before, we were hesitant to require the size and digest up front, but it became clear that having this provided significant benefit. There are also several cleanups related to naming. We now call the expected digest `Expected` consistently across the board and `Total` is used to mark the expected size. This whole effort comes together to provide a very smooth status reporting workflow for image pull and push. This will be more obvious when the bulk of pull code lands. There are a few other changes to make `content.WriteBlob` more broadly useful. In accordance with the addition of predeclaring the expected size when getting a `Writer`, `WriteBlob` now supports this fully. It will also resume downloads if provided an `io.Seeker` or `io.ReaderAt`. Coupled with the `httpReadSeeker` from `docker/distribution`, we should only be a few lines of code away from resumable downloads. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
0a5544d8c4
commit
c062a85782
15 changed files with 568 additions and 337 deletions
|
@ -4,6 +4,9 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
contentapi "github.com/docker/containerd/api/services/content"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -83,10 +86,10 @@ type remoteIngester struct {
|
|||
client contentapi.ContentClient
|
||||
}
|
||||
|
||||
func (ri *remoteIngester) Writer(ctx context.Context, ref string) (Writer, error) {
|
||||
wrclient, offset, err := ri.negotiate(ctx, ref)
|
||||
func (ri *remoteIngester) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error) {
|
||||
wrclient, offset, err := ri.negotiate(ctx, ref, size, expected)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
return &remoteWriter{
|
||||
|
@ -95,15 +98,17 @@ func (ri *remoteIngester) Writer(ctx context.Context, ref string) (Writer, error
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (ri *remoteIngester) negotiate(ctx context.Context, ref string) (contentapi.Content_WriteClient, int64, error) {
|
||||
func (ri *remoteIngester) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
|
||||
wrclient, err := ri.client.Write(ctx)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if err := wrclient.Send(&contentapi.WriteRequest{
|
||||
Action: contentapi.WriteActionStat,
|
||||
Ref: ref,
|
||||
Action: contentapi.WriteActionStat,
|
||||
Ref: ref,
|
||||
Total: size,
|
||||
Expected: expected,
|
||||
}); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
@ -192,12 +197,12 @@ func (rw *remoteWriter) Write(p []byte) (n int, err error) {
|
|||
|
||||
func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
|
||||
resp, err := rw.send(&contentapi.WriteRequest{
|
||||
Action: contentapi.WriteActionCommit,
|
||||
ExpectedSize: size,
|
||||
ExpectedDigest: expected,
|
||||
Action: contentapi.WriteActionCommit,
|
||||
Total: size,
|
||||
Expected: expected,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
return rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
if size != 0 && resp.Offset != size {
|
||||
|
@ -205,7 +210,7 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
|
|||
}
|
||||
|
||||
if expected != "" && resp.Digest != expected {
|
||||
return errors.New("unexpected digest")
|
||||
return errors.Errorf("unexpected digest: %v != %v", resp.Digest, expected)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -214,3 +219,14 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
|
|||
func (rw *remoteWriter) Close() error {
|
||||
return rw.client.CloseSend()
|
||||
}
|
||||
|
||||
func rewriteGRPCError(err error) error {
|
||||
switch grpc.Code(errors.Cause(err)) {
|
||||
case codes.AlreadyExists:
|
||||
return errExists
|
||||
case codes.NotFound:
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
var (
|
||||
errNotFound = errors.New("content: not found")
|
||||
errExists = errors.New("content: exists")
|
||||
|
||||
BufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
|
@ -33,6 +34,7 @@ type Provider interface {
|
|||
// Status describes the progress of a write identified by Ref.
type Status struct {
|
||||
// Ref is the key identifying the write.
Ref string
|
||||
// Offset is the position reached by the write so far.
Offset int64
|
||||
// Total is the expected size of the content; zero when it is not known.
Total int64
|
||||
// StartedAt is the time at which the write began.
StartedAt time.Time
|
||||
// UpdatedAt is the time at which the write was last modified.
UpdatedAt time.Time
|
||||
}
|
||||
|
@ -45,9 +47,13 @@ type Writer interface {
|
|||
}
|
||||
|
||||
type Ingester interface {
|
||||
Writer(ctx context.Context, ref string) (Writer, error)
|
||||
Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error)
|
||||
}
|
||||
|
||||
func IsNotFound(err error) bool {
|
||||
return errors.Cause(err) == errNotFound
|
||||
}
|
||||
|
||||
func IsExists(err error) bool {
|
||||
return errors.Cause(err) == errExists
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ func TestContentWriter(t *testing.T) {
|
|||
t.Fatal("ingest dir should be created", err)
|
||||
}
|
||||
|
||||
cw, err := cs.Writer(ctx, "myref")
|
||||
cw, err := cs.Writer(ctx, "myref", 0, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -39,13 +39,13 @@ func TestContentWriter(t *testing.T) {
|
|||
}
|
||||
|
||||
// reopen, so we can test things
|
||||
cw, err = cs.Writer(ctx, "myref")
|
||||
cw, err = cs.Writer(ctx, "myref", 0, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// make sure that second resume also fails
|
||||
if _, err = cs.Writer(ctx, "myref"); err == nil {
|
||||
if _, err = cs.Writer(ctx, "myref", 0, ""); err == nil {
|
||||
// TODO(stevvooe): This also works across processes. Need to find a way
|
||||
// to test that, as well.
|
||||
t.Fatal("no error on second resume")
|
||||
|
@ -88,7 +88,7 @@ func TestContentWriter(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cw, err = cs.Writer(ctx, "aref")
|
||||
cw, err = cs.Writer(ctx, "aref", 0, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -269,7 +269,7 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
|
|||
}
|
||||
|
||||
func checkWrite(t checker, ctx context.Context, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
|
||||
if err := WriteBlob(ctx, cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
|
||||
if err := WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package content
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
||||
|
@ -16,10 +17,14 @@ import (
|
|||
// This is useful when the digest and size are known beforehand.
|
||||
//
|
||||
// Copy is buffered, so no need to wrap reader in buffered io.
|
||||
func WriteBlob(ctx context.Context, cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
|
||||
cw, err := cs.Writer(ctx, ref)
|
||||
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest) error {
|
||||
cw, err := cs.Writer(ctx, ref, size, expected)
|
||||
if err != nil {
|
||||
return err
|
||||
if !IsExists(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil // all ready present
|
||||
}
|
||||
|
||||
ws, err := cw.Status()
|
||||
|
@ -28,30 +33,56 @@ func WriteBlob(ctx context.Context, cs Ingester, r io.Reader, ref string, size i
|
|||
}
|
||||
|
||||
if ws.Offset > 0 {
|
||||
// Arbitrary limitation for now. We can detect io.Seeker on r and
|
||||
// resume.
|
||||
return errors.Errorf("cannot resume already started write")
|
||||
r, err = seekReader(r, ws.Offset, size)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "unabled to resume write to %v", ref)
|
||||
}
|
||||
}
|
||||
|
||||
buf := BufPool.Get().([]byte)
|
||||
defer BufPool.Put(buf)
|
||||
|
||||
nn, err := io.CopyBuffer(cw, r, buf)
|
||||
if err != nil {
|
||||
if _, err := io.CopyBuffer(cw, r, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if size > 0 && nn != size {
|
||||
return errors.Errorf("failed size verification: %v != %v", nn, size)
|
||||
}
|
||||
|
||||
if err := cw.Commit(size, expected); err != nil {
|
||||
return err
|
||||
if !IsExists(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// seekReader positions r so that the next read starts at offset, which lets an
// interrupted write be resumed.
//
// If r implements io.Seeker, it is seeked to offset relative to the start and
// returned as-is. Otherwise, if r implements io.ReaderAt and size is greater
// than offset, a section reader covering the remaining bytes [offset, size) is
// returned. In every other case an error is returned.
func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
	// attempt to resolve r as a seeker and setup the offset.
	if seeker, ok := r.(io.Seeker); ok {
		nn, err := seeker.Seek(offset, io.SeekStart)
		if err != nil {
			// Report the underlying failure instead of masking it with the
			// generic position mismatch below.
			return nil, err
		}

		if nn != offset {
			return nil, fmt.Errorf("failed to seek to offset %v", offset)
		}

		return r, nil
	}

	// ok, let's try io.ReaderAt!
	if readerAt, ok := r.(io.ReaderAt); ok && size > offset {
		// NewSectionReader takes a length rather than an end position, so
		// only the bytes remaining after offset are exposed.
		return io.NewSectionReader(readerAt, offset, size-offset), nil
	}

	return nil, fmt.Errorf("cannot seek to offset %v", offset)
}
|
||||
|
||||
func readFileString(path string) (string, error) {
|
||||
p, err := ioutil.ReadFile(path)
|
||||
return string(p), err
|
||||
|
|
|
@ -2,10 +2,12 @@ package content
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -165,17 +167,34 @@ func (s *Store) status(ingestPath string) (Status, error) {
|
|||
return Status{
|
||||
Ref: ref,
|
||||
Offset: fi.Size(),
|
||||
Total: s.total(ingestPath),
|
||||
UpdatedAt: fi.ModTime(),
|
||||
StartedAt: startedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// total attempts to resolve the total expected size for the write.
|
||||
func (s *Store) total(ingestPath string) int64 {
|
||||
totalS, err := readFileString(filepath.Join(ingestPath, "total"))
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
total, err := strconv.ParseInt(totalS, 10, 64)
|
||||
if err != nil {
|
||||
// represents a corrupted file, should probably remove.
|
||||
return 0
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
// Writer begins or resumes the active writer identified by ref. If the writer
|
||||
// is already in use, an error is returned. Only one writer may be in use per
|
||||
// ref at a time.
|
||||
//
|
||||
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
|
||||
func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
|
||||
func (s *Store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (Writer, error) {
|
||||
path, refp, data, lock, err := s.ingestPaths(ref)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -202,16 +221,19 @@ func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// validate that we have no collision for the ref.
|
||||
refraw, err := readFileString(refp)
|
||||
status, err := s.status(path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not read ref")
|
||||
return nil, errors.Wrap(err, "failed reading status of resume write")
|
||||
}
|
||||
|
||||
if ref != refraw {
|
||||
if ref != status.Ref {
|
||||
// NOTE(stevvooe): This is fairly catastrophic. Either we have some
|
||||
// layout corruption or a hash collision for the ref key.
|
||||
return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, refraw)
|
||||
return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref)
|
||||
}
|
||||
|
||||
if total > 0 && status.Total > 0 && total != status.Total {
|
||||
return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
|
||||
}
|
||||
|
||||
// slow slow slow!!, send to goroutine or use resumable hashes
|
||||
|
@ -229,18 +251,9 @@ func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
fi, err := os.Stat(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updatedAt = fi.ModTime()
|
||||
|
||||
if st, ok := fi.Sys().(*syscall.Stat_t); ok {
|
||||
startedAt = time.Unix(st.Ctim.Sec, st.Ctim.Nsec)
|
||||
} else {
|
||||
startedAt = updatedAt
|
||||
}
|
||||
updatedAt = status.UpdatedAt
|
||||
startedAt = status.StartedAt
|
||||
total = status.Total
|
||||
} else {
|
||||
// the ingest is new, we need to setup the target location.
|
||||
// write the ref to a file for later use
|
||||
|
@ -248,6 +261,12 @@ func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if total > 0 {
|
||||
if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
startedAt = time.Now()
|
||||
updatedAt = startedAt
|
||||
}
|
||||
|
@ -264,6 +283,7 @@ func (s *Store) Writer(ctx context.Context, ref string) (Writer, error) {
|
|||
ref: ref,
|
||||
path: path,
|
||||
offset: offset,
|
||||
total: total,
|
||||
digester: digester,
|
||||
startedAt: startedAt,
|
||||
updatedAt: updatedAt,
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
package content
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/docker/containerd/log"
|
||||
"github.com/nightlyone/lockfile"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -19,6 +19,7 @@ type writer struct {
|
|||
path string // path to writer dir
|
||||
ref string // ref key
|
||||
offset int64
|
||||
total int64
|
||||
digester digest.Digester
|
||||
startedAt time.Time
|
||||
updatedAt time.Time
|
||||
|
@ -28,6 +29,7 @@ func (w *writer) Status() (Status, error) {
|
|||
return Status{
|
||||
Ref: w.ref,
|
||||
Offset: w.offset,
|
||||
Total: w.total,
|
||||
StartedAt: w.startedAt,
|
||||
UpdatedAt: w.updatedAt,
|
||||
}, nil
|
||||
|
@ -52,12 +54,12 @@ func (w *writer) Write(p []byte) (n int, err error) {
|
|||
return n, err
|
||||
}
|
||||
|
||||
func (cw *writer) Commit(size int64, expected digest.Digest) error {
|
||||
if err := cw.fp.Sync(); err != nil {
|
||||
func (w *writer) Commit(size int64, expected digest.Digest) error {
|
||||
if err := w.fp.Sync(); err != nil {
|
||||
return errors.Wrap(err, "sync failed")
|
||||
}
|
||||
|
||||
fi, err := cw.fp.Stat()
|
||||
fi, err := w.fp.Stat()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "stat on ingest file failed")
|
||||
}
|
||||
|
@ -67,7 +69,7 @@ func (cw *writer) Commit(size int64, expected digest.Digest) error {
|
|||
// only allowing reads honoring the umask on creation.
|
||||
//
|
||||
// This removes write and exec, only allowing read per the creation umask.
|
||||
if err := cw.fp.Chmod((fi.Mode() & os.ModePerm) &^ 0333); err != nil {
|
||||
if err := w.fp.Chmod((fi.Mode() & os.ModePerm) &^ 0333); err != nil {
|
||||
return errors.Wrap(err, "failed to change ingest file permissions")
|
||||
}
|
||||
|
||||
|
@ -75,18 +77,18 @@ func (cw *writer) Commit(size int64, expected digest.Digest) error {
|
|||
return errors.Errorf("failed size validation: %v != %v", fi.Size(), size)
|
||||
}
|
||||
|
||||
if err := cw.fp.Close(); err != nil {
|
||||
if err := w.fp.Close(); err != nil {
|
||||
return errors.Wrap(err, "failed closing ingest")
|
||||
}
|
||||
|
||||
dgst := cw.digester.Digest()
|
||||
dgst := w.digester.Digest()
|
||||
if expected != "" && expected != dgst {
|
||||
return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
|
||||
}
|
||||
|
||||
var (
|
||||
ingest = filepath.Join(cw.path, "data")
|
||||
target = cw.s.blobPath(dgst)
|
||||
ingest = filepath.Join(w.path, "data")
|
||||
target = w.s.blobPath(dgst)
|
||||
)
|
||||
|
||||
// make sure parent directories of blob exist
|
||||
|
@ -95,18 +97,18 @@ func (cw *writer) Commit(size int64, expected digest.Digest) error {
|
|||
}
|
||||
|
||||
// clean up!!
|
||||
defer os.RemoveAll(cw.path)
|
||||
defer os.RemoveAll(w.path)
|
||||
|
||||
if err := os.Rename(ingest, target); err != nil {
|
||||
if os.IsExist(err) {
|
||||
// collision with the target file!
|
||||
return nil
|
||||
return errExists
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
unlock(cw.lock)
|
||||
cw.fp = nil
|
||||
unlock(w.lock)
|
||||
w.fp = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -121,7 +123,7 @@ func (cw *writer) Commit(size int64, expected digest.Digest) error {
|
|||
// clean up the associated resources.
|
||||
func (cw *writer) Close() (err error) {
|
||||
if err := unlock(cw.lock); err != nil {
|
||||
log.Printf("unlock failed: %v", err)
|
||||
log.L.Debug("unlock failed: %v", err)
|
||||
}
|
||||
|
||||
if cw.fp != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue