diff --git a/cmd/ctr/main.go b/cmd/ctr/main.go
index 6ecfe27..c7c3f5a 100644
--- a/cmd/ctr/main.go
+++ b/cmd/ctr/main.go
@@ -38,6 +38,13 @@ containerd client
 			Usage: "socket path for containerd's GRPC server",
 			Value: "/run/containerd/containerd.sock",
 		},
+		cli.StringFlag{
+			// TODO(stevvooe): for now, we allow circumventing the GRPC. Once
+			// we have clear separation, this will likely go away.
+			Name:  "root",
+			Usage: "path to content store root",
+			Value: "/var/lib/containerd",
+		},
 	}
 	app.Commands = []cli.Command{
 		runCommand,
diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go
index 661cea3..85958c8 100644
--- a/cmd/ctr/run.go
+++ b/cmd/ctr/run.go
@@ -7,11 +7,16 @@ import (
 	"path/filepath"
 	"runtime"
 
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+
 	"github.com/Sirupsen/logrus"
 	"github.com/crosbymichael/console"
 	"github.com/docker/containerd/api/services/execution"
-	"github.com/docker/containerd/api/types/mount"
+	rootfsapi "github.com/docker/containerd/api/services/rootfs"
+	"github.com/docker/containerd/image"
 	protobuf "github.com/gogo/protobuf/types"
+	"github.com/opencontainers/image-spec/identity"
 	"github.com/opencontainers/runtime-spec/specs-go"
 	"github.com/pkg/errors"
 	"github.com/urfave/cli"
@@ -176,6 +181,7 @@ var runCommand = cli.Command{
 		},
 	},
 	Action: func(context *cli.Context) error {
+		ctx := gocontext.Background()
 		id := context.String("id")
 		if id == "" {
 			return errors.New("container id must be provided")
@@ -189,29 +195,69 @@ var runCommand = cli.Command{
 		if err != nil {
 			return err
 		}
-		events, err := containers.Events(gocontext.Background(), &execution.EventsRequest{})
+		events, err := containers.Events(ctx, &execution.EventsRequest{})
 		if err != nil {
 			return err
 		}
-		abs, err := filepath.Abs(context.String("rootfs"))
+
+		provider, err := getContentProvider(context)
 		if err != nil {
 			return err
 		}
-		// for ctr right now just do a bind mount
-		rootfs := []*mount.Mount{
-			{
-				Type:   "bind",
-				Source: abs,
-				Options: []string{
-					"rw",
-					"rbind",
-				},
-			},
+
+		rootfsClient, err := getRootFSService(context)
+		if err != nil {
+			return err
 		}
+
+		db, err := getDB(context, false)
+		if err != nil {
+			return errors.Wrap(err, "failed opening database")
+		}
+		defer db.Close()
+
+		tx, err := db.Begin(false)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+
+		ref := context.Args().First()
+
+		im, err := image.Get(tx, ref)
+		if err != nil {
+			return errors.Wrapf(err, "could not resolve %q", ref)
+		}
+		// let's close out our db and tx so we don't hold the lock whilst running.
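+		// Note: the deferred Rollback/Close above stay harmless after this;
+		// bolt returns ErrTxClosed for a rollback of a finished transaction,
+		// and closing an already-closed DB is a no-op.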
+		tx.Rollback()
+		db.Close()
+
+		diffIDs, err := im.RootFS(ctx, provider)
+		if err != nil {
+			return err
+		}
+
+		if _, err := rootfsClient.Prepare(gocontext.TODO(), &rootfsapi.PrepareRequest{
+			Name:    id,
+			ChainID: identity.ChainID(diffIDs),
+		}); err != nil {
+			if grpc.Code(err) != codes.AlreadyExists {
+				return err
+			}
+		}
+
+		resp, err := rootfsClient.Mounts(gocontext.TODO(), &rootfsapi.MountsRequest{
+			Name: id,
+		})
+		if err != nil {
+			return err
+		}
+
+		rootfs := resp.Mounts
 
 		var s *specs.Spec
 		if config := context.String("runtime-config"); config == "" {
-			s = spec(id, []string(context.Args()), context.Bool("tty"))
+			s = spec(id, []string(context.Args().Tail()), context.Bool("tty"))
 		} else {
 			s, err = customSpec(config)
 			if err != nil {
@@ -251,6 +297,7 @@ var runCommand = cli.Command{
 		if err != nil {
 			return err
 		}
+
 		if _, err := containers.Start(gocontext.Background(), &execution.StartRequest{
 			ID: response.ID,
 		}); err != nil {
diff --git a/cmd/ctr/utils.go b/cmd/ctr/utils.go
index df2f790..a68cee2 100644
--- a/cmd/ctr/utils.go
+++ b/cmd/ctr/utils.go
@@ -14,8 +14,14 @@ import (
 
 	gocontext "context"
 
+	"github.com/boltdb/bolt"
+	contentapi "github.com/docker/containerd/api/services/content"
 	"github.com/docker/containerd/api/services/execution"
+	rootfsapi "github.com/docker/containerd/api/services/rootfs"
 	"github.com/docker/containerd/api/types/container"
+	"github.com/docker/containerd/content"
+	"github.com/docker/containerd/image"
+	contentservice "github.com/docker/containerd/services/content"
 	"github.com/pkg/errors"
 	"github.com/tonistiigi/fifo"
 	"github.com/urfave/cli"
@@ -112,6 +118,43 @@ func getExecutionService(context *cli.Context) (execution.ContainerServiceClient, error) {
 	return execution.NewContainerServiceClient(conn), nil
 }
 
+func getContentProvider(context *cli.Context) (content.Provider, error) {
+	conn, err := getGRPCConnection(context)
+	if err != nil {
+		return nil, err
+	}
+	return contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), nil
+}
+
+func getRootFSService(context *cli.Context) (rootfsapi.RootFSClient, error) {
+	conn, err := getGRPCConnection(context)
+	if err != nil {
+		return nil, err
+	}
+	return rootfsapi.NewRootFSClient(conn), nil
+}
+
+func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) {
+	// TODO(stevvooe): For now, we operate directly on the database. We will
+	// replace this with a GRPC service when the details are more concrete.
+	path := filepath.Join(ctx.GlobalString("root"), "meta.db")
+
+	db, err := bolt.Open(path, 0644, &bolt.Options{
+		ReadOnly: readonly,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	if !readonly {
+		if err := image.InitDB(db); err != nil {
+			return nil, err
+		}
+	}
+
+	return db, nil
+}
+
 func getTempDir(id string) (string, error) {
 	err := os.MkdirAll(filepath.Join(os.TempDir(), "ctr"), 0700)
 	if err != nil {
diff --git a/cmd/dist/common.go b/cmd/dist/common.go
index 0411fa0..ff00710 100644
--- a/cmd/dist/common.go
+++ b/cmd/dist/common.go
@@ -5,7 +5,9 @@ import (
 	"path/filepath"
 	"time"
 
+	"github.com/boltdb/bolt"
 	"github.com/docker/containerd/content"
+	"github.com/docker/containerd/image"
 	"github.com/urfave/cli"
 	"google.golang.org/grpc"
 )
@@ -34,3 +36,24 @@ func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) {
 		}),
 	)
 }
+
+func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) {
+	// TODO(stevvooe): For now, we operate directly on the database. We will
+	// replace this with a GRPC service when the details are more concrete.
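+	//
+	// NB: this duplicates getDB in cmd/ctr/utils.go; both tools open the
+	// same meta.db under the daemon's --root directory.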
+	path := filepath.Join(ctx.GlobalString("root"), "meta.db")
+
+	db, err := bolt.Open(path, 0644, &bolt.Options{
+		ReadOnly: readonly,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	if !readonly {
+		if err := image.InitDB(db); err != nil {
+			return nil, err
+		}
+	}
+
+	return db, nil
+}
diff --git a/cmd/dist/images.go b/cmd/dist/images.go
new file mode 100644
index 0000000..6b66569
--- /dev/null
+++ b/cmd/dist/images.go
@@ -0,0 +1,46 @@
+package main
+
+import (
+	"fmt"
+	"os"
+	"text/tabwriter"
+
+	"github.com/docker/containerd/image"
+	"github.com/docker/containerd/progress"
+	"github.com/pkg/errors"
+	"github.com/urfave/cli"
+)
+
+var imagesCommand = cli.Command{
+	Name:        "images",
+	Usage:       "list images known to containerd",
+	ArgsUsage:   "[flags] <ref>",
+	Description: `List images registered with containerd.`,
+	Flags:       []cli.Flag{},
+	Action: func(clicontext *cli.Context) error {
+		db, err := getDB(clicontext, true)
+		if err != nil {
+			return errors.Wrap(err, "failed to open database")
+		}
+
+		tx, err := db.Begin(false)
+		if err != nil {
+			return errors.Wrap(err, "could not start transaction")
+		}
+		defer tx.Rollback()
+
+		images, err := image.List(tx)
+		if err != nil {
+			return errors.Wrap(err, "failed to list images")
+		}
+
+		tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, ' ', 0)
+		fmt.Fprintln(tw, "REF\tTYPE\tDIGEST\tSIZE\t")
+		for _, image := range images {
+			fmt.Fprintf(tw, "%v\t%v\t%v\t%v\t\n", image.Name, image.Descriptor.MediaType, image.Descriptor.Digest, progress.Bytes(image.Descriptor.Size))
+		}
+		tw.Flush()
+
+		return nil
+	},
+}
diff --git a/cmd/dist/main.go b/cmd/dist/main.go
index f29af17..d0070cf 100644
--- a/cmd/dist/main.go
+++ b/cmd/dist/main.go
@@ -61,6 +61,8 @@ distribution tool
 		},
 	}
 	app.Commands = []cli.Command{
+		imagesCommand,
+		pullCommand,
 		fetchCommand,
 		fetchObjectCommand,
 		ingestCommand,
diff --git a/cmd/dist/pull.go b/cmd/dist/pull.go
new file mode 100644
index 0000000..a3f2121
--- /dev/null
+++ b/cmd/dist/pull.go
@@ -0,0 +1,245 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"os"
+	"text/tabwriter"
+	"time"
+
+	contentapi "github.com/docker/containerd/api/services/content"
+	rootfsapi "github.com/docker/containerd/api/services/rootfs"
+	"github.com/docker/containerd/content"
+	"github.com/docker/containerd/image"
+	"github.com/docker/containerd/log"
+	"github.com/docker/containerd/progress"
+	"github.com/docker/containerd/remotes"
+	contentservice "github.com/docker/containerd/services/content"
+	rootfsservice "github.com/docker/containerd/services/rootfs"
+	"github.com/opencontainers/image-spec/identity"
+	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/urfave/cli"
+	"golang.org/x/sync/errgroup"
+)
+
+var pullCommand = cli.Command{
+	Name:      "pull",
+	Usage:     "pull an image from a remote",
+	ArgsUsage: "[flags] <ref>",
+	Description: `Fetch and prepare an image for use in containerd.
+
+After pulling an image, it should be ready to use the same reference in a run
+command. As part of this process, we do the following:
+
+1. Fetch all resources into containerd.
+2. Prepare the snapshot filesystem with the pulled resources.
+3. Register metadata for the image.
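+
+For example (assuming a registry-qualified reference):
+
+	dist pull docker.io/library/redis:latest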
+`,
+	Flags: []cli.Flag{},
+	Action: func(clicontext *cli.Context) error {
+		var (
+			ctx = background
+			ref = clicontext.Args().First()
+		)
+
+		conn, err := connectGRPC(clicontext)
+		if err != nil {
+			return err
+		}
+
+		db, err := getDB(clicontext, false)
+		if err != nil {
+			return err
+		}
+		defer db.Close()
+
+		tx, err := db.Begin(true)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+
+		resolver, err := getResolver(ctx)
+		if err != nil {
+			return err
+		}
+		ongoing := newJobs()
+
+		ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
+		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
+		cs, err := resolveContentStore(clicontext)
+		if err != nil {
+			return err
+		}
+
+		eg, ctx := errgroup.WithContext(ctx)
+
+		var resolvedImageName string
+		resolved := make(chan struct{})
+		eg.Go(func() error {
+			ongoing.add(ref)
+			name, desc, fetcher, err := resolver.Resolve(ctx, ref)
+			if err != nil {
+				return err
+			}
+			log.G(ctx).WithField("image", name).Debug("fetching")
+			resolvedImageName = name
+			close(resolved)
+
+			eg.Go(func() error {
+				return image.Register(tx, name, desc)
+			})
+			defer func() {
+				if err := tx.Commit(); err != nil {
+					log.G(ctx).WithError(err).Error("commit failed")
+				}
+			}()
+
+			return image.Dispatch(ctx,
+				image.Handlers(image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
+					ongoing.add(remotes.MakeRefKey(ctx, desc))
+					return nil, nil
+				}),
+					remotes.FetchHandler(ingester, fetcher),
+					image.ChildrenHandler(provider)),
+				desc)
+
+		})
+
+		errs := make(chan error)
+		go func() {
+			defer close(errs)
+			errs <- eg.Wait()
+		}()
+
+		defer func() {
+			ctx := context.Background()
+			tx, err := db.Begin(false)
+			if err != nil {
+				log.G(ctx).Fatal(err)
+			}
+
+			// TODO(stevvooe): This section unpacks the layers and resolves the
+			// root filesystem chainid for the image. For now, we just print
+			// it, but we should keep track of this in the metadata storage.
+
+			image, err := image.Get(tx, resolvedImageName)
+			if err != nil {
+				log.G(ctx).Fatal(err)
+			}
+
+			provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
+
+			p, err := content.ReadBlob(ctx, provider, image.Descriptor.Digest)
+			if err != nil {
+				log.G(ctx).Fatal(err)
+			}
+
+			var manifest ocispec.Manifest
+			if err := json.Unmarshal(p, &manifest); err != nil {
+				log.G(ctx).Fatal(err)
+			}
+
+			rootfs := rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn))
+
+			log.G(ctx).Info("unpacking rootfs")
+			chainID, err := rootfs.Unpack(ctx, manifest.Layers)
+			if err != nil {
+				log.G(ctx).Fatal(err)
+			}
+
+			diffIDs, err := image.RootFS(ctx, provider)
+			if err != nil {
+				log.G(ctx).WithError(err).Fatal("failed resolving rootfs")
+			}
+
+			expectedChainID := identity.ChainID(diffIDs)
+			if expectedChainID != chainID {
+				log.G(ctx).Fatal("rootfs service did not match chainid")
+			}
+		}()
+
+		var (
+			ticker = time.NewTicker(100 * time.Millisecond)
+			fw     = progress.NewWriter(os.Stdout)
+			start  = time.Now()
+			done   bool
+		)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				fw.Flush()
+
+				tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
+				js := ongoing.jobs()
+				statuses := map[string]statusInfo{}
+
+				activeSeen := map[string]struct{}{}
+				if !done {
+					active, err := cs.Active()
+					if err != nil {
+						log.G(ctx).WithError(err).Error("active check failed")
+						continue
+					}
+					// update status of active entries!
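+					// Anything the content store still reports as an active
+					// ingest is shown as "downloading", with live offset/total
+					// byte progress.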
+					for _, active := range active {
+						statuses[active.Ref] = statusInfo{
+							Ref:       active.Ref,
+							Status:    "downloading",
+							Offset:    active.Offset,
+							Total:     active.Total,
+							StartedAt: active.StartedAt,
+							UpdatedAt: active.UpdatedAt,
+						}
+						activeSeen[active.Ref] = struct{}{}
+					}
+				}
+
+				// now, update the items in jobs that are not in active
+				for _, j := range js {
+					if _, ok := activeSeen[j]; ok {
+						continue
+					}
+					status := "done"
+
+					if j == ref {
+						select {
+						case <-resolved:
+							status = "resolved"
+						default:
+							status = "resolving"
+						}
+					}
+
+					statuses[j] = statusInfo{
+						Ref:    j,
+						Status: status, // for now!
+					}
+				}
+
+				var ordered []statusInfo
+				for _, j := range js {
+					ordered = append(ordered, statuses[j])
+				}
+
+				display(tw, ordered, start)
+				tw.Flush()
+
+				if done {
+					fw.Flush()
+					return nil
+				}
+			case err := <-errs:
+				if err != nil {
+					return err
+				}
+				done = true
+			case <-ctx.Done():
+				done = true // allow ui to update once more
+			}
+		}
+
+	},
+}
diff --git a/content/helpers.go b/content/helpers.go
index b92d011..859de70 100644
--- a/content/helpers.go
+++ b/content/helpers.go
@@ -10,6 +10,19 @@ import (
 	"github.com/pkg/errors"
 )
 
+// ReadBlob retrieves the entire contents of the blob from the provider.
+//
+// Avoid using this for large blobs, such as layers.
+func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) {
+	rc, err := provider.Reader(ctx, dgst)
+	if err != nil {
+		return nil, err
+	}
+	defer rc.Close()
+
+	return ioutil.ReadAll(rc)
+}
+
 // WriteBlob writes data with the expected digest into the content store. If
 // expected already exists, the method returns immediately and the reader will
 // not be consumed.
diff --git a/image/handlers.go b/image/handlers.go
index 44c7563..4f97b95 100644
--- a/image/handlers.go
+++ b/image/handlers.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"io/ioutil"
 
 	"github.com/docker/containerd/content"
 	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -131,13 +130,7 @@ func ChildrenHandler(provider content.Provider) HandlerFunc {
 			return nil, fmt.Errorf("%v not yet supported", desc.MediaType)
 		}
 
-		rc, err := provider.Reader(ctx, desc.Digest)
-		if err != nil {
-			return nil, err
-		}
-		defer rc.Close()
-
-		p, err := ioutil.ReadAll(rc)
+		p, err := content.ReadBlob(ctx, provider, desc.Digest)
 		if err != nil {
 			return nil, err
 		}
diff --git a/image/image.go b/image/image.go
new file mode 100644
index 0000000..edacda1
--- /dev/null
+++ b/image/image.go
@@ -0,0 +1,87 @@
+package image
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"io/ioutil"
+
+	"github.com/docker/containerd/content"
+	digest "github.com/opencontainers/go-digest"
+	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
+)
+
+// Image provides the model for how containerd views container images.
+type Image struct {
+	Name       string
+	Descriptor ocispec.Descriptor
+}
+
+// TODO(stevvooe): Many of these functions make strong platform assumptions,
+// which are untrue in a lot of cases. More refactoring must be done here to
+// make this work in all cases.
+
+// Config resolves the image configuration descriptor.
+//
+// The caller can then use the descriptor to resolve and process the
+// configuration of the image.
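+//
+// Note that only a bare manifest (Docker schema 2 or OCI) is handled here; a
+// manifest list would need to be resolved to a platform-specific manifest
+// before this can succeed.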
+func (image *Image) Config(ctx context.Context, provider content.Provider) (ocispec.Descriptor, error) {
+	var configDesc ocispec.Descriptor
+	return configDesc, Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
+		switch image.Descriptor.MediaType {
+		case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
+			rc, err := provider.Reader(ctx, image.Descriptor.Digest)
+			if err != nil {
+				return nil, err
+			}
+			defer rc.Close()
+
+			p, err := ioutil.ReadAll(rc)
+			if err != nil {
+				return nil, err
+			}
+
+			var manifest ocispec.Manifest
+			if err := json.Unmarshal(p, &manifest); err != nil {
+				return nil, err
+			}
+
+			configDesc = manifest.Config
+
+			return nil, nil
+		default:
+			return nil, errors.New("could not resolve config")
+		}
+
+	}), image.Descriptor)
+}
+
+// RootFS returns the unpacked diff IDs that make up an image's rootfs.
+//
+// These are used to verify that a set of layers unpacked to the expected
+// values.
+func (image *Image) RootFS(ctx context.Context, provider content.Provider) ([]digest.Digest, error) {
+	desc, err := image.Config(ctx, provider)
+	if err != nil {
+		return nil, err
+	}
+
+	p, err := content.ReadBlob(ctx, provider, desc.Digest)
+	if err != nil {
+		return nil, err
+	}
+
+	var config ocispec.Image
+	if err := json.Unmarshal(p, &config); err != nil {
+		return nil, err
+	}
+
+	// TODO(stevvooe): Remove this bit when OCI structure uses correct type for
+	// rootfs.DiffIDs.
+	var diffIDs []digest.Digest
+	for _, diffID := range config.RootFS.DiffIDs {
+		diffIDs = append(diffIDs, digest.Digest(diffID))
+	}
+
+	return diffIDs, nil
+}
diff --git a/image/storage.go b/image/storage.go
new file mode 100644
index 0000000..28b3070
--- /dev/null
+++ b/image/storage.go
@@ -0,0 +1,181 @@
+package image
+
+import (
+	"encoding/binary"
+	"fmt"
+
+	"github.com/boltdb/bolt"
+	"github.com/docker/containerd/log"
+	digest "github.com/opencontainers/go-digest"
+	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
+)
+
+var (
+	errImageUnknown = fmt.Errorf("image: unknown")
+	errNoTx         = fmt.Errorf("no transaction available")
+)
+
+var (
+	bucketKeyStorageVersion = []byte("v1")
+	bucketKeyImages         = []byte("images")
+	bucketKeyDigest         = []byte("digest")
+	bucketKeyMediaType      = []byte("mediatype")
+	bucketKeySize           = []byte("size")
+)
+
+// TODO(stevvooe): This file comprises the data required to implement the
+// "metadata" store. For now, it is bound tightly to the local machine and bolt
+// but we can take this and use it to define a service interface.
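+//
+// The bucket layout is a nested scheme:
+//
+//	v1 -> images -> <image name> -> {digest, mediatype, size}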
+
+// InitDB prepares the database with the v1 bucket schema.
+func InitDB(db *bolt.DB) error {
+	log.L.Debug("init db")
+	return db.Update(func(tx *bolt.Tx) error {
+		_, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyImages)
+		return err
+	})
+}
+
+// Register stores the descriptor for the image under the given name.
+func Register(tx *bolt.Tx, name string, desc ocispec.Descriptor) error {
+	return withImagesBucket(tx, func(bkt *bolt.Bucket) error {
+		ibkt, err := bkt.CreateBucketIfNotExists([]byte(name))
+		if err != nil {
+			return err
+		}
+
+		var (
+			buf         [binary.MaxVarintLen64]byte
+			sizeEncoded []byte = buf[:]
+		)
+		sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, desc.Size)]
+
+		if len(sizeEncoded) == 0 {
+			return fmt.Errorf("failed encoding size = %v", desc.Size)
+		}
+
+		for _, v := range [][2][]byte{
+			{bucketKeyDigest, []byte(desc.Digest)},
+			{bucketKeyMediaType, []byte(desc.MediaType)},
+			{bucketKeySize, sizeEncoded},
+		} {
+			if err := ibkt.Put(v[0], v[1]); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
+
+// Get returns the image registered under the given name.
+func Get(tx *bolt.Tx, name string) (Image, error) {
+	var image Image
+	if err := withImageBucket(tx, name, func(bkt *bolt.Bucket) error {
+		image.Name = name
+		return readImage(&image, bkt)
+	}); err != nil {
+		return Image{}, err
+	}
+
+	return image, nil
+}
+
+// List returns all images registered in the database.
+func List(tx *bolt.Tx) ([]Image, error) {
+	var images []Image
+
+	if err := withImagesBucket(tx, func(bkt *bolt.Bucket) error {
+		return bkt.ForEach(func(k, v []byte) error {
+			var (
+				image = Image{
+					Name: string(k),
+				}
+				kbkt = bkt.Bucket(k)
+			)
+
+			if err := readImage(&image, kbkt); err != nil {
+				return err
+			}
+
+			images = append(images, image)
+			return nil
+		})
+	}); err != nil {
+		return nil, err
+	}
+
+	return images, nil
+}
+
+func readImage(image *Image, bkt *bolt.Bucket) error {
+	return bkt.ForEach(func(k, v []byte) error {
+		if v == nil {
+			return nil // skip it? a bkt maybe?
+		}
+
+		// TODO(stevvooe): This is why we need to use byte values for
+		// keys, rather than full arrays.
+		switch string(k) {
+		case string(bucketKeyDigest):
+			image.Descriptor.Digest = digest.Digest(v)
+		case string(bucketKeyMediaType):
+			image.Descriptor.MediaType = string(v)
+		case string(bucketKeySize):
+			image.Descriptor.Size, _ = binary.Varint(v)
+		}
+
+		return nil
+	})
+}
+
+func createBucketIfNotExists(tx *bolt.Tx, keys ...[]byte) (*bolt.Bucket, error) {
+	bkt, err := tx.CreateBucketIfNotExists(keys[0])
+	if err != nil {
+		return nil, err
+	}
+
+	for _, key := range keys[1:] {
+		bkt, err = bkt.CreateBucketIfNotExists(key)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return bkt, nil
+}
+
+func withImagesBucket(tx *bolt.Tx, fn func(bkt *bolt.Bucket) error) error {
+	bkt := getImagesBucket(tx)
+	if bkt == nil {
+		return errImageUnknown
+	}
+
+	return fn(bkt)
+}
+
+func withImageBucket(tx *bolt.Tx, name string, fn func(bkt *bolt.Bucket) error) error {
+	bkt := getImageBucket(tx, name)
+	if bkt == nil {
+		return errImageUnknown
+	}
+
+	return fn(bkt)
+}
+
+func getImagesBucket(tx *bolt.Tx) *bolt.Bucket {
+	return getBucket(tx, bucketKeyStorageVersion, bucketKeyImages)
+}
+
+func getImageBucket(tx *bolt.Tx, name string) *bolt.Bucket {
+	return getBucket(tx, bucketKeyStorageVersion, bucketKeyImages, []byte(name))
+}
+
+func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
+	bkt := tx.Bucket(keys[0])
+
+	for _, key := range keys[1:] {
+		if bkt == nil {
+			break
+		}
+		bkt = bkt.Bucket(key)
+	}
+
+	return bkt
+}
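+
+// A minimal read-path sketch for callers (assuming a *bolt.DB prepared by
+// InitDB, and a hypothetical ref name):
+//
+//	tx, err := db.Begin(false)
+//	if err != nil {
+//		return err
+//	}
+//	defer tx.Rollback()
+//
+//	img, err := image.Get(tx, "docker.io/library/redis:latest")
+//	if err != nil {
+//		return err
+//	}
+//	fmt.Println(img.Name, img.Descriptor.Digest)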