diff --git a/cmd/containerd/builtins.go b/cmd/containerd/builtins.go index e9ee8ad..793e446 100644 --- a/cmd/containerd/builtins.go +++ b/cmd/containerd/builtins.go @@ -7,6 +7,7 @@ import ( _ "github.com/containerd/containerd/services/content" _ "github.com/containerd/containerd/services/execution" _ "github.com/containerd/containerd/services/healthcheck" + _ "github.com/containerd/containerd/services/images" _ "github.com/containerd/containerd/services/metrics" _ "github.com/containerd/containerd/services/rootfs" _ "github.com/containerd/containerd/snapshot/btrfs" diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index fa0bffd..2c0227a 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -11,6 +11,7 @@ import ( "syscall" "time" + "github.com/boltdb/bolt" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" gocontext "golang.org/x/net/context" "google.golang.org/grpc" @@ -19,8 +20,10 @@ import ( "github.com/containerd/containerd" contentapi "github.com/containerd/containerd/api/services/content" api "github.com/containerd/containerd/api/services/execution" + imagesapi "github.com/containerd/containerd/api/services/images" rootfsapi "github.com/containerd/containerd/api/services/rootfs" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/reaper" @@ -115,11 +118,16 @@ func main() { if err != nil { return err } + meta, err := resolveMetaDB(context) + if err != nil { + return err + } + defer meta.Close() snapshotter, err := loadSnapshotter(store) if err != nil { return err } - services, err := loadServices(runtimes, store, snapshotter) + services, err := loadServices(runtimes, store, snapshotter, meta) if err != nil { return err } @@ -240,6 +248,22 @@ func resolveContentStore() (*content.Store, error) { return content.NewStore(cp) } +func resolveMetaDB(ctx *cli.Context) (*bolt.DB, error) { + path := filepath.Join(conf.Root, "meta.db") + + db, err := bolt.Open(path, 0644, nil) + if err != nil { + return nil, err + } + + // TODO(stevvooe): Break these down into components to be initialized. + if err := images.InitDB(db); err != nil { + return nil, err + } + + return db, nil +} + func loadRuntimes(monitor plugin.ContainerMonitor) (map[string]containerd.Runtime, error) { o := make(map[string]containerd.Runtime) for name, rr := range plugin.Registrations() { @@ -306,7 +330,7 @@ func loadSnapshotter(store *content.Store) (snapshot.Snapshotter, error) { ic := &plugin.InitContext{ Root: conf.Root, State: conf.State, - Store: store, + Content: store, Context: log.WithModule(global, moduleName), } if sr.Config != nil { @@ -333,7 +357,7 @@ func newGRPCServer() *grpc.Server { return s } -func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, sn snapshot.Snapshotter) ([]plugin.Service, error) { +func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, sn snapshot.Snapshotter, meta *bolt.DB) ([]plugin.Service, error) { var o []plugin.Service for name, sr := range plugin.Registrations() { if sr.Type != plugin.GRPCPlugin { @@ -345,7 +369,8 @@ func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, State: conf.State, Context: log.WithModule(global, fmt.Sprintf("service-%s", name)), Runtimes: runtimes, - Store: store, + Content: store, + Meta: meta, Snapshotter: sn, } if sr.Config != nil { @@ -397,6 +422,8 @@ func interceptor(ctx gocontext.Context, ctx = log.WithModule(ctx, "content") case rootfsapi.RootFSServer: ctx = log.WithModule(ctx, "rootfs") + case imagesapi.ImagesServer: + ctx = log.WithModule(ctx, "images") default: fmt.Printf("unknown GRPC server type: %#v\n", info.Server) } diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go index d1a4dd8..1152c36 100644 --- a/cmd/ctr/run.go +++ b/cmd/ctr/run.go @@ -277,27 +277,18 @@ var runCommand = cli.Command{ return err } - db, err := getDB(context, false) + imageStore, err := getImageStore(context) if err != nil { - return errors.Wrap(err, "failed opening database") + return errors.Wrap(err, "failed resolving image store") } - defer db.Close() - - tx, err := db.Begin(false) - if err != nil { - return err - } - defer tx.Rollback() ref := context.Args().First() - image, err := images.Get(tx, ref) + image, err := imageStore.Get(ctx, ref) if err != nil { return errors.Wrapf(err, "could not resolve %q", ref) } // let's close out our db and tx so we don't hold the lock whilst running. - tx.Rollback() - db.Close() diffIDs, err := image.RootFS(ctx, provider) if err != nil { diff --git a/cmd/ctr/utils.go b/cmd/ctr/utils.go index e727cbf..7dc3628 100644 --- a/cmd/ctr/utils.go +++ b/cmd/ctr/utils.go @@ -14,14 +14,15 @@ import ( gocontext "context" - "github.com/boltdb/bolt" contentapi "github.com/containerd/containerd/api/services/content" "github.com/containerd/containerd/api/services/execution" + imagesapi "github.com/containerd/containerd/api/services/images" rootfsapi "github.com/containerd/containerd/api/services/rootfs" "github.com/containerd/containerd/api/types/container" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" contentservice "github.com/containerd/containerd/services/content" + imagesservice "github.com/containerd/containerd/services/images" "github.com/pkg/errors" "github.com/tonistiigi/fifo" "github.com/urfave/cli" @@ -134,25 +135,12 @@ func getRootFSService(context *cli.Context) (rootfsapi.RootFSClient, error) { return rootfsapi.NewRootFSClient(conn), nil } -func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) { - // TODO(stevvooe): For now, we operate directly on the database. We will - // replace this with a GRPC service when the details are more concrete. - path := filepath.Join(ctx.GlobalString("root"), "meta.db") - - db, err := bolt.Open(path, 0644, &bolt.Options{ - ReadOnly: readonly, - }) +func getImageStore(clicontext *cli.Context) (images.Store, error) { + conn, err := getGRPCConnection(clicontext) if err != nil { return nil, err } - - if !readonly { - if err := images.InitDB(db); err != nil { - return nil, err - } - } - - return db, nil + return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), nil } func getTempDir(id string) (string, error) { diff --git a/cmd/dist/common.go b/cmd/dist/common.go index ab52816..1c819d0 100644 --- a/cmd/dist/common.go +++ b/cmd/dist/common.go @@ -6,11 +6,12 @@ import ( "path/filepath" "time" - "github.com/boltdb/bolt" + imagesapi "github.com/containerd/containerd/api/services/images" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" + imagesservice "github.com/containerd/containerd/services/images" "github.com/urfave/cli" "google.golang.org/grpc" ) @@ -27,6 +28,14 @@ func resolveContentStore(context *cli.Context) (*content.Store, error) { return content.NewStore(root) } +func resolveImageStore(clicontext *cli.Context) (images.Store, error) { + conn, err := connectGRPC(clicontext) + if err != nil { + return nil, err + } + return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), nil +} + func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) { socket := context.GlobalString("socket") timeout := context.GlobalDuration("connect-timeout") @@ -40,27 +49,6 @@ func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) { ) } -func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) { - // TODO(stevvooe): For now, we operate directly on the database. We will - // replace this with a GRPC service when the details are more concrete. - path := filepath.Join(ctx.GlobalString("root"), "meta.db") - - db, err := bolt.Open(path, 0644, &bolt.Options{ - ReadOnly: readonly, - }) - if err != nil { - return nil, err - } - - if !readonly { - if err := images.InitDB(db); err != nil { - return nil, err - } - } - - return db, nil -} - // getResolver prepares the resolver from the environment and options. func getResolver(ctx context.Context) (remotes.Resolver, error) { return docker.NewResolver(), nil diff --git a/cmd/dist/images.go b/cmd/dist/images.go index 1647855..984ae82 100644 --- a/cmd/dist/images.go +++ b/cmd/dist/images.go @@ -6,7 +6,6 @@ import ( "text/tabwriter" contentapi "github.com/containerd/containerd/api/services/content" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/progress" contentservice "github.com/containerd/containerd/services/content" @@ -25,23 +24,19 @@ var imagesCommand = cli.Command{ ctx = background ) - db, err := getDB(clicontext, true) + imageStore, err := resolveImageStore(clicontext) if err != nil { - return errors.Wrap(err, "failed to open database") + return err } - tx, err := db.Begin(false) - if err != nil { - return errors.Wrap(err, "could not start transaction") - } - defer tx.Rollback() conn, err := connectGRPC(clicontext) if err != nil { return err } + provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) - images, err := images.List(tx) + images, err := imageStore.List(ctx) if err != nil { return errors.Wrap(err, "failed to list images") } @@ -54,7 +49,7 @@ var imagesCommand = cli.Command{ log.G(ctx).WithError(err).Errorf("failed calculating size for image %s", image.Name) } - fmt.Fprintf(tw, "%v\t%v\t%v\t%v\t\n", image.Name, image.Descriptor.MediaType, image.Descriptor.Digest, progress.Bytes(size)) + fmt.Fprintf(tw, "%v\t%v\t%v\t%v\t\n", image.Name, image.Target.MediaType, image.Target.Digest, progress.Bytes(size)) } tw.Flush() @@ -74,19 +69,13 @@ var rmiCommand = cli.Command{ exitErr error ) - db, err := getDB(clicontext, false) + imageStore, err := resolveImageStore(clicontext) if err != nil { - return errors.Wrap(err, "failed to open database") + return err } - tx, err := db.Begin(true) - if err != nil { - return errors.Wrap(err, "could not start transaction") - } - defer tx.Rollback() - for _, target := range clicontext.Args() { - if err := images.Delete(tx, target); err != nil { + if err := imageStore.Delete(ctx, target); err != nil { if exitErr == nil { exitErr = errors.Wrapf(err, "unable to delete %v", target) } diff --git a/cmd/dist/pull.go b/cmd/dist/pull.go index f00ca69..8484034 100644 --- a/cmd/dist/pull.go +++ b/cmd/dist/pull.go @@ -47,17 +47,10 @@ command. As part of this process, we do the following: return err } - db, err := getDB(clicontext, false) + imageStore, err := resolveImageStore(clicontext) if err != nil { return err } - defer db.Close() - - tx, err := db.Begin(true) - if err != nil { - return err - } - defer tx.Rollback() resolver, err := getResolver(ctx) if err != nil { @@ -65,6 +58,7 @@ command. As part of this process, we do the following: } ongoing := newJobs() + // TODO(stevvooe): Must unify this type. ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)) provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) @@ -88,13 +82,8 @@ command. As part of this process, we do the following: close(resolved) eg.Go(func() error { - return images.Register(tx, name, desc) + return imageStore.Put(ctx, name, desc) }) - defer func() { - if err := tx.Commit(); err != nil { - log.G(ctx).WithError(err).Error("commit failed") - } - }() return images.Dispatch(ctx, images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { @@ -114,24 +103,20 @@ command. As part of this process, we do the following: }() defer func() { - ctx := context.Background() - tx, err := db.Begin(false) - if err != nil { - log.G(ctx).Fatal(err) - } + ctx := background // TODO(stevvooe): This section unpacks the layers and resolves the // root filesystem chainid for the image. For now, we just print // it, but we should keep track of this in the metadata storage. - image, err := images.Get(tx, resolvedImageName) + image, err := imageStore.Get(ctx, resolvedImageName) if err != nil { log.G(ctx).Fatal(err) } provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) - p, err := content.ReadBlob(ctx, provider, image.Descriptor.Digest) + p, err := content.ReadBlob(ctx, provider, image.Target.Digest) if err != nil { log.G(ctx).Fatal(err) } diff --git a/plugin/plugin.go b/plugin/plugin.go index 96a8c07..095b9e8 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + "github.com/boltdb/bolt" "github.com/containerd/containerd" "github.com/containerd/containerd/content" "github.com/containerd/containerd/snapshot" @@ -32,7 +33,8 @@ type InitContext struct { Root string State string Runtimes map[string]containerd.Runtime - Store *content.Store + Content *content.Store + Meta *bolt.DB Snapshotter snapshot.Snapshotter Config interface{} Context context.Context diff --git a/services/content/service.go b/services/content/service.go index c311124..128104f 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -38,7 +38,7 @@ func init() { func NewService(ic *plugin.InitContext) (interface{}, error) { return &Service{ - store: ic.Store, + store: ic.Content, }, nil } diff --git a/services/rootfs/service.go b/services/rootfs/service.go index 146c13c..539d36e 100644 --- a/services/rootfs/service.go +++ b/services/rootfs/service.go @@ -22,7 +22,7 @@ func init() { plugin.Register("rootfs-grpc", &plugin.Registration{ Type: plugin.GRPCPlugin, Init: func(ic *plugin.InitContext) (interface{}, error) { - return NewService(ic.Store, ic.Snapshotter) + return NewService(ic.Content, ic.Snapshotter) }, }) }