containerd/content/helpers.go
Stephen J Day d99756a8a2
content: allow reset via Truncate
To make restarting after a failed pull less racy, we define `Truncate(size
int64) error` on `content.Writer`, supported only for the zero offset.
Truncating a writer will dump any existing data and digest state and start
from the beginning. All subsequent writes will start from the zero offset.

For the service, we support this by defining the behavior for a write
that changes the offset. To keep this narrow, we only support writes out
of order at the offset 0, which causes the writer to dump existing data
and reset the local hash.

This makes restarting failed pulls much smoother when there was a
previously encountered error and the source doesn't support arbitrary
seeks or reads at arbitrary offsets. By allowing this to be done while
holding the write lock on a ref, we can restart the full download
without causing a race condition.

Once we implement seeking on the `io.Reader` returned by the fetcher,
this will be less useful, but it is good to ensure that our protocol
properly supports this use case for when streaming is the only option.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-02-28 10:40:02 -08:00

103 lines
2.4 KiB
Go

package content
import (
"context"
"fmt"
"io"
"io/ioutil"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// WriteBlob writes data with the expected digest into the content store. If
// expected already exists, the method returns immediately and the reader will
// not be consumed.
//
// This is useful when the digest and size are known beforehand.
//
// If the writer opened for ref reports a non-zero offset, WriteBlob first
// tries to position r at that offset so the write can resume. When r cannot
// be repositioned, the writer is truncated back to zero and r is copied from
// wherever it currently stands.
//
// Copy is buffered, so no need to wrap reader in buffered io.
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest) error {
	cw, err := cs.Writer(ctx, ref, size, expected)
	if err != nil {
		if IsExists(err) {
			return nil // already present
		}
		return err
	}
	defer cw.Close()

	ws, err := cw.Status()
	if err != nil {
		return err
	}

	if ws.Offset > 0 {
		r, err = seekReader(r, ws.Offset, size)
		if err != nil {
			if !isUnseekable(err) {
				return errors.Wrapf(err, "unable to resume write to %v", ref)
			}

			// reader is unseekable, try to move the writer back to the start.
			if err := cw.Truncate(0); err != nil {
				return errors.Wrap(err, "content writer truncate failed")
			}
		}
	}

	buf := BufPool.Get().([]byte)
	defer BufPool.Put(buf)

	if _, err := io.CopyBuffer(cw, r, buf); err != nil {
		return err
	}

	// A commit rejected because the content already exists is not a failure.
	if err := cw.Commit(size, expected); err != nil && !IsExists(err) {
		return errors.Wrapf(err, "failed commit on ref %q", ref)
	}

	return nil
}
// errUnseekable is the sentinel cause attached to errors returned when a
// reader cannot be positioned at a requested offset.
var errUnseekable = errors.New("seek not supported")

// isUnseekable reports whether the root cause of err is errUnseekable.
func isUnseekable(err error) bool {
	cause := errors.Cause(err)
	return cause == errUnseekable
}
// seekReader attempts to position the reader at the given offset, either by
// resolving `io.Seeker` or by detecting `io.ReaderAt`.
//
// size is treated as the total length of the content: for an `io.ReaderAt`
// the returned reader covers the bytes in [offset, size).
//
// When r is neither an `io.Seeker` nor an `io.ReaderAt` with size > offset,
// the original reader is returned together with an error whose cause is
// errUnseekable, so the caller may keep using r after handling the error.
// On a failed seek the returned reader is nil.
func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
	// attempt to resolve r as a seeker and setup the offset.
	if seeker, ok := r.(io.Seeker); ok {
		nn, err := seeker.Seek(offset, io.SeekStart)
		if err != nil {
			// Check the error first so the underlying cause is reported
			// instead of being hidden behind the position mismatch below.
			return nil, errors.Wrapf(err, "failed to seek to offset %v", offset)
		}
		if nn != offset {
			return nil, fmt.Errorf("failed to seek to offset %v", offset)
		}

		return r, nil
	}

	// ok, let's try io.ReaderAt!
	if readerAt, ok := r.(io.ReaderAt); ok && size > offset {
		// NewSectionReader takes the number of bytes to expose, not the end
		// position, so only the remaining size-offset bytes are covered.
		return io.NewSectionReader(readerAt, offset, size-offset), nil
	}

	return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
}
// readFileString reads the file at path and returns its contents as a
// string, along with any error reported by ioutil.ReadFile.
func readFileString(path string) (contents string, err error) {
	var raw []byte
	raw, err = ioutil.ReadFile(path)
	contents = string(raw)
	return contents, err
}