From d99756a8a21b724293022c0aa69bba85a2db6c61 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Mon, 27 Feb 2017 19:02:16 -0800 Subject: [PATCH] content: allow reset via Truncate To make restarting after failed pull less racy, we define `Truncate(size int64) error` on `content.Writer` for the zero offset. Truncating a writer will dump any existing data and digest state and start from the beginning. All subsequent writes will start from the zero offset. For the service, we support this by defining the behavior for a write that changes the offset. To keep this narrow, we only support writes out of order at the offset 0, which causes the writer to dump existing data and reset the local hash. This makes restarting failed pulls much smoother when there was a previously encountered error and the source doesn't support arbitrary seeks or reads at arbitrary offsets. By allowing this to be done while holding the write lock on a ref, we can restart the full download without causing a race condition. Once we implement seeking on the `io.Reader` returned by the fetcher, this will be less useful, but it is good to ensure that our protocol properly supports this use case for when streaming is the only option. Signed-off-by: Stephen J Day --- api/services/content/content.pb.go | 10 ++++++++-- api/services/content/content.proto | 20 +++++++++++++------- content/client.go | 7 +++++++ content/content.go | 1 + content/helpers.go | 19 ++++++++++++++++--- content/writer.go | 11 ++++++++++- services/content/service.go | 9 +++++++++ 7 files changed, 64 insertions(+), 13 deletions(-) diff --git a/api/services/content/content.pb.go b/api/services/content/content.pb.go index 5c368d0..7d668de 100644 --- a/api/services/content/content.pb.go +++ b/api/services/content/content.pb.go @@ -186,8 +186,14 @@ type WriteRequest struct { // with the commit action message. Expected github_com_opencontainers_go_digest.Digest `protobuf:"bytes,4,opt,name=expected,proto3,customtype=github.com/opencontainers/go-digest.Digest" json:"expected"` // Offset specifies the number of bytes from the start at which to begin - // the write. If zero or less, the write will be from the start. This uses - // standard zero-indexed semantics. + // the write. For most implementations, this means from the start of the + // file. This uses standard, zero-indexed semantics. + // + // If the action is write, the remote may remove all previously written + // data up to the offset. Implementations may support arbitrary offsets but + // MUST support reseting this value to zero with with a write. If an + // implementation does not support a write at a particular offset, an + // OutOfRange error must be returned. Offset int64 `protobuf:"varint,5,opt,name=offset,proto3" json:"offset,omitempty"` // Data is the actual bytes to be written. // diff --git a/api/services/content/content.proto b/api/services/content/content.proto index 6e5c4c4..2d4a33d 100644 --- a/api/services/content/content.proto +++ b/api/services/content/content.proto @@ -86,10 +86,10 @@ message ReadResponse { enum WriteAction { option (gogoproto.goproto_enum_prefix) = false; option (gogoproto.enum_customname) = "WriteAction"; - + // WriteActionStat instructs the writer to return the current status while // holding the lock on the write. - STAT = 0 [(gogoproto.enumvalue_customname)="WriteActionStat"]; + STAT = 0 [(gogoproto.enumvalue_customname) = "WriteActionStat"]; // WriteActionWrite sets the action for the write request to write data. // @@ -97,7 +97,7 @@ enum WriteAction { // transaction will be left open for further writes. // // This is the default. - WRITE = 1 [(gogoproto.enumvalue_customname)="WriteActionWrite"]; + WRITE = 1 [(gogoproto.enumvalue_customname) = "WriteActionWrite"]; // WriteActionCommit will write any outstanding data in the message and // commit the write, storing it under the digest. @@ -106,13 +106,13 @@ enum WriteAction { // commit it. // // This action will always terminate the write. - COMMIT = 2 [(gogoproto.enumvalue_customname)="WriteActionCommit"]; + COMMIT = 2 [(gogoproto.enumvalue_customname) = "WriteActionCommit"]; // WriteActionAbort will release any resources associated with the write // and free up the ref for a completely new set of writes. // // This action will always terminate the write. - ABORT = -1 [(gogoproto.enumvalue_customname)="WriteActionAbort"]; + ABORT = -1 [(gogoproto.enumvalue_customname) = "WriteActionAbort"]; } // WriteRequest writes data to the request ref at offset. @@ -156,8 +156,14 @@ message WriteRequest { string expected = 4 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false]; // Offset specifies the number of bytes from the start at which to begin - // the write. If zero or less, the write will be from the start. This uses - // standard zero-indexed semantics. + // the write. For most implementations, this means from the start of the + // file. This uses standard, zero-indexed semantics. + // + // If the action is write, the remote may remove all previously written + // data after the offset. Implementations may support arbitrary offsets but + // MUST support reseting this value to zero with a write. If an + // implementation does not support a write at a particular offset, an + // OutOfRange error must be returned. int64 offset = 5; // Data is the actual bytes to be written. diff --git a/content/client.go b/content/client.go index f2f1e99..4aff02c 100644 --- a/content/client.go +++ b/content/client.go @@ -199,6 +199,7 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error { resp, err := rw.send(&contentapi.WriteRequest{ Action: contentapi.WriteActionCommit, Total: size, + Offset: rw.offset, Expected: expected, }) if err != nil { @@ -216,6 +217,12 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error { return nil } +func (rw *remoteWriter) Truncate(size int64) error { + // This truncation won't actually be validated until a write is issued. + rw.offset = size + return nil +} + func (rw *remoteWriter) Close() error { return rw.client.CloseSend() } diff --git a/content/content.go b/content/content.go index 4677dbc..73b2308 100644 --- a/content/content.go +++ b/content/content.go @@ -44,6 +44,7 @@ type Writer interface { Status() (Status, error) Digest() digest.Digest Commit(size int64, expected digest.Digest) error + Truncate(size int64) error } type Ingester interface { diff --git a/content/helpers.go b/content/helpers.go index fc37e13..ea57d03 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -36,7 +36,14 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i if ws.Offset > 0 { r, err = seekReader(r, ws.Offset, size) if err != nil { - return errors.Wrapf(err, "unabled to resume write to %v", ref) + if !isUnseekable(err) { + return errors.Wrapf(err, "unabled to resume write to %v", ref) + } + + // reader is unseekable, try to move the writer back to the start. + if err := cw.Truncate(0); err != nil { + return errors.Wrapf(err, "content writer truncate failed") + } } } @@ -49,13 +56,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i if err := cw.Commit(size, expected); err != nil { if !IsExists(err) { - return err + return errors.Wrapf(err, "failed commit on ref %q", ref) } } return nil } +var errUnseekable = errors.New("seek not supported") + +func isUnseekable(err error) bool { + return errors.Cause(err) == errUnseekable +} + // seekReader attempts to seek the reader to the given offset, either by // resolving `io.Seeker` or by detecting `io.ReaderAt`. func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { @@ -81,7 +94,7 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { return sr, nil } - return nil, errors.Errorf("cannot seek to offset %v", offset) + return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset) } func readFileString(path string) (string, error) { diff --git a/content/writer.go b/content/writer.go index 43ea388..2bb86a4 100644 --- a/content/writer.go +++ b/content/writer.go @@ -74,7 +74,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error { } if size > 0 && size != fi.Size() { - return errors.Errorf("failed size validation: %v != %v", fi.Size(), size) + return errors.Errorf("%q failed size validation: %v != %v", w.ref, fi.Size(), size) } if err := w.fp.Close(); err != nil { @@ -133,3 +133,12 @@ func (cw *writer) Close() (err error) { return nil } + +func (w *writer) Truncate(size int64) error { + if size != 0 { + return errors.New("Truncate: unsupported size") + } + w.offset = 0 + w.digester.Hash().Reset() + return w.fp.Truncate(0) +} diff --git a/services/content/service.go b/services/content/service.go index 6c3ae0f..75e6f5b 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -182,6 +182,8 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) { } ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields)) + + log.G(ctx).Debug("(*Service).Write started") // this action locks the writer for the session. wr, err := s.store.Writer(ctx, ref, total, expected) if err != nil { @@ -253,6 +255,13 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) { } } + if req.Offset == 0 && ws.Offset > 0 { + if err := wr.Truncate(req.Offset); err != nil { + return errors.Wrapf(err, "truncate failed") + } + msg.Offset = req.Offset + } + // issue the write if we actually have data. if len(req.Data) > 0 { // While this looks like we could use io.WriterAt here, because we