content: allow reset via Truncate

To make restarting after a failed pull less racy, we define `Truncate(size
int64) error` on `content.Writer` for the zero offset. Truncating a
writer discards any existing data and digest state and starts over from
the beginning; all subsequent writes proceed from the zero offset.
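
For illustration, a minimal sketch of what this enables on the client side;
`retryWrite` and `fetch` are hypothetical names, the import path assumes this
tree's layout, and `content.Writer` is assumed to embed `io.Writer` as the
ingest API does:

package example

import (
    "io"

    "github.com/docker/containerd/content"
)

// retryWrite restarts a transfer from scratch on a writer that may hold
// partial data from a failed attempt. Truncate(0) discards that data and
// resets the digest state, so the copy below starts from a clean slate.
func retryWrite(cw content.Writer, fetch func() (io.Reader, error)) error {
    src, err := fetch() // hypothetical: returns a fresh reader at offset 0
    if err != nil {
        return err
    }
    if err := cw.Truncate(0); err != nil {
        return err
    }
    _, err = io.Copy(cw, src)
    return err
}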

For the service, we support this by defining the behavior of a write
that changes the offset. To keep this narrow, we only support
out-of-order writes at offset 0, which cause the writer to discard
existing data and reset the local hash.
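
On the wire, that narrow case looks roughly like this sketch; only fields
visible in the diffs below are used, `contentapi` is the generated API package
imported under the alias the client code uses, and `sendReset`/`chunk` are
hypothetical:

// sendReset issues the one out-of-order write the service accepts: a
// write at offset 0 on an existing session. The service responds by
// discarding previously written data and resetting its local hash
// before applying the chunk.
func sendReset(client contentapi.Content_WriteClient, chunk []byte) error {
    return client.Send(&contentapi.WriteRequest{
        Action: contentapi.WriteActionWrite,
        Offset: 0, // out-of-order write at zero triggers the reset
        Data:   chunk,
    })
}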

This makes restarting failed pulls much smoother when a previous attempt
hit an error and the source doesn't support seeks or reads at arbitrary
offsets. Because the reset happens while holding the write lock on a
ref, we can restart the full download without causing a race condition.
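
Concretely, this is the shape of the fallback added to the `WriteBlob`
helper below, shown here in condensed form; `cw` is the locked writer for the
ref, `r` the source reader, and `seekReader`/`isUnseekable` come from the diff:

// Try to resume mid-stream; if the source can only be read front to
// back, rewind the writer instead of the reader and start over.
r, err = seekReader(r, ws.Offset, size)
if err != nil {
    if !isUnseekable(err) {
        return err // a real failure, not just a non-seekable source
    }
    if err := cw.Truncate(0); err != nil {
        return err
    }
}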

Once we implement seeking on the `io.Reader` returned by the fetcher,
this will be less useful, but it is good to ensure that the protocol
properly supports this use case when streaming is the only option.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
Stephen J Day, 2017-02-27 19:02:16 -08:00
parent be20bb1eb8, commit d99756a8a2
7 changed files with 64 additions and 13 deletions

@@ -186,8 +186,14 @@ type WriteRequest struct {
 	// with the commit action message.
 	Expected github_com_opencontainers_go_digest.Digest `protobuf:"bytes,4,opt,name=expected,proto3,customtype=github.com/opencontainers/go-digest.Digest" json:"expected"`
 	// Offset specifies the number of bytes from the start at which to begin
-	// the write. If zero or less, the write will be from the start. This uses
-	// standard zero-indexed semantics.
+	// the write. For most implementations, this means from the start of the
+	// file. This uses standard, zero-indexed semantics.
+	//
+	// If the action is write, the remote may remove all previously written
+	// data after the offset. Implementations may support arbitrary offsets but
+	// MUST support resetting this value to zero with a write. If an
+	// implementation does not support a write at a particular offset, an
+	// OutOfRange error must be returned.
 	Offset int64 `protobuf:"varint,5,opt,name=offset,proto3" json:"offset,omitempty"`
 	// Data is the actual bytes to be written.
 	//

@@ -86,10 +86,10 @@ message ReadResponse {
 enum WriteAction {
 	option (gogoproto.goproto_enum_prefix) = false;
 	option (gogoproto.enum_customname) = "WriteAction";

 	// WriteActionStat instructs the writer to return the current status while
 	// holding the lock on the write.
-	STAT = 0 [(gogoproto.enumvalue_customname)="WriteActionStat"];
+	STAT = 0 [(gogoproto.enumvalue_customname) = "WriteActionStat"];

 	// WriteActionWrite sets the action for the write request to write data.
 	//

@@ -97,7 +97,7 @@ enum WriteAction {
 	// transaction will be left open for further writes.
 	//
 	// This is the default.
-	WRITE = 1 [(gogoproto.enumvalue_customname)="WriteActionWrite"];
+	WRITE = 1 [(gogoproto.enumvalue_customname) = "WriteActionWrite"];

 	// WriteActionCommit will write any outstanding data in the message and
 	// commit the write, storing it under the digest.

@@ -106,13 +106,13 @@ enum WriteAction {
 	// commit it.
 	//
 	// This action will always terminate the write.
-	COMMIT = 2 [(gogoproto.enumvalue_customname)="WriteActionCommit"];
+	COMMIT = 2 [(gogoproto.enumvalue_customname) = "WriteActionCommit"];

 	// WriteActionAbort will release any resources associated with the write
 	// and free up the ref for a completely new set of writes.
 	//
 	// This action will always terminate the write.
-	ABORT = -1 [(gogoproto.enumvalue_customname)="WriteActionAbort"];
+	ABORT = -1 [(gogoproto.enumvalue_customname) = "WriteActionAbort"];
 }

 // WriteRequest writes data to the request ref at offset.

@@ -156,8 +156,14 @@ message WriteRequest {
 	string expected = 4 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];

 	// Offset specifies the number of bytes from the start at which to begin
-	// the write. If zero or less, the write will be from the start. This uses
-	// standard zero-indexed semantics.
+	// the write. For most implementations, this means from the start of the
+	// file. This uses standard, zero-indexed semantics.
+	//
+	// If the action is write, the remote may remove all previously written
+	// data after the offset. Implementations may support arbitrary offsets but
+	// MUST support resetting this value to zero with a write. If an
+	// implementation does not support a write at a particular offset, an
+	// OutOfRange error must be returned.
 	int64 offset = 5;

 	// Data is the actual bytes to be written.

@@ -199,6 +199,7 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
 	resp, err := rw.send(&contentapi.WriteRequest{
 		Action:   contentapi.WriteActionCommit,
 		Total:    size,
+		Offset:   rw.offset,
 		Expected: expected,
 	})
 	if err != nil {

@@ -216,6 +217,12 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
 	return nil
 }

+func (rw *remoteWriter) Truncate(size int64) error {
+	// This truncation won't actually be validated until a write is issued.
+	rw.offset = size
+	return nil
+}
+
 func (rw *remoteWriter) Close() error {
 	return rw.client.CloseSend()
 }

@@ -44,6 +44,7 @@ type Writer interface {
 	Status() (Status, error)
 	Digest() digest.Digest
 	Commit(size int64, expected digest.Digest) error
+	Truncate(size int64) error
 }

 type Ingester interface {

@@ -36,7 +36,14 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
 	if ws.Offset > 0 {
 		r, err = seekReader(r, ws.Offset, size)
 		if err != nil {
-			return errors.Wrapf(err, "unable to resume write to %v", ref)
+			if !isUnseekable(err) {
+				return errors.Wrapf(err, "unable to resume write to %v", ref)
+			}
+
+			// reader is unseekable, try to move the writer back to the start.
+			if err := cw.Truncate(0); err != nil {
+				return errors.Wrapf(err, "content writer truncate failed")
+			}
 		}
 	}

@@ -49,13 +56,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
 	if err := cw.Commit(size, expected); err != nil {
 		if !IsExists(err) {
-			return err
+			return errors.Wrapf(err, "failed commit on ref %q", ref)
 		}
 	}

 	return nil
 }

+var errUnseekable = errors.New("seek not supported")
+
+func isUnseekable(err error) bool {
+	return errors.Cause(err) == errUnseekable
+}
+
 // seekReader attempts to seek the reader to the given offset, either by
 // resolving `io.Seeker` or by detecting `io.ReaderAt`.
 func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {

@@ -81,7 +94,7 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
 		return sr, nil
 	}

-	return nil, errors.Errorf("cannot seek to offset %v", offset)
+	return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
 }

 func readFileString(path string) (string, error) {

@@ -74,7 +74,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error {
 	}

 	if size > 0 && size != fi.Size() {
-		return errors.Errorf("failed size validation: %v != %v", fi.Size(), size)
+		return errors.Errorf("%q failed size validation: %v != %v", w.ref, fi.Size(), size)
 	}

 	if err := w.fp.Close(); err != nil {

@@ -133,3 +133,12 @@ func (cw *writer) Close() (err error) {
 	return nil
 }
+
+func (w *writer) Truncate(size int64) error {
+	if size != 0 {
+		return errors.New("Truncate: unsupported size")
+	}
+	w.offset = 0
+	w.digester.Hash().Reset()
+	return w.fp.Truncate(0)
+}

@@ -182,6 +182,8 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
 	}

 	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields))
 	log.G(ctx).Debug("(*Service).Write started")
+
+	// this action locks the writer for the session.
 	wr, err := s.store.Writer(ctx, ref, total, expected)
 	if err != nil {

@@ -253,6 +255,13 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
 		}
 	}

+	if req.Offset == 0 && ws.Offset > 0 {
+		if err := wr.Truncate(req.Offset); err != nil {
+			return errors.Wrapf(err, "truncate failed")
+		}
+		msg.Offset = req.Offset
+	}
+
 	// issue the write if we actually have data.
 	if len(req.Data) > 0 {
 		// While this looks like we could use io.WriterAt here, because we