diff --git a/api/services/content/content.pb.go b/api/services/content/content.pb.go index 5c368d0..7d668de 100644 --- a/api/services/content/content.pb.go +++ b/api/services/content/content.pb.go @@ -186,8 +186,14 @@ type WriteRequest struct { // with the commit action message. Expected github_com_opencontainers_go_digest.Digest `protobuf:"bytes,4,opt,name=expected,proto3,customtype=github.com/opencontainers/go-digest.Digest" json:"expected"` // Offset specifies the number of bytes from the start at which to begin - // the write. If zero or less, the write will be from the start. This uses - // standard zero-indexed semantics. + // the write. For most implementations, this means from the start of the + // file. This uses standard, zero-indexed semantics. + // + // If the action is write, the remote may remove all previously written + // data up to the offset. Implementations may support arbitrary offsets but + // MUST support reseting this value to zero with with a write. If an + // implementation does not support a write at a particular offset, an + // OutOfRange error must be returned. Offset int64 `protobuf:"varint,5,opt,name=offset,proto3" json:"offset,omitempty"` // Data is the actual bytes to be written. // diff --git a/api/services/content/content.proto b/api/services/content/content.proto index 6e5c4c4..2d4a33d 100644 --- a/api/services/content/content.proto +++ b/api/services/content/content.proto @@ -86,10 +86,10 @@ message ReadResponse { enum WriteAction { option (gogoproto.goproto_enum_prefix) = false; option (gogoproto.enum_customname) = "WriteAction"; - + // WriteActionStat instructs the writer to return the current status while // holding the lock on the write. - STAT = 0 [(gogoproto.enumvalue_customname)="WriteActionStat"]; + STAT = 0 [(gogoproto.enumvalue_customname) = "WriteActionStat"]; // WriteActionWrite sets the action for the write request to write data. // @@ -97,7 +97,7 @@ enum WriteAction { // transaction will be left open for further writes. // // This is the default. - WRITE = 1 [(gogoproto.enumvalue_customname)="WriteActionWrite"]; + WRITE = 1 [(gogoproto.enumvalue_customname) = "WriteActionWrite"]; // WriteActionCommit will write any outstanding data in the message and // commit the write, storing it under the digest. @@ -106,13 +106,13 @@ enum WriteAction { // commit it. // // This action will always terminate the write. - COMMIT = 2 [(gogoproto.enumvalue_customname)="WriteActionCommit"]; + COMMIT = 2 [(gogoproto.enumvalue_customname) = "WriteActionCommit"]; // WriteActionAbort will release any resources associated with the write // and free up the ref for a completely new set of writes. // // This action will always terminate the write. - ABORT = -1 [(gogoproto.enumvalue_customname)="WriteActionAbort"]; + ABORT = -1 [(gogoproto.enumvalue_customname) = "WriteActionAbort"]; } // WriteRequest writes data to the request ref at offset. @@ -156,8 +156,14 @@ message WriteRequest { string expected = 4 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false]; // Offset specifies the number of bytes from the start at which to begin - // the write. If zero or less, the write will be from the start. This uses - // standard zero-indexed semantics. + // the write. For most implementations, this means from the start of the + // file. This uses standard, zero-indexed semantics. + // + // If the action is write, the remote may remove all previously written + // data after the offset. Implementations may support arbitrary offsets but + // MUST support reseting this value to zero with a write. If an + // implementation does not support a write at a particular offset, an + // OutOfRange error must be returned. int64 offset = 5; // Data is the actual bytes to be written. diff --git a/content/client.go b/content/client.go index f2f1e99..4aff02c 100644 --- a/content/client.go +++ b/content/client.go @@ -199,6 +199,7 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error { resp, err := rw.send(&contentapi.WriteRequest{ Action: contentapi.WriteActionCommit, Total: size, + Offset: rw.offset, Expected: expected, }) if err != nil { @@ -216,6 +217,12 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error { return nil } +func (rw *remoteWriter) Truncate(size int64) error { + // This truncation won't actually be validated until a write is issued. + rw.offset = size + return nil +} + func (rw *remoteWriter) Close() error { return rw.client.CloseSend() } diff --git a/content/content.go b/content/content.go index 4677dbc..73b2308 100644 --- a/content/content.go +++ b/content/content.go @@ -44,6 +44,7 @@ type Writer interface { Status() (Status, error) Digest() digest.Digest Commit(size int64, expected digest.Digest) error + Truncate(size int64) error } type Ingester interface { diff --git a/content/helpers.go b/content/helpers.go index fc37e13..ea57d03 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -36,7 +36,14 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i if ws.Offset > 0 { r, err = seekReader(r, ws.Offset, size) if err != nil { - return errors.Wrapf(err, "unabled to resume write to %v", ref) + if !isUnseekable(err) { + return errors.Wrapf(err, "unabled to resume write to %v", ref) + } + + // reader is unseekable, try to move the writer back to the start. + if err := cw.Truncate(0); err != nil { + return errors.Wrapf(err, "content writer truncate failed") + } } } @@ -49,13 +56,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i if err := cw.Commit(size, expected); err != nil { if !IsExists(err) { - return err + return errors.Wrapf(err, "failed commit on ref %q", ref) } } return nil } +var errUnseekable = errors.New("seek not supported") + +func isUnseekable(err error) bool { + return errors.Cause(err) == errUnseekable +} + // seekReader attempts to seek the reader to the given offset, either by // resolving `io.Seeker` or by detecting `io.ReaderAt`. func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { @@ -81,7 +94,7 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { return sr, nil } - return nil, errors.Errorf("cannot seek to offset %v", offset) + return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset) } func readFileString(path string) (string, error) { diff --git a/content/writer.go b/content/writer.go index 43ea388..2bb86a4 100644 --- a/content/writer.go +++ b/content/writer.go @@ -74,7 +74,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error { } if size > 0 && size != fi.Size() { - return errors.Errorf("failed size validation: %v != %v", fi.Size(), size) + return errors.Errorf("%q failed size validation: %v != %v", w.ref, fi.Size(), size) } if err := w.fp.Close(); err != nil { @@ -133,3 +133,12 @@ func (cw *writer) Close() (err error) { return nil } + +func (w *writer) Truncate(size int64) error { + if size != 0 { + return errors.New("Truncate: unsupported size") + } + w.offset = 0 + w.digester.Hash().Reset() + return w.fp.Truncate(0) +} diff --git a/services/content/service.go b/services/content/service.go index 6c3ae0f..75e6f5b 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -182,6 +182,8 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) { } ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields)) + + log.G(ctx).Debug("(*Service).Write started") // this action locks the writer for the session. wr, err := s.store.Writer(ctx, ref, total, expected) if err != nil { @@ -253,6 +255,13 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) { } } + if req.Offset == 0 && ws.Offset > 0 { + if err := wr.Truncate(req.Offset); err != nil { + return errors.Wrapf(err, "truncate failed") + } + msg.Offset = req.Offset + } + // issue the write if we actually have data. if len(req.Data) > 0 { // While this looks like we could use io.WriterAt here, because we