cc983be17a
This adds very simple deletion of images by name. We still need to consider the approach to handling image name, so this may change. For the time being, it allows one to delete an image entry in the metadata database. Signed-off-by: Stephen J Day <stephen.day@docker.com>
246 lines
5.7 KiB
Go
246 lines
5.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"os"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
contentapi "github.com/docker/containerd/api/services/content"
|
|
rootfsapi "github.com/docker/containerd/api/services/rootfs"
|
|
"github.com/docker/containerd/content"
|
|
"github.com/docker/containerd/images"
|
|
"github.com/docker/containerd/log"
|
|
"github.com/docker/containerd/progress"
|
|
"github.com/docker/containerd/remotes"
|
|
contentservice "github.com/docker/containerd/services/content"
|
|
rootfsservice "github.com/docker/containerd/services/rootfs"
|
|
"github.com/opencontainers/image-spec/identity"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/urfave/cli"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
var pullCommand = cli.Command{
|
|
Name: "pull",
|
|
Usage: "pull an image from a remote",
|
|
ArgsUsage: "[flags] <ref>",
|
|
Description: `Fetch and prepare an image for use in containerd.
|
|
|
|
After pulling an image, it should be ready to use the same reference in a run
|
|
command. As part of this process, we do the following:
|
|
|
|
1. Fetch all resources into containerd.
|
|
2. Prepare the snapshot filesystem with the pulled resources.
|
|
3. Register metadata for the image.
|
|
`,
|
|
Flags: []cli.Flag{},
|
|
Action: func(clicontext *cli.Context) error {
|
|
var (
|
|
ctx = background
|
|
ref = clicontext.Args().First()
|
|
)
|
|
|
|
conn, err := connectGRPC(clicontext)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
db, err := getDB(clicontext, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer db.Close()
|
|
|
|
tx, err := db.Begin(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
resolver, err := getResolver(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ongoing := newJobs()
|
|
|
|
ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
|
|
provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
|
|
|
|
cs, err := resolveContentStore(clicontext)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
eg, ctx := errgroup.WithContext(ctx)
|
|
|
|
var resolvedImageName string
|
|
resolved := make(chan struct{})
|
|
eg.Go(func() error {
|
|
ongoing.add(ref)
|
|
name, desc, fetcher, err := resolver.Resolve(ctx, ref)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.G(ctx).WithField("image", name).Debug("fetching")
|
|
resolvedImageName = name
|
|
close(resolved)
|
|
|
|
eg.Go(func() error {
|
|
return images.Register(tx, name, desc)
|
|
})
|
|
defer func() {
|
|
if err := tx.Commit(); err != nil {
|
|
log.G(ctx).WithError(err).Error("commit failed")
|
|
}
|
|
}()
|
|
|
|
return images.Dispatch(ctx,
|
|
images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
ongoing.add(remotes.MakeRefKey(ctx, desc))
|
|
return nil, nil
|
|
}),
|
|
remotes.FetchHandler(ingester, fetcher),
|
|
images.ChildrenHandler(provider)),
|
|
desc)
|
|
|
|
})
|
|
|
|
errs := make(chan error)
|
|
go func() {
|
|
defer close(errs)
|
|
errs <- eg.Wait()
|
|
}()
|
|
|
|
defer func() {
|
|
ctx := context.Background()
|
|
tx, err := db.Begin(false)
|
|
if err != nil {
|
|
log.G(ctx).Fatal(err)
|
|
}
|
|
|
|
// TODO(stevvooe): This section unpacks the layers and resolves the
|
|
// root filesystem chainid for the image. For now, we just print
|
|
// it, but we should keep track of this in the metadata storage.
|
|
|
|
image, err := images.Get(tx, resolvedImageName)
|
|
if err != nil {
|
|
log.G(ctx).Fatal(err)
|
|
}
|
|
|
|
provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
|
|
|
|
p, err := content.ReadBlob(ctx, provider, image.Descriptor.Digest)
|
|
if err != nil {
|
|
log.G(ctx).Fatal(err)
|
|
}
|
|
|
|
var manifest ocispec.Manifest
|
|
if err := json.Unmarshal(p, &manifest); err != nil {
|
|
log.G(ctx).Fatal(err)
|
|
}
|
|
|
|
rootfs := rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn))
|
|
|
|
log.G(ctx).Info("unpacking rootfs")
|
|
chainID, err := rootfs.Unpack(ctx, manifest.Layers)
|
|
if err != nil {
|
|
log.G(ctx).Fatal(err)
|
|
}
|
|
|
|
diffIDs, err := image.RootFS(ctx, provider)
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Fatal("failed resolving rootfs")
|
|
}
|
|
|
|
expectedChainID := identity.ChainID(diffIDs)
|
|
if expectedChainID != chainID {
|
|
log.G(ctx).Fatal("rootfs service did not match chainid")
|
|
}
|
|
}()
|
|
|
|
var (
|
|
ticker = time.NewTicker(100 * time.Millisecond)
|
|
fw = progress.NewWriter(os.Stdout)
|
|
start = time.Now()
|
|
done bool
|
|
)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
fw.Flush()
|
|
|
|
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
|
|
js := ongoing.jobs()
|
|
statuses := map[string]statusInfo{}
|
|
|
|
activeSeen := map[string]struct{}{}
|
|
if !done {
|
|
active, err := cs.Active()
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Error("active check failed")
|
|
continue
|
|
}
|
|
// update status of active entries!
|
|
for _, active := range active {
|
|
statuses[active.Ref] = statusInfo{
|
|
Ref: active.Ref,
|
|
Status: "downloading",
|
|
Offset: active.Offset,
|
|
Total: active.Total,
|
|
StartedAt: active.StartedAt,
|
|
UpdatedAt: active.UpdatedAt,
|
|
}
|
|
activeSeen[active.Ref] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// now, update the items in jobs that are not in active
|
|
for _, j := range js {
|
|
if _, ok := activeSeen[j]; ok {
|
|
continue
|
|
}
|
|
status := "done"
|
|
|
|
if j == ref {
|
|
select {
|
|
case <-resolved:
|
|
status = "resolved"
|
|
default:
|
|
status = "resolving"
|
|
}
|
|
}
|
|
|
|
statuses[j] = statusInfo{
|
|
Ref: j,
|
|
Status: status, // for now!
|
|
}
|
|
}
|
|
|
|
var ordered []statusInfo
|
|
for _, j := range js {
|
|
ordered = append(ordered, statuses[j])
|
|
}
|
|
|
|
display(tw, ordered, start)
|
|
tw.Flush()
|
|
|
|
if done {
|
|
fw.Flush()
|
|
return nil
|
|
}
|
|
case err := <-errs:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
done = true
|
|
case <-ctx.Done():
|
|
done = true // allow ui to update once more
|
|
}
|
|
}
|
|
|
|
},
|
|
}
|