containerd/services/content/service.go
Stephen J Day 8c74da3983
cmd/dist, cmd/ctr: move image store access to GRPC
With this changeset, image store access is now moved to completely
accessible over GRPC. No clients manipulate the image store database
directly and the GRPC client is fully featured. The metadata database is
now managed by the daemon and access coordinated via services.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-04-04 10:35:55 -07:00

341 lines
8.4 KiB
Go

package content
import (
"io"
"sync"
"github.com/Sirupsen/logrus"
api "github.com/containerd/containerd/api/services/content"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
type Service struct {
store *content.Store
}
var bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1<<20)
},
}
var _ api.ContentServer = &Service{}
func init() {
plugin.Register("content-grpc", &plugin.Registration{
Type: plugin.GRPCPlugin,
Init: NewService,
})
}
func NewService(ic *plugin.InitContext) (interface{}, error) {
return &Service{
store: ic.Content,
}, nil
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterContentServer(server, s)
return nil
}
func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
if err := req.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
}
bi, err := s.store.Info(req.Digest)
if err != nil {
return nil, maybeNotFoundGRPC(err, req.Digest.String())
}
return &api.InfoResponse{
Digest: req.Digest,
Size_: bi.Size,
CommittedAt: bi.CommittedAt,
}, nil
}
func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*empty.Empty, error) {
if err := req.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, err.Error())
}
if err := s.store.Delete(req.Digest); err != nil {
return nil, maybeNotFoundGRPC(err, req.Digest.String())
}
return &empty.Empty{}, nil
}
func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) error {
if err := req.Digest.Validate(); err != nil {
return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
}
oi, err := s.store.Info(req.Digest)
if err != nil {
return maybeNotFoundGRPC(err, req.Digest.String())
}
rc, err := s.store.Reader(session.Context(), req.Digest)
if err != nil {
return maybeNotFoundGRPC(err, req.Digest.String())
}
defer rc.Close() // TODO(stevvooe): Cache these file descriptors for performance.
ra, ok := rc.(io.ReaderAt)
if !ok {
// TODO(stevvooe): Need to set this up to get correct behavior across
// board. May change interface to store to just return ReaderAtCloser.
// Possibly, we could just return io.ReaderAt and handle file
// descriptors internally.
return errors.New("content service only supports content stores that return ReaderAt")
}
var (
offset = req.Offset
size = req.Size_
// TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably
// little inefficient for work over a fast network. We can tune this later.
p = bufPool.Get().([]byte)
)
defer bufPool.Put(p)
if offset < 0 {
offset = 0
}
if size <= 0 {
size = oi.Size - offset
}
if offset+size > oi.Size {
return grpc.Errorf(codes.OutOfRange, "read past object length %v bytes", oi.Size)
}
if _, err := io.CopyBuffer(
&readResponseWriter{session: session},
io.NewSectionReader(ra, offset, size), p); err != nil {
return err
}
return nil
}
type readResponseWriter struct {
offset int64
session api.Content_ReadServer
}
func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
if err := rw.session.Send(&api.ReadResponse{
Offset: rw.offset,
Data: p,
}); err != nil {
return 0, err
}
rw.offset += int64(len(p))
return len(p), nil
}
func (s *Service) Write(session api.Content_WriteServer) (err error) {
var (
ctx = session.Context()
msg api.WriteResponse
req *api.WriteRequest
ref string
total int64
expected digest.Digest
)
defer func(msg *api.WriteResponse) {
// pump through the last message if no error was encountered
if err != nil {
// TODO(stevvooe): Really need a log line here to track which
// errors are actually causing failure on the server side. May want
// to configure the service with an interceptor to make this work
// identically across all GRPC methods.
//
// This is pretty noisy, so we can remove it but leave it for now.
log.G(ctx).WithError(err).Error("(*Service).Write failed")
return
}
err = session.Send(msg)
}(&msg)
// handle the very first request!
req, err = session.Recv()
if err != nil {
return err
}
ref = req.Ref
if ref == "" {
return grpc.Errorf(codes.InvalidArgument, "first message must have a reference")
}
fields := logrus.Fields{
"ref": ref,
}
total = req.Total
expected = req.Expected
if total > 0 {
fields["total"] = total
}
if expected != "" {
fields["expected"] = expected
}
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields))
log.G(ctx).Debug("(*Service).Write started")
// this action locks the writer for the session.
wr, err := s.store.Writer(ctx, ref, total, expected)
if err != nil {
return err
}
defer wr.Close()
for {
msg.Action = req.Action
ws, err := wr.Status()
if err != nil {
return err
}
msg.Offset = ws.Offset // always set the offset.
// NOTE(stevvooe): In general, there are two cases underwhich a remote
// writer is used.
//
// For pull, we almost always have this before fetching large content,
// through descriptors. We allow predeclaration of the expected size
// and digest.
//
// For push, it is more complex. If we want to cut through content into
// storage, we may have no expectation until we are done processing the
// content. The case here is the following:
//
// 1. Start writing content.
// 2. Compress inline.
// 3. Validate digest and size (maybe).
//
// Supporting these two paths is quite awkward but it let's both API
// users use the same writer style for each with a minimum of overhead.
if req.Expected != "" {
if expected != "" && expected != req.Expected {
return grpc.Errorf(codes.InvalidArgument, "inconsistent digest provided: %v != %v", req.Expected, expected)
}
expected = req.Expected
if _, err := s.store.Info(req.Expected); err == nil {
if err := s.store.Abort(ref); err != nil {
log.G(ctx).WithError(err).Error("failed to abort write")
}
return grpc.Errorf(codes.AlreadyExists, "blob with expected digest %v exists", req.Expected)
}
}
if req.Total > 0 {
// Update the expected total. Typically, this could be seen at
// negotiation time or on a commit message.
if total > 0 && req.Total != total {
return grpc.Errorf(codes.InvalidArgument, "inconsistent total provided: %v != %v", req.Total, total)
}
total = req.Total
}
switch req.Action {
case api.WriteActionStat:
msg.Digest = wr.Digest()
msg.StartedAt = ws.StartedAt
msg.UpdatedAt = ws.UpdatedAt
msg.Total = total
case api.WriteActionWrite, api.WriteActionCommit:
if req.Offset > 0 {
// validate the offset if provided
if req.Offset != ws.Offset {
return grpc.Errorf(codes.OutOfRange, "write @%v must occur at current offset %v", req.Offset, ws.Offset)
}
}
if req.Offset == 0 && ws.Offset > 0 {
if err := wr.Truncate(req.Offset); err != nil {
return errors.Wrapf(err, "truncate failed")
}
msg.Offset = req.Offset
}
// issue the write if we actually have data.
if len(req.Data) > 0 {
// While this looks like we could use io.WriterAt here, because we
// maintain the offset as append only, we just issue the write.
n, err := wr.Write(req.Data)
if err != nil {
return err
}
if n != len(req.Data) {
// TODO(stevvooe): Perhaps, we can recover this by including it
// in the offset on the write return.
return grpc.Errorf(codes.DataLoss, "wrote %v of %v bytes", n, len(req.Data))
}
msg.Offset += int64(n)
}
if req.Action == api.WriteActionCommit {
if err := wr.Commit(total, expected); err != nil {
return err
}
msg.Digest = wr.Digest()
}
case api.WriteActionAbort:
return s.store.Abort(ref)
}
if err := session.Send(&msg); err != nil {
return err
}
req, err = session.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
return nil
}
func (s *Service) Status(*api.StatusRequest, api.Content_StatusServer) error {
return grpc.Errorf(codes.Unimplemented, "not implemented")
}
func maybeNotFoundGRPC(err error, id string) error {
if content.IsNotFound(err) {
return grpc.Errorf(codes.NotFound, "%v: not found", id)
}
return err
}