Merge pull request #576 from stevvooe/seek-reset-handling
content: allow reset via Truncate
This commit is contained in:
commit
317b884110
7 changed files with 64 additions and 13 deletions
|
@ -197,8 +197,14 @@ type WriteRequest struct {
|
||||||
// with the commit action message.
|
// with the commit action message.
|
||||||
Expected github_com_opencontainers_go_digest.Digest `protobuf:"bytes,4,opt,name=expected,proto3,customtype=github.com/opencontainers/go-digest.Digest" json:"expected"`
|
Expected github_com_opencontainers_go_digest.Digest `protobuf:"bytes,4,opt,name=expected,proto3,customtype=github.com/opencontainers/go-digest.Digest" json:"expected"`
|
||||||
// Offset specifies the number of bytes from the start at which to begin
|
// Offset specifies the number of bytes from the start at which to begin
|
||||||
// the write. If zero or less, the write will be from the start. This uses
|
// the write. For most implementations, this means from the start of the
|
||||||
// standard zero-indexed semantics.
|
// file. This uses standard, zero-indexed semantics.
|
||||||
|
//
|
||||||
|
// If the action is write, the remote may remove all previously written
|
||||||
|
// data up to the offset. Implementations may support arbitrary offsets but
|
||||||
|
// MUST support reseting this value to zero with with a write. If an
|
||||||
|
// implementation does not support a write at a particular offset, an
|
||||||
|
// OutOfRange error must be returned.
|
||||||
Offset int64 `protobuf:"varint,5,opt,name=offset,proto3" json:"offset,omitempty"`
|
Offset int64 `protobuf:"varint,5,opt,name=offset,proto3" json:"offset,omitempty"`
|
||||||
// Data is the actual bytes to be written.
|
// Data is the actual bytes to be written.
|
||||||
//
|
//
|
||||||
|
|
|
@ -98,7 +98,7 @@ enum WriteAction {
|
||||||
|
|
||||||
// WriteActionStat instructs the writer to return the current status while
|
// WriteActionStat instructs the writer to return the current status while
|
||||||
// holding the lock on the write.
|
// holding the lock on the write.
|
||||||
STAT = 0 [(gogoproto.enumvalue_customname)="WriteActionStat"];
|
STAT = 0 [(gogoproto.enumvalue_customname) = "WriteActionStat"];
|
||||||
|
|
||||||
// WriteActionWrite sets the action for the write request to write data.
|
// WriteActionWrite sets the action for the write request to write data.
|
||||||
//
|
//
|
||||||
|
@ -106,7 +106,7 @@ enum WriteAction {
|
||||||
// transaction will be left open for further writes.
|
// transaction will be left open for further writes.
|
||||||
//
|
//
|
||||||
// This is the default.
|
// This is the default.
|
||||||
WRITE = 1 [(gogoproto.enumvalue_customname)="WriteActionWrite"];
|
WRITE = 1 [(gogoproto.enumvalue_customname) = "WriteActionWrite"];
|
||||||
|
|
||||||
// WriteActionCommit will write any outstanding data in the message and
|
// WriteActionCommit will write any outstanding data in the message and
|
||||||
// commit the write, storing it under the digest.
|
// commit the write, storing it under the digest.
|
||||||
|
@ -115,13 +115,13 @@ enum WriteAction {
|
||||||
// commit it.
|
// commit it.
|
||||||
//
|
//
|
||||||
// This action will always terminate the write.
|
// This action will always terminate the write.
|
||||||
COMMIT = 2 [(gogoproto.enumvalue_customname)="WriteActionCommit"];
|
COMMIT = 2 [(gogoproto.enumvalue_customname) = "WriteActionCommit"];
|
||||||
|
|
||||||
// WriteActionAbort will release any resources associated with the write
|
// WriteActionAbort will release any resources associated with the write
|
||||||
// and free up the ref for a completely new set of writes.
|
// and free up the ref for a completely new set of writes.
|
||||||
//
|
//
|
||||||
// This action will always terminate the write.
|
// This action will always terminate the write.
|
||||||
ABORT = -1 [(gogoproto.enumvalue_customname)="WriteActionAbort"];
|
ABORT = -1 [(gogoproto.enumvalue_customname) = "WriteActionAbort"];
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteRequest writes data to the request ref at offset.
|
// WriteRequest writes data to the request ref at offset.
|
||||||
|
@ -165,8 +165,14 @@ message WriteRequest {
|
||||||
string expected = 4 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
|
string expected = 4 [(gogoproto.customtype) = "github.com/opencontainers/go-digest.Digest", (gogoproto.nullable) = false];
|
||||||
|
|
||||||
// Offset specifies the number of bytes from the start at which to begin
|
// Offset specifies the number of bytes from the start at which to begin
|
||||||
// the write. If zero or less, the write will be from the start. This uses
|
// the write. For most implementations, this means from the start of the
|
||||||
// standard zero-indexed semantics.
|
// file. This uses standard, zero-indexed semantics.
|
||||||
|
//
|
||||||
|
// If the action is write, the remote may remove all previously written
|
||||||
|
// data after the offset. Implementations may support arbitrary offsets but
|
||||||
|
// MUST support reseting this value to zero with a write. If an
|
||||||
|
// implementation does not support a write at a particular offset, an
|
||||||
|
// OutOfRange error must be returned.
|
||||||
int64 offset = 5;
|
int64 offset = 5;
|
||||||
|
|
||||||
// Data is the actual bytes to be written.
|
// Data is the actual bytes to be written.
|
||||||
|
|
|
@ -199,6 +199,7 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
|
||||||
resp, err := rw.send(&contentapi.WriteRequest{
|
resp, err := rw.send(&contentapi.WriteRequest{
|
||||||
Action: contentapi.WriteActionCommit,
|
Action: contentapi.WriteActionCommit,
|
||||||
Total: size,
|
Total: size,
|
||||||
|
Offset: rw.offset,
|
||||||
Expected: expected,
|
Expected: expected,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -216,6 +217,12 @@ func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rw *remoteWriter) Truncate(size int64) error {
|
||||||
|
// This truncation won't actually be validated until a write is issued.
|
||||||
|
rw.offset = size
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (rw *remoteWriter) Close() error {
|
func (rw *remoteWriter) Close() error {
|
||||||
return rw.client.CloseSend()
|
return rw.client.CloseSend()
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,7 @@ type Writer interface {
|
||||||
Status() (Status, error)
|
Status() (Status, error)
|
||||||
Digest() digest.Digest
|
Digest() digest.Digest
|
||||||
Commit(size int64, expected digest.Digest) error
|
Commit(size int64, expected digest.Digest) error
|
||||||
|
Truncate(size int64) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Ingester interface {
|
type Ingester interface {
|
||||||
|
|
|
@ -36,7 +36,14 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
|
||||||
if ws.Offset > 0 {
|
if ws.Offset > 0 {
|
||||||
r, err = seekReader(r, ws.Offset, size)
|
r, err = seekReader(r, ws.Offset, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "unabled to resume write to %v", ref)
|
if !isUnseekable(err) {
|
||||||
|
return errors.Wrapf(err, "unabled to resume write to %v", ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reader is unseekable, try to move the writer back to the start.
|
||||||
|
if err := cw.Truncate(0); err != nil {
|
||||||
|
return errors.Wrapf(err, "content writer truncate failed")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,13 +56,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
|
||||||
|
|
||||||
if err := cw.Commit(size, expected); err != nil {
|
if err := cw.Commit(size, expected); err != nil {
|
||||||
if !IsExists(err) {
|
if !IsExists(err) {
|
||||||
return err
|
return errors.Wrapf(err, "failed commit on ref %q", ref)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errUnseekable = errors.New("seek not supported")
|
||||||
|
|
||||||
|
func isUnseekable(err error) bool {
|
||||||
|
return errors.Cause(err) == errUnseekable
|
||||||
|
}
|
||||||
|
|
||||||
// seekReader attempts to seek the reader to the given offset, either by
|
// seekReader attempts to seek the reader to the given offset, either by
|
||||||
// resolving `io.Seeker` or by detecting `io.ReaderAt`.
|
// resolving `io.Seeker` or by detecting `io.ReaderAt`.
|
||||||
func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
|
func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
|
||||||
|
@ -81,7 +94,7 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
|
||||||
return sr, nil
|
return sr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, errors.Errorf("cannot seek to offset %v", offset)
|
return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
func readFileString(path string) (string, error) {
|
func readFileString(path string) (string, error) {
|
||||||
|
|
|
@ -74,7 +74,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if size > 0 && size != fi.Size() {
|
if size > 0 && size != fi.Size() {
|
||||||
return errors.Errorf("failed size validation: %v != %v", fi.Size(), size)
|
return errors.Errorf("%q failed size validation: %v != %v", w.ref, fi.Size(), size)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := w.fp.Close(); err != nil {
|
if err := w.fp.Close(); err != nil {
|
||||||
|
@ -133,3 +133,12 @@ func (cw *writer) Close() (err error) {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *writer) Truncate(size int64) error {
|
||||||
|
if size != 0 {
|
||||||
|
return errors.New("Truncate: unsupported size")
|
||||||
|
}
|
||||||
|
w.offset = 0
|
||||||
|
w.digester.Hash().Reset()
|
||||||
|
return w.fp.Truncate(0)
|
||||||
|
}
|
||||||
|
|
|
@ -195,6 +195,8 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields))
|
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields))
|
||||||
|
|
||||||
|
log.G(ctx).Debug("(*Service).Write started")
|
||||||
// this action locks the writer for the session.
|
// this action locks the writer for the session.
|
||||||
wr, err := s.store.Writer(ctx, ref, total, expected)
|
wr, err := s.store.Writer(ctx, ref, total, expected)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -266,6 +268,13 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if req.Offset == 0 && ws.Offset > 0 {
|
||||||
|
if err := wr.Truncate(req.Offset); err != nil {
|
||||||
|
return errors.Wrapf(err, "truncate failed")
|
||||||
|
}
|
||||||
|
msg.Offset = req.Offset
|
||||||
|
}
|
||||||
|
|
||||||
// issue the write if we actually have data.
|
// issue the write if we actually have data.
|
||||||
if len(req.Data) > 0 {
|
if len(req.Data) > 0 {
|
||||||
// While this looks like we could use io.WriterAt here, because we
|
// While this looks like we could use io.WriterAt here, because we
|
||||||
|
|
Loading…
Reference in a new issue