containerd/services/content/service.go

342 lines
8.4 KiB
Go

package content
import (
"io"
"sync"
"github.com/Sirupsen/logrus"
api "github.com/containerd/containerd/api/services/content"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
type Service struct {
store *content.Store
}
var bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1<<20)
},
}
var _ api.ContentServer = &Service{}
func init() {
plugin.Register("content-grpc", &plugin.Registration{
Type: plugin.GRPCPlugin,
Init: NewService,
})
}
func NewService(ic *plugin.InitContext) (interface{}, error) {
return &Service{
store: ic.Store,
}, nil
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterContentServer(server, s)
return nil
}
func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
if err := req.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
}
bi, err := s.store.Info(req.Digest)
if err != nil {
return nil, maybeNotFoundGRPC(err, req.Digest.String())
}
return &api.InfoResponse{
Digest: req.Digest,
Size_: bi.Size,
CommittedAt: bi.CommittedAt,
}, nil
}
func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*empty.Empty, error) {
if err := req.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, err.Error())
}
if err := s.store.Delete(req.Digest); err != nil {
return nil, maybeNotFoundGRPC(err, req.Digest.String())
}
return &empty.Empty{}, nil
}
func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) error {
if err := req.Digest.Validate(); err != nil {
return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
}
oi, err := s.store.Info(req.Digest)
if err != nil {
return maybeNotFoundGRPC(err, req.Digest.String())
}
rc, err := s.store.Reader(session.Context(), req.Digest)
if err != nil {
return maybeNotFoundGRPC(err, req.Digest.String())
}
defer rc.Close() // TODO(stevvooe): Cache these file descriptors for performance.
ra, ok := rc.(io.ReaderAt)
if !ok {
// TODO(stevvooe): Need to set this up to get correct behavior across
// board. May change interface to store to just return ReaderAtCloser.
// Possibly, we could just return io.ReaderAt and handle file
// descriptors internally.
return errors.New("content service only supports content stores that return ReaderAt")
}
var (
offset = req.Offset
size = req.Size_
// TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably
// little inefficient for work over a fast network. We can tune this later.
p = bufPool.Get().([]byte)
)
defer bufPool.Put(p)
if offset < 0 {
offset = 0
}
if size <= 0 {
size = oi.Size - offset
}
if offset+size > oi.Size {
return grpc.Errorf(codes.OutOfRange, "read past object length %v bytes", oi.Size)
}
if _, err := io.CopyBuffer(
&readResponseWriter{session: session},
io.NewSectionReader(ra, offset, size), p); err != nil {
return err
}
return nil
}
type readResponseWriter struct {
offset int64
session api.Content_ReadServer
}
func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
if err := rw.session.Send(&api.ReadResponse{
Offset: rw.offset,
Data: p,
}); err != nil {
return 0, err
}
rw.offset += int64(len(p))
return len(p), nil
}
func (s *Service) Write(session api.Content_WriteServer) (err error) {
var (
ctx = session.Context()
msg api.WriteResponse
req *api.WriteRequest
ref string
total int64
expected digest.Digest
)
defer func(msg *api.WriteResponse) {
// pump through the last message if no error was encountered
if err != nil {
// TODO(stevvooe): Really need a log line here to track which
// errors are actually causing failure on the server side. May want
// to configure the service with an interceptor to make this work
// identically across all GRPC methods.
//
// This is pretty noisy, so we can remove it but leave it for now.
log.G(ctx).WithError(err).Error("(*Service).Write failed")
return
}
err = session.Send(msg)
}(&msg)
// handle the very first request!
req, err = session.Recv()
if err != nil {
return err
}
ref = req.Ref
if ref == "" {
return grpc.Errorf(codes.InvalidArgument, "first message must have a reference")
}
fields := logrus.Fields{
"ref": ref,
}
total = req.Total
expected = req.Expected
if total > 0 {
fields["total"] = total
}
if expected != "" {
fields["expected"] = expected
}
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields))
log.G(ctx).Debug("(*Service).Write started")
// this action locks the writer for the session.
wr, err := s.store.Writer(ctx, ref, total, expected)
if err != nil {
return err
}
defer wr.Close()
for {
msg.Action = req.Action
ws, err := wr.Status()
if err != nil {
return err
}
msg.Offset = ws.Offset // always set the offset.
// NOTE(stevvooe): In general, there are two cases underwhich a remote
// writer is used.
//
// For pull, we almost always have this before fetching large content,
// through descriptors. We allow predeclaration of the expected size
// and digest.
//
// For push, it is more complex. If we want to cut through content into
// storage, we may have no expectation until we are done processing the
// content. The case here is the following:
//
// 1. Start writing content.
// 2. Compress inline.
// 3. Validate digest and size (maybe).
//
// Supporting these two paths is quite awkward but it let's both API
// users use the same writer style for each with a minimum of overhead.
if req.Expected != "" {
if expected != "" && expected != req.Expected {
return grpc.Errorf(codes.InvalidArgument, "inconsistent digest provided: %v != %v", req.Expected, expected)
}
expected = req.Expected
if _, err := s.store.Info(req.Expected); err == nil {
if err := s.store.Abort(ref); err != nil {
log.G(ctx).WithError(err).Error("failed to abort write")
}
return grpc.Errorf(codes.AlreadyExists, "blob with expected digest %v exists", req.Expected)
}
}
if req.Total > 0 {
// Update the expected total. Typically, this could be seen at
// negotiation time or on a commit message.
if total > 0 && req.Total != total {
return grpc.Errorf(codes.InvalidArgument, "inconsistent total provided: %v != %v", req.Total, total)
}
total = req.Total
}
switch req.Action {
case api.WriteActionStat:
msg.Digest = wr.Digest()
msg.StartedAt = ws.StartedAt
msg.UpdatedAt = ws.UpdatedAt
msg.Total = total
case api.WriteActionWrite, api.WriteActionCommit:
if req.Offset > 0 {
// validate the offset if provided
if req.Offset != ws.Offset {
return grpc.Errorf(codes.OutOfRange, "write @%v must occur at current offset %v", req.Offset, ws.Offset)
}
}
if req.Offset == 0 && ws.Offset > 0 {
if err := wr.Truncate(req.Offset); err != nil {
return errors.Wrapf(err, "truncate failed")
}
msg.Offset = req.Offset
}
// issue the write if we actually have data.
if len(req.Data) > 0 {
// While this looks like we could use io.WriterAt here, because we
// maintain the offset as append only, we just issue the write.
n, err := wr.Write(req.Data)
if err != nil {
return err
}
if n != len(req.Data) {
// TODO(stevvooe): Perhaps, we can recover this by including it
// in the offset on the write return.
return grpc.Errorf(codes.DataLoss, "wrote %v of %v bytes", n, len(req.Data))
}
msg.Offset += int64(n)
}
if req.Action == api.WriteActionCommit {
if err := wr.Commit(total, expected); err != nil {
return err
}
msg.Digest = wr.Digest()
}
case api.WriteActionAbort:
return s.store.Abort(ref)
}
if err := session.Send(&msg); err != nil {
return err
}
req, err = session.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
return nil
}
func (s *Service) Status(*api.StatusRequest, api.Content_StatusServer) error {
return grpc.Errorf(codes.Unimplemented, "not implemented")
}
func maybeNotFoundGRPC(err error, id string) error {
if content.IsNotFound(err) {
return grpc.Errorf(codes.NotFound, "%v: not found", id)
}
return err
}