cmd/dist, image, remotes: introduce image handlers

With this PR, we introduce the concept of image handlers. They support
walking a tree of image resource descriptors to perform various
processing tasks. Handlers can be dispatched sequentially or in
parallel and can be stacked for various effects, as the sketch below
illustrates.
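
For orientation: a handler is a callback over a descriptor that may return
children to visit next, and dispatch drives the walk. Below is a minimal
sketch built from the `image.HandlerFunc` and `image.Dispatch` calls that
appear in the diff further down; the exact interface shape and the vendored
image-spec import path are assumptions on my part, not text from this commit.

    package main

    import (
        "context"
        "fmt"

        "github.com/docker/containerd/image"
        ocispec "github.com/opencontainers/image-spec/specs-go/v1"
    )

    // visitAll walks the resource tree rooted at root, printing each
    // descriptor it sees. Returning no children ends descent at a node,
    // so this particular handler only ever visits the root.
    func visitAll(ctx context.Context, root ocispec.Descriptor) error {
        visit := image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
            fmt.Println(desc.MediaType, desc.Digest)
            return nil, nil // nothing queued; stack image.ChildrenHandler to descend
        })
        return image.Dispatch(ctx, visit, root)
    }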

The main functionality we introduce here is parameterized fetch without
coupling format resolution to the process itself. Two important
handlers, `remotes.FetchHandler` and `image.ChildrenHandler`, can be
composed to implement recursive fetch with full status reporting. The
approach can also be modified to filter based on platform or other
constraints, which unlocks a number of possibilities; the sketch below
shows both the composition and a hypothetical filter.
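
Concretely, the composition might be packaged as a helper like `fetchAll`
below, mirroring the calls in cmd/dist/fetch.go. The parameter types
(`content.Ingester`, `content.Provider`, `remotes.Fetcher`) are inferred
from the constructors used in the diff; `jobs` is the tracker defined in
this file; `filteredChildren` is purely hypothetical, not an API added by
this PR.

    package main

    import (
        "context"

        "github.com/docker/containerd/content"
        "github.com/docker/containerd/image"
        "github.com/docker/containerd/remotes"
        ocispec "github.com/opencontainers/image-spec/specs-go/v1"
    )

    // fetchAll recursively fetches the tree rooted at desc: each descriptor
    // is reported to the jobs tracker, fetched into the content store, then
    // parsed so that its children (config, layers, sub-manifests) are queued.
    func fetchAll(ctx context.Context, ingester content.Ingester, provider content.Provider,
        fetcher remotes.Fetcher, ongoing *jobs, desc ocispec.Descriptor) error {
        return image.Dispatch(ctx, image.Handlers(
            image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
                ongoing.add(remotes.MakeRefKey(ctx, desc)) // status reporting only
                return nil, nil
            }),
            remotes.FetchHandler(ingester, fetcher), // write the resource into the store
            image.ChildrenHandler(provider),         // read it back and queue its children
        ), desc)
    }

    // filteredChildren wraps image.ChildrenHandler and prunes any child that
    // fails the caller-supplied predicate, cutting whole subtrees out of the
    // walk. The predicate (e.g. a platform match) is illustrative.
    func filteredChildren(provider content.Provider, wanted func(ocispec.Descriptor) bool) image.HandlerFunc {
        children := image.ChildrenHandler(provider)
        return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
            descs, err := children.Handle(ctx, desc)
            if err != nil {
                return nil, err
            }
            keep := descs[:0]
            for _, d := range descs {
                if wanted(d) {
                    keep = append(keep, d)
                }
            }
            return keep, nil
        }
    }

Swapping `filteredChildren` in for `image.ChildrenHandler` in the stack
above restricts what gets fetched without touching the fetch logic itself.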

This also includes some light refactoring of the fetch command, in
preparation for the submission of end-to-end pull.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
commit 5a3151eefc
parent f95ba7c5ea
Author: Stephen J Day
Date: 2017-03-17 14:46:25 -07:00
3 changed files with 288 additions and 148 deletions

cmd/dist/fetch.go vendored (209 changed lines)

@@ -1,19 +1,16 @@
 package main
 
 import (
-	"bytes"
 	"context"
-	"encoding/json"
 	"fmt"
-	"io/ioutil"
+	"io"
 	"os"
 	"sync"
 	"text/tabwriter"
 	"time"
 
-	"github.com/Sirupsen/logrus"
 	contentapi "github.com/docker/containerd/api/services/content"
-	"github.com/docker/containerd/content"
+	"github.com/docker/containerd/image"
 	"github.com/docker/containerd/log"
 	"github.com/docker/containerd/progress"
 	"github.com/docker/containerd/remotes"
@@ -58,9 +55,13 @@ Most of this is experimental and there are few leaps to make this work.`,
 		if err != nil {
 			return err
 		}
 
-		ctx = withJobsContext(ctx)
+		ongoing := newJobs()
+
 		ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
+		provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
+
+		// TODO(stevvooe): Need to replace this with content store client.
 		cs, err := resolveContentStore(clicontext)
 		if err != nil {
 			return err
@@ -70,7 +71,7 @@ Most of this is experimental and there are few leaps to make this work.`,
 
 		resolved := make(chan struct{})
 		eg.Go(func() error {
-			addJob(ctx, ref)
+			ongoing.add(ref)
 			name, desc, fetcher, err := resolver.Resolve(ctx, ref)
 			if err != nil {
 				return err
@@ -78,7 +79,15 @@ Most of this is experimental and there are few leaps to make this work.`,
 			log.G(ctx).WithField("image", name).Debug("fetching")
 			close(resolved)
 
-			return dispatch(ctx, ingester, fetcher, desc)
+			return image.Dispatch(ctx,
+				image.Handlers(image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
+					ongoing.add(remotes.MakeRefKey(ctx, desc))
+					return nil, nil
+				}),
+					remotes.FetchHandler(ingester, fetcher),
+					image.ChildrenHandler(provider),
+				),
+				desc)
 		})
 
 		errs := make(chan error)
@@ -99,17 +108,7 @@ Most of this is experimental and there are few leaps to make this work.`,
 			fw.Flush()
 
 			tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
 
-			var total int64
-			js := getJobs(ctx)
-			type statusInfo struct {
-				Ref       string
-				Status    string
-				Offset    int64
-				Total     int64
-				StartedAt time.Time
-				UpdatedAt time.Time
-			}
 
 			statuses := map[string]statusInfo{}
 			activeSeen := map[string]struct{}{}
@@ -133,6 +132,7 @@ Most of this is experimental and there are few leaps to make this work.`,
 				}
 			}
 
+			js := ongoing.jobs()
 			// now, update the items in jobs that are not in active
 			for _, j := range js {
 				if _, ok := activeSeen[j]; ok {
@@ -155,42 +155,12 @@ Most of this is experimental and there are few leaps to make this work.`,
 				}
 			}
 
+			var ordered []statusInfo
 			for _, j := range js {
-				status := statuses[j]
-				total += status.Offset
-
-				switch status.Status {
-				case "downloading":
-					bar := progress.Bar(float64(status.Offset) / float64(status.Total))
-					fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
-						status.Ref,
-						status.Status,
-						bar,
-						progress.Bytes(status.Offset), progress.Bytes(status.Total))
-				case "resolving":
-					bar := progress.Bar(0.0)
-					fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n",
-						status.Ref,
-						status.Status,
-						bar)
-				default:
-					bar := progress.Bar(1.0)
-					fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n",
-						status.Ref,
-						status.Status,
-						bar)
-				}
+				ordered = append(ordered, statuses[j])
 			}
 
-			fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
-				time.Since(start).Seconds(),
-				// TODO(stevvooe): These calculations are actually way off.
-				// Need to account for previously downloaded data. These
-				// will basically be right for a download the first time
-				// but will be skewed if restarting, as it includes the
-				// data into the start time before.
-				progress.Bytes(total),
-				progress.NewBytesPerSecond(total, time.Since(start)))
+			display(tw, ordered, start)
 
 			tw.Flush()
 
 			if done {
@@ -222,24 +192,6 @@ type jobs struct {
 	mu    sync.Mutex
 }
 
-// jobsKey lets us store the jobs instance in the context.
-//
-// This is a very cute way to do things but not ideal.
-type jobsKey struct{}
-
-func getJobs(ctx context.Context) []string {
-	return ctx.Value(jobsKey{}).(*jobs).jobs()
-}
-
-func addJob(ctx context.Context, job string) {
-	ctx.Value(jobsKey{}).(*jobs).add(job)
-}
-
-func withJobsContext(ctx context.Context) context.Context {
-	jobs := newJobs()
-	return context.WithValue(ctx, jobsKey{}, jobs)
-}
-
 func newJobs() *jobs {
 	return &jobs{added: make(map[string]struct{})}
 }
@@ -267,88 +219,49 @@ func (j *jobs) jobs() []string {
 	return jobs
 }
 
-func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
-	ref := "manifest-" + desc.Digest.String()
-	addJob(ctx, ref)
-
-	rc, err := fetcher.Fetch(ctx, desc)
-	if err != nil {
-		return err
-	}
-	defer rc.Close()
-
-	// it would be better to read the content back from the content store in this case.
-	p, err := ioutil.ReadAll(rc)
-	if err != nil {
-		return err
-	}
-	if err := content.WriteBlob(ctx, ingester, ref, bytes.NewReader(p), 0, ""); err != nil {
-		return err
-	}
-
-	// TODO(stevvooe): This assumption that we get a manifest is unfortunate.
-	// Need to provide way to resolve what the type of the target is.
-	var manifest ocispec.Manifest
-	if err := json.Unmarshal(p, &manifest); err != nil {
-		return err
-	}
-
-	var descs []ocispec.Descriptor
-	descs = append(descs, manifest.Config)
-	for _, desc := range manifest.Layers {
-		descs = append(descs, desc)
-	}
-
-	return dispatch(ctx, ingester, fetcher, descs...)
-}
+type statusInfo struct {
+	Ref       string
+	Status    string
+	Offset    int64
+	Total     int64
+	StartedAt time.Time
+	UpdatedAt time.Time
+}
 
-func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
-	ref := "fetch-" + desc.Digest.String()
-	addJob(ctx, ref)
-
-	log.G(ctx).Debug("fetch")
-	rc, err := fetcher.Fetch(ctx, desc)
-	if err != nil {
-		log.G(ctx).WithError(err).Error("fetch error")
-		return err
-	}
-	defer rc.Close()
-
-	// TODO(stevvooe): Need better remote key selection here. Should be a
-	// product of the fetcher. We may need more narrow information on fetcher or
-	// just pull from env/context.
-	return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest)
-}
-
-// dispatch blocks until all content in `descs` is retrieved.
-func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, descs ...ocispec.Descriptor) error {
-	eg, ctx := errgroup.WithContext(ctx)
-	for _, desc := range descs {
-		if err := func(desc ocispec.Descriptor) error {
-			ctx := log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
-				"digest":    desc.Digest,
-				"mediatype": desc.MediaType,
-				"size":      desc.Size,
-			}))
-
-			switch desc.MediaType {
-			case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
-				eg.Go(func() error {
-					return fetchManifest(ctx, ingester, fetcher, desc)
-				})
-			case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
-				return fmt.Errorf("%v not yet supported", desc.MediaType)
-			default:
-				eg.Go(func() error {
-					return fetch(ctx, ingester, fetcher, desc)
-				})
-			}
-
-			return nil
-		}(desc); err != nil {
-			return err
-		}
-	}
-
-	return eg.Wait()
-}
+func display(w io.Writer, statuses []statusInfo, start time.Time) {
+	var total int64
+	for _, status := range statuses {
+		total += status.Offset
+		switch status.Status {
+		case "downloading":
+			bar := progress.Bar(float64(status.Offset) / float64(status.Total))
+			fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
+				status.Ref,
+				status.Status,
+				bar,
+				progress.Bytes(status.Offset), progress.Bytes(status.Total))
+		case "resolving":
+			bar := progress.Bar(0.0)
+			fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
+				status.Ref,
+				status.Status,
+				bar)
+		default:
+			bar := progress.Bar(1.0)
+			fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
+				status.Ref,
+				status.Status,
+				bar)
+		}
+	}
+
+	fmt.Fprintf(w, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
+		time.Since(start).Seconds(),
+		// TODO(stevvooe): These calculations are actually way off.
+		// Need to account for previously downloaded data. These
+		// will basically be right for a download the first time
+		// but will be skewed if restarting, as it includes the
+		// data into the start time before.
+		progress.Bytes(total),
+		progress.NewBytesPerSecond(total, time.Since(start)))
+}