package main

import (
	"context"
	"encoding/json"
	"os"
	"text/tabwriter"
	"time"

	contentapi "github.com/docker/containerd/api/services/content"
	rootfsapi "github.com/docker/containerd/api/services/rootfs"
	"github.com/docker/containerd/content"
	"github.com/docker/containerd/image"
	"github.com/docker/containerd/log"
	"github.com/docker/containerd/progress"
	"github.com/docker/containerd/remotes"
	contentservice "github.com/docker/containerd/services/content"
	rootfsservice "github.com/docker/containerd/services/rootfs"
	"github.com/opencontainers/image-spec/identity"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/urfave/cli"
	"golang.org/x/sync/errgroup"
)
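// pullCommand fetches all resources of an image reference into the content
// store, registers the image metadata, unpacks the root filesystem and
// renders download progress while doing so.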
var pullCommand = cli.Command{
	Name:      "pull",
	Usage:     "pull an image from a remote",
	ArgsUsage: "[flags] <ref>",
	Description: `Fetch and prepare an image for use in containerd.

After pulling an image, it should be ready to use the same reference in a run
command. As part of this process, we do the following:

1. Fetch all resources into containerd.
2. Prepare the snapshot filesystem with the pulled resources.
3. Register metadata for the image.
`,
	Flags: []cli.Flag{},
	Action: func(clicontext *cli.Context) error {
		var (
			ctx = background
			ref = clicontext.Args().First()
		)

		conn, err := connectGRPC(clicontext)
		if err != nil {
			return err
		}
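		// Open the image metadata database via the getDB helper (defined
		// elsewhere in this package) and begin a writable transaction. The
		// transaction is committed after the image has been registered below;
		// the deferred Rollback covers early-return error paths.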
		db, err := getDB(clicontext, false)
		if err != nil {
			return err
		}
		defer db.Close()

		tx, err := db.Begin(true)
		if err != nil {
			return err
		}
		defer tx.Rollback()
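		// The resolver turns the user-supplied reference into a name, a root
		// descriptor and a fetcher for the remote. The jobs set records every
		// ref we start working on so the progress loop below can report it.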
		resolver, err := getResolver(ctx)
		if err != nil {
			return err
		}
		ongoing := newJobs()
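		// Content flows through the content service on the GRPC connection:
		// the ingester receives fetched blobs and the provider reads them
		// back. resolveContentStore hands back the local store so active
		// ingests can be polled for progress below.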
		ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
		cs, err := resolveContentStore(clicontext)
		if err != nil {
			return err
		}
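		// Resolution, fetching and metadata registration run concurrently in
		// an errgroup; the first error cancels the shared context. The
		// resolved channel is closed once the reference has been resolved so
		// the progress loop can switch its status from "resolving" to
		// "resolved".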
		eg, ctx := errgroup.WithContext(ctx)

		var resolvedImageName string
		resolved := make(chan struct{})
		eg.Go(func() error {
			ongoing.add(ref)
			name, desc, fetcher, err := resolver.Resolve(ctx, ref)
			if err != nil {
				return err
			}
			log.G(ctx).WithField("image", name).Debug("fetching")
			resolvedImageName = name
			close(resolved)
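			// Register the image name against the resolved descriptor inside
			// the metadata transaction; the transaction is committed once the
			// dispatch below has finished.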
			eg.Go(func() error {
				return image.Register(tx, name, desc)
			})
			defer func() {
				if err := tx.Commit(); err != nil {
					log.G(ctx).WithError(err).Error("commit failed")
				}
			}()
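			// Walk the resource graph rooted at the resolved descriptor. For
			// each descriptor the handlers run in order: record it as an
			// ongoing job, fetch its content into the ingester, then queue up
			// its children (e.g. the config and layers of a manifest).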
			return image.Dispatch(ctx,
				image.Handlers(image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
					ongoing.add(remotes.MakeRefKey(ctx, desc))
					return nil, nil
				}),
					remotes.FetchHandler(ingester, fetcher),
					image.ChildrenHandler(provider)),
				desc)

		})
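		// Funnel the errgroup result into a channel so the rendering loop
		// below can select on completion alongside the ticker and context.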
		errs := make(chan error)
		go func() {
			defer close(errs)
			errs <- eg.Wait()
		}()
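		// When the Action returns, this deferred block re-reads the committed
		// image metadata, loads its manifest from the content store, has the
		// rootfs service unpack the layers and checks the returned chain ID
		// against the one computed from the image's diff IDs.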
		defer func() {
			ctx := context.Background()
			tx, err := db.Begin(false)
			if err != nil {
				log.G(ctx).Fatal(err)
			}

			// TODO(stevvooe): This section unpacks the layers and resolves the
			// root filesystem chainid for the image. For now, we just print
			// it, but we should keep track of this in the metadata storage.

			image, err := image.Get(tx, resolvedImageName)
			if err != nil {
				log.G(ctx).Fatal(err)
			}

			provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))

			p, err := content.ReadBlob(ctx, provider, image.Descriptor.Digest)
			if err != nil {
				log.G(ctx).Fatal(err)
			}

			var manifest ocispec.Manifest
			if err := json.Unmarshal(p, &manifest); err != nil {
				log.G(ctx).Fatal(err)
			}
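			// The rootfs service applies the manifest's layers and reports the
			// chain ID of the resulting snapshot.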
			rootfs := rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn))

			log.G(ctx).Info("unpacking rootfs")
			chainID, err := rootfs.Unpack(ctx, manifest.Layers)
			if err != nil {
				log.G(ctx).Fatal(err)
			}
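			// Cross-check: the chain ID derived from the image config's diff
			// IDs must match the one reported by the rootfs service, otherwise
			// the unpacked snapshot does not correspond to the pulled image.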
			diffIDs, err := image.RootFS(ctx, provider)
			if err != nil {
				log.G(ctx).WithError(err).Fatal("failed resolving rootfs")
			}

			expectedChainID := identity.ChainID(diffIDs)
			if expectedChainID != chainID {
				log.G(ctx).Fatal("rootfs service did not match chainid")
			}
		}()
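		// Progress rendering: every 100ms the status of all known jobs is
		// formatted through a tabwriter onto the progress writer so the table
		// can be refreshed on each tick.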
		var (
			ticker = time.NewTicker(100 * time.Millisecond)
			fw     = progress.NewWriter(os.Stdout)
			start  = time.Now()
			done   bool
		)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				fw.Flush()

				tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
				js := ongoing.jobs()
				statuses := map[string]statusInfo{}
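				// While the pull is still running, overlay live ingest status
				// (offset, total, timestamps) from the content store onto the
				// job list; jobs not currently active are handled below.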
				activeSeen := map[string]struct{}{}
				if !done {
					active, err := cs.Active()
					if err != nil {
						log.G(ctx).WithError(err).Error("active check failed")
						continue
					}
					// update status of active entries!
					for _, active := range active {
						statuses[active.Ref] = statusInfo{
							Ref:       active.Ref,
							Status:    "downloading",
							Offset:    active.Offset,
							Total:     active.Total,
							StartedAt: active.StartedAt,
							UpdatedAt: active.UpdatedAt,
						}
						activeSeen[active.Ref] = struct{}{}
					}
				}

				// now, update the items in jobs that are not in active
				for _, j := range js {
					if _, ok := activeSeen[j]; ok {
						continue
					}
					status := "done"

					if j == ref {
						select {
						case <-resolved:
							status = "resolved"
						default:
							status = "resolving"
						}
					}

					statuses[j] = statusInfo{
						Ref:    j,
						Status: status, // for now!
					}
				}
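				// Emit rows in the order the jobs were added so the table
				// stays stable between redraws.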
				var ordered []statusInfo
				for _, j := range js {
					ordered = append(ordered, statuses[j])
				}

				display(tw, ordered, start)
				tw.Flush()

				if done {
					fw.Flush()
					return nil
				}
			case err := <-errs:
				if err != nil {
					return err
				}
				done = true
			case <-ctx.Done():
				done = true // allow ui to update once more
			}
		}

	},
}