diff --git a/cmd/dist/get.go b/cmd/dist/get.go index f4bfbb2..39a4f10 100644 --- a/cmd/dist/get.go +++ b/cmd/dist/get.go @@ -5,7 +5,7 @@ import ( "os" contentapi "github.com/docker/containerd/api/services/content" - "github.com/docker/containerd/content" + contentservice "github.com/docker/containerd/services/content" digest "github.com/opencontainers/go-digest" "github.com/urfave/cli" ) @@ -33,7 +33,7 @@ Output paths can be used to directly access blobs on disk.`, return err } - cs := content.NewProviderFromClient(contentapi.NewContentClient(conn)) + cs := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) rc, err := cs.Reader(ctx, dgst) if err != nil { diff --git a/cmd/dist/ingest.go b/cmd/dist/ingest.go index e9c95a0..16a1cef 100644 --- a/cmd/dist/ingest.go +++ b/cmd/dist/ingest.go @@ -6,9 +6,10 @@ import ( contentapi "github.com/docker/containerd/api/services/content" "github.com/docker/containerd/content" + contentservice "github.com/docker/containerd/services/content" "github.com/opencontainers/go-digest" - "github.com/urfave/cli" "github.com/pkg/errors" + "github.com/urfave/cli" ) var ingestCommand = cli.Command{ @@ -51,7 +52,7 @@ var ingestCommand = cli.Command{ return errors.New("must specify a transaction reference") } - ingester := content.NewIngesterFromClient(contentapi.NewContentClient(conn)) + ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)) // TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect // all data to be written in a single invocation. Allow multiple writes diff --git a/content/content.go b/content/content.go index 73b2308..1f97ff7 100644 --- a/content/content.go +++ b/content/content.go @@ -11,10 +11,17 @@ import ( ) var ( - errNotFound = errors.New("content: not found") - errExists = errors.New("content: exists") + // ErrNotFound is returned when an item is not found. + // + // Use IsNotFound(err) to detect this condition. + ErrNotFound = errors.New("content: not found") - BufPool = sync.Pool{ + // ErrExists is returned when something exists when it may not be expected. + // + // Use IsExists(err) to detect this condition. + ErrExists = errors.New("content: exists") + + bufPool = sync.Pool{ New: func() interface{} { return make([]byte, 1<<20) }, @@ -52,9 +59,9 @@ type Ingester interface { } func IsNotFound(err error) bool { - return errors.Cause(err) == errNotFound + return errors.Cause(err) == ErrNotFound } func IsExists(err error) bool { - return errors.Cause(err) == errExists + return errors.Cause(err) == ErrExists } diff --git a/content/helpers.go b/content/helpers.go index ea57d03..b92d011 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -47,8 +47,8 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i } } - buf := BufPool.Get().([]byte) - defer BufPool.Put(buf) + buf := bufPool.Get().([]byte) + defer bufPool.Put(buf) if _, err := io.CopyBuffer(cw, r, buf); err != nil { return err diff --git a/content/store.go b/content/store.go index 3af68fd..4789756 100644 --- a/content/store.go +++ b/content/store.go @@ -41,7 +41,7 @@ func (s *Store) Info(dgst digest.Digest) (Info, error) { fi, err := os.Stat(p) if err != nil { if os.IsNotExist(err) { - err = errNotFound + err = ErrNotFound } return Info{}, err @@ -63,7 +63,7 @@ func (s *Store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, fp, err := os.Open(s.blobPath(dgst)) if err != nil { if os.IsNotExist(err) { - err = errNotFound + err = ErrNotFound } return nil, err } @@ -81,7 +81,7 @@ func (cs *Store) Delete(dgst digest.Digest) error { return err } - return errNotFound + return ErrNotFound } return nil @@ -243,8 +243,8 @@ func (s *Store) Writer(ctx context.Context, ref string, total int64, expected di } defer fp.Close() - p := BufPool.Get().([]byte) - defer BufPool.Put(p) + p := bufPool.Get().([]byte) + defer bufPool.Put(p) offset, err = io.CopyBuffer(digester.Hash(), fp, p) if err != nil { diff --git a/content/writer.go b/content/writer.go index 2bb86a4..2f72021 100644 --- a/content/writer.go +++ b/content/writer.go @@ -102,7 +102,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error { if err := os.Rename(ingest, target); err != nil { if os.IsExist(err) { // collision with the target file! - return errExists + return ErrExists } return err } diff --git a/services/content/helpers.go b/services/content/helpers.go new file mode 100644 index 0000000..6de84f1 --- /dev/null +++ b/services/content/helpers.go @@ -0,0 +1,19 @@ +package content + +import ( + "github.com/docker/containerd/content" + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +func rewriteGRPCError(err error) error { + switch grpc.Code(errors.Cause(err)) { + case codes.AlreadyExists: + return content.ErrExists + case codes.NotFound: + return content.ErrNotFound + } + + return err +} diff --git a/content/client.go b/services/content/ingester.go similarity index 66% rename from content/client.go rename to services/content/ingester.go index 4aff02c..cd67100 100644 --- a/content/client.go +++ b/services/content/ingester.go @@ -4,79 +4,14 @@ import ( "context" "io" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - contentapi "github.com/docker/containerd/api/services/content" + "github.com/docker/containerd/content" + digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) -func NewProviderFromClient(client contentapi.ContentClient) Provider { - return &remoteProvider{ - client: client, - } -} - -type remoteProvider struct { - client contentapi.ContentClient -} - -func (rp *remoteProvider) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { - client, err := rp.client.Read(ctx, &contentapi.ReadRequest{Digest: dgst}) - if err != nil { - return nil, err - } - - return &remoteReader{ - client: client, - }, nil -} - -type remoteReader struct { - client contentapi.Content_ReadClient - extra []byte -} - -func (rr *remoteReader) Read(p []byte) (n int, err error) { - n += copy(p, rr.extra) - if n >= len(p) { - if n <= len(rr.extra) { - rr.extra = rr.extra[n:] - } else { - rr.extra = rr.extra[:0] - } - return - } - - p = p[n:] - for len(p) > 0 { - var resp *contentapi.ReadResponse - // fill our buffer up until we can fill p. - resp, err = rr.client.Recv() - if err != nil { - return - } - - copied := copy(p, resp.Data) - n += copied - p = p[copied:] - - if copied < len(p) { - continue - } - - rr.extra = append(rr.extra, resp.Data[copied:]...) - } - - return -} - -func (rr *remoteReader) Close() error { - return rr.client.CloseSend() -} - -func NewIngesterFromClient(client contentapi.ContentClient) Ingester { +func NewIngesterFromClient(client contentapi.ContentClient) content.Ingester { return &remoteIngester{ client: client, } @@ -86,7 +21,7 @@ type remoteIngester struct { client contentapi.ContentClient } -func (ri *remoteIngester) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error) { +func (ri *remoteIngester) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { wrclient, offset, err := ri.negotiate(ctx, ref, size, expected) if err != nil { return nil, rewriteGRPCError(err) @@ -154,15 +89,15 @@ func (rw *remoteWriter) send(req *contentapi.WriteRequest) (*contentapi.WriteRes return resp, err } -func (rw *remoteWriter) Status() (Status, error) { +func (rw *remoteWriter) Status() (content.Status, error) { resp, err := rw.send(&contentapi.WriteRequest{ Action: contentapi.WriteActionStat, }) if err != nil { - return Status{}, err + return content.Status{}, err } - return Status{ + return content.Status{ Ref: rw.ref, Offset: resp.Offset, StartedAt: resp.StartedAt, @@ -226,14 +161,3 @@ func (rw *remoteWriter) Truncate(size int64) error { func (rw *remoteWriter) Close() error { return rw.client.CloseSend() } - -func rewriteGRPCError(err error) error { - switch grpc.Code(errors.Cause(err)) { - case codes.AlreadyExists: - return errExists - case codes.NotFound: - return errNotFound - } - - return err -} diff --git a/services/content/provider.go b/services/content/provider.go new file mode 100644 index 0000000..b8c1aac --- /dev/null +++ b/services/content/provider.go @@ -0,0 +1,76 @@ +package content + +import ( + "context" + "io" + + contentapi "github.com/docker/containerd/api/services/content" + "github.com/docker/containerd/content" + digest "github.com/opencontainers/go-digest" +) + +func NewProviderFromClient(client contentapi.ContentClient) content.Provider { + return &remoteProvider{ + client: client, + } +} + +type remoteProvider struct { + client contentapi.ContentClient +} + +func (rp *remoteProvider) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { + client, err := rp.client.Read(ctx, &contentapi.ReadRequest{Digest: dgst}) + if err != nil { + return nil, err + } + + return &remoteReader{ + client: client, + }, nil +} + +type remoteReader struct { + client contentapi.Content_ReadClient + extra []byte +} + +func (rr *remoteReader) Read(p []byte) (n int, err error) { + n += copy(p, rr.extra) + if n >= len(p) { + if n <= len(rr.extra) { + rr.extra = rr.extra[n:] + } else { + rr.extra = rr.extra[:0] + } + return + } + + p = p[n:] + for len(p) > 0 { + var resp *contentapi.ReadResponse + // fill our buffer up until we can fill p. + resp, err = rr.client.Recv() + if err != nil { + return + } + + copied := copy(p, resp.Data) + n += copied + p = p[copied:] + + if copied < len(p) { + continue + } + + rr.extra = append(rr.extra, resp.Data[copied:]...) + } + + return +} + +// TODO(stevvooe): Implemente io.ReaderAt. + +func (rr *remoteReader) Close() error { + return rr.client.CloseSend() +} diff --git a/services/content/service.go b/services/content/service.go index 288adb0..6635bf0 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -2,6 +2,7 @@ package content import ( "io" + "sync" "github.com/Sirupsen/logrus" "github.com/docker/containerd" @@ -20,6 +21,12 @@ type Service struct { store *content.Store } +var bufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, 1<<20) + }, +} + var _ api.ContentServer = &Service{} func init() { @@ -100,9 +107,9 @@ func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) err // TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably // little inefficient for work over a fast network. We can tune this later. - p = content.BufPool.Get().([]byte) + p = bufPool.Get().([]byte) ) - defer content.BufPool.Put(p) + defer bufPool.Put(p) if offset < 0 { offset = 0