Merge pull request #675 from stevvooe/images-service
api/services/images: define images metadata service
This commit is contained in:
commit
e2b042e7c1
20 changed files with 1814 additions and 143 deletions
|
@ -7,6 +7,7 @@ import (
|
|||
_ "github.com/containerd/containerd/services/content"
|
||||
_ "github.com/containerd/containerd/services/execution"
|
||||
_ "github.com/containerd/containerd/services/healthcheck"
|
||||
_ "github.com/containerd/containerd/services/images"
|
||||
_ "github.com/containerd/containerd/services/metrics"
|
||||
_ "github.com/containerd/containerd/services/rootfs"
|
||||
_ "github.com/containerd/containerd/snapshot/btrfs"
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
gocontext "golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
@ -20,8 +21,10 @@ import (
|
|||
"github.com/containerd/containerd"
|
||||
contentapi "github.com/containerd/containerd/api/services/content"
|
||||
api "github.com/containerd/containerd/api/services/execution"
|
||||
imagesapi "github.com/containerd/containerd/api/services/images"
|
||||
rootfsapi "github.com/containerd/containerd/api/services/rootfs"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/containerd/containerd/reaper"
|
||||
|
@ -116,11 +119,16 @@ func main() {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
meta, err := resolveMetaDB(context)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer meta.Close()
|
||||
snapshotter, err := loadSnapshotter(store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
services, err := loadServices(runtimes, store, snapshotter)
|
||||
services, err := loadServices(runtimes, store, snapshotter, meta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -266,6 +274,22 @@ func resolveContentStore() (*content.Store, error) {
|
|||
return content.NewStore(cp)
|
||||
}
|
||||
|
||||
func resolveMetaDB(ctx *cli.Context) (*bolt.DB, error) {
|
||||
path := filepath.Join(conf.Root, "meta.db")
|
||||
|
||||
db, err := bolt.Open(path, 0644, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO(stevvooe): Break these down into components to be initialized.
|
||||
if err := images.InitDB(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func loadRuntimes(monitor plugin.ContainerMonitor) (map[string]containerd.Runtime, error) {
|
||||
o := make(map[string]containerd.Runtime)
|
||||
for name, rr := range plugin.Registrations() {
|
||||
|
@ -332,7 +356,7 @@ func loadSnapshotter(store *content.Store) (snapshot.Snapshotter, error) {
|
|||
ic := &plugin.InitContext{
|
||||
Root: conf.Root,
|
||||
State: conf.State,
|
||||
Store: store,
|
||||
Content: store,
|
||||
Context: log.WithModule(global, moduleName),
|
||||
}
|
||||
if sr.Config != nil {
|
||||
|
@ -359,7 +383,7 @@ func newGRPCServer() *grpc.Server {
|
|||
return s
|
||||
}
|
||||
|
||||
func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, sn snapshot.Snapshotter) ([]plugin.Service, error) {
|
||||
func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, sn snapshot.Snapshotter, meta *bolt.DB) ([]plugin.Service, error) {
|
||||
var o []plugin.Service
|
||||
for name, sr := range plugin.Registrations() {
|
||||
if sr.Type != plugin.GRPCPlugin {
|
||||
|
@ -371,7 +395,8 @@ func loadServices(runtimes map[string]containerd.Runtime, store *content.Store,
|
|||
State: conf.State,
|
||||
Context: log.WithModule(global, fmt.Sprintf("service-%s", name)),
|
||||
Runtimes: runtimes,
|
||||
Store: store,
|
||||
Content: store,
|
||||
Meta: meta,
|
||||
Snapshotter: sn,
|
||||
}
|
||||
if sr.Config != nil {
|
||||
|
@ -423,6 +448,8 @@ func interceptor(ctx gocontext.Context,
|
|||
ctx = log.WithModule(ctx, "content")
|
||||
case rootfsapi.RootFSServer:
|
||||
ctx = log.WithModule(ctx, "rootfs")
|
||||
case imagesapi.ImagesServer:
|
||||
ctx = log.WithModule(ctx, "images")
|
||||
default:
|
||||
fmt.Printf("unknown GRPC server type: %#v\n", info.Server)
|
||||
}
|
||||
|
|
|
@ -277,27 +277,18 @@ var runCommand = cli.Command{
|
|||
return err
|
||||
}
|
||||
|
||||
db, err := getDB(context, false)
|
||||
imageStore, err := getImageStore(context)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed opening database")
|
||||
return errors.Wrap(err, "failed resolving image store")
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
tx, err := db.Begin(false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
ref := context.Args().First()
|
||||
|
||||
image, err := images.Get(tx, ref)
|
||||
image, err := imageStore.Get(ctx, ref)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not resolve %q", ref)
|
||||
}
|
||||
// let's close out our db and tx so we don't hold the lock whilst running.
|
||||
tx.Rollback()
|
||||
db.Close()
|
||||
|
||||
diffIDs, err := image.RootFS(ctx, provider)
|
||||
if err != nil {
|
||||
|
|
|
@ -14,14 +14,15 @@ import (
|
|||
|
||||
gocontext "context"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
contentapi "github.com/containerd/containerd/api/services/content"
|
||||
"github.com/containerd/containerd/api/services/execution"
|
||||
imagesapi "github.com/containerd/containerd/api/services/images"
|
||||
rootfsapi "github.com/containerd/containerd/api/services/rootfs"
|
||||
"github.com/containerd/containerd/api/types/container"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
contentservice "github.com/containerd/containerd/services/content"
|
||||
imagesservice "github.com/containerd/containerd/services/images"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tonistiigi/fifo"
|
||||
"github.com/urfave/cli"
|
||||
|
@ -134,25 +135,12 @@ func getRootFSService(context *cli.Context) (rootfsapi.RootFSClient, error) {
|
|||
return rootfsapi.NewRootFSClient(conn), nil
|
||||
}
|
||||
|
||||
func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) {
|
||||
// TODO(stevvooe): For now, we operate directly on the database. We will
|
||||
// replace this with a GRPC service when the details are more concrete.
|
||||
path := filepath.Join(ctx.GlobalString("root"), "meta.db")
|
||||
|
||||
db, err := bolt.Open(path, 0644, &bolt.Options{
|
||||
ReadOnly: readonly,
|
||||
})
|
||||
func getImageStore(clicontext *cli.Context) (images.Store, error) {
|
||||
conn, err := getGRPCConnection(clicontext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !readonly {
|
||||
if err := images.InitDB(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return db, nil
|
||||
return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), nil
|
||||
}
|
||||
|
||||
func getTempDir(id string) (string, error) {
|
||||
|
|
32
cmd/dist/common.go
vendored
32
cmd/dist/common.go
vendored
|
@ -6,11 +6,12 @@ import (
|
|||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
imagesapi "github.com/containerd/containerd/api/services/images"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/remotes"
|
||||
"github.com/containerd/containerd/remotes/docker"
|
||||
imagesservice "github.com/containerd/containerd/services/images"
|
||||
"github.com/urfave/cli"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
@ -27,6 +28,14 @@ func resolveContentStore(context *cli.Context) (*content.Store, error) {
|
|||
return content.NewStore(root)
|
||||
}
|
||||
|
||||
func resolveImageStore(clicontext *cli.Context) (images.Store, error) {
|
||||
conn, err := connectGRPC(clicontext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), nil
|
||||
}
|
||||
|
||||
func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) {
|
||||
socket := context.GlobalString("socket")
|
||||
timeout := context.GlobalDuration("connect-timeout")
|
||||
|
@ -40,27 +49,6 @@ func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) {
|
|||
)
|
||||
}
|
||||
|
||||
func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) {
|
||||
// TODO(stevvooe): For now, we operate directly on the database. We will
|
||||
// replace this with a GRPC service when the details are more concrete.
|
||||
path := filepath.Join(ctx.GlobalString("root"), "meta.db")
|
||||
|
||||
db, err := bolt.Open(path, 0644, &bolt.Options{
|
||||
ReadOnly: readonly,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !readonly {
|
||||
if err := images.InitDB(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// getResolver prepares the resolver from the environment and options.
|
||||
func getResolver(ctx context.Context) (remotes.Resolver, error) {
|
||||
return docker.NewResolver(), nil
|
||||
|
|
27
cmd/dist/images.go
vendored
27
cmd/dist/images.go
vendored
|
@ -6,7 +6,6 @@ import (
|
|||
"text/tabwriter"
|
||||
|
||||
contentapi "github.com/containerd/containerd/api/services/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/progress"
|
||||
contentservice "github.com/containerd/containerd/services/content"
|
||||
|
@ -25,23 +24,19 @@ var imagesCommand = cli.Command{
|
|||
ctx = background
|
||||
)
|
||||
|
||||
db, err := getDB(clicontext, true)
|
||||
imageStore, err := resolveImageStore(clicontext)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to open database")
|
||||
return err
|
||||
}
|
||||
tx, err := db.Begin(false)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not start transaction")
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
conn, err := connectGRPC(clicontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
|
||||
|
||||
images, err := images.List(tx)
|
||||
images, err := imageStore.List(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to list images")
|
||||
}
|
||||
|
@ -54,7 +49,7 @@ var imagesCommand = cli.Command{
|
|||
log.G(ctx).WithError(err).Errorf("failed calculating size for image %s", image.Name)
|
||||
}
|
||||
|
||||
fmt.Fprintf(tw, "%v\t%v\t%v\t%v\t\n", image.Name, image.Descriptor.MediaType, image.Descriptor.Digest, progress.Bytes(size))
|
||||
fmt.Fprintf(tw, "%v\t%v\t%v\t%v\t\n", image.Name, image.Target.MediaType, image.Target.Digest, progress.Bytes(size))
|
||||
}
|
||||
tw.Flush()
|
||||
|
||||
|
@ -74,19 +69,13 @@ var rmiCommand = cli.Command{
|
|||
exitErr error
|
||||
)
|
||||
|
||||
db, err := getDB(clicontext, false)
|
||||
imageStore, err := resolveImageStore(clicontext)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to open database")
|
||||
return err
|
||||
}
|
||||
|
||||
tx, err := db.Begin(true)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not start transaction")
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
for _, target := range clicontext.Args() {
|
||||
if err := images.Delete(tx, target); err != nil {
|
||||
if err := imageStore.Delete(ctx, target); err != nil {
|
||||
if exitErr == nil {
|
||||
exitErr = errors.Wrapf(err, "unable to delete %v", target)
|
||||
}
|
||||
|
|
27
cmd/dist/pull.go
vendored
27
cmd/dist/pull.go
vendored
|
@ -47,17 +47,10 @@ command. As part of this process, we do the following:
|
|||
return err
|
||||
}
|
||||
|
||||
db, err := getDB(clicontext, false)
|
||||
imageStore, err := resolveImageStore(clicontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
tx, err := db.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
resolver, err := getResolver(ctx)
|
||||
if err != nil {
|
||||
|
@ -65,6 +58,7 @@ command. As part of this process, we do the following:
|
|||
}
|
||||
ongoing := newJobs()
|
||||
|
||||
// TODO(stevvooe): Must unify this type.
|
||||
ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
|
||||
provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
|
||||
|
||||
|
@ -88,13 +82,8 @@ command. As part of this process, we do the following:
|
|||
close(resolved)
|
||||
|
||||
eg.Go(func() error {
|
||||
return images.Register(tx, name, desc)
|
||||
return imageStore.Put(ctx, name, desc)
|
||||
})
|
||||
defer func() {
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.G(ctx).WithError(err).Error("commit failed")
|
||||
}
|
||||
}()
|
||||
|
||||
return images.Dispatch(ctx,
|
||||
images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
|
@ -114,24 +103,20 @@ command. As part of this process, we do the following:
|
|||
}()
|
||||
|
||||
defer func() {
|
||||
ctx := context.Background()
|
||||
tx, err := db.Begin(false)
|
||||
if err != nil {
|
||||
log.G(ctx).Fatal(err)
|
||||
}
|
||||
ctx := background
|
||||
|
||||
// TODO(stevvooe): This section unpacks the layers and resolves the
|
||||
// root filesystem chainid for the image. For now, we just print
|
||||
// it, but we should keep track of this in the metadata storage.
|
||||
|
||||
image, err := images.Get(tx, resolvedImageName)
|
||||
image, err := imageStore.Get(ctx, resolvedImageName)
|
||||
if err != nil {
|
||||
log.G(ctx).Fatal(err)
|
||||
}
|
||||
|
||||
provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
|
||||
|
||||
p, err := content.ReadBlob(ctx, provider, image.Descriptor.Digest)
|
||||
p, err := content.ReadBlob(ctx, provider, image.Target.Digest)
|
||||
if err != nil {
|
||||
log.G(ctx).Fatal(err)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue