From 5a3151eefccf4ed59d4571831a8f0ef02de247aa Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Fri, 17 Mar 2017 14:46:25 -0700 Subject: [PATCH] cmd/dist, image, remotes: introduce image handlers With this PR, we introduce the concept of image handlers. They support walking a tree of image resource descriptors for doing various tasks related to processing them. Handlers can be dispatched sequentially or in parallel and can be stacked for various effects. The main functionality we introduce here is parameterized fetch without coupling format resolution to the process itself. Two important handlers, `remotes.FetchHandler` and `image.ChildrenHandler` can be composed to implement recursive fetch with full status reporting. The approach can also be modified to filter based on platform or other constraints, unlocking a lot of possibilities. This also includes some light refactoring in the fetch command, in preparation for submission of end to end pull. Signed-off-by: Stephen J Day --- cmd/dist/fetch.go | 209 +++++++++++++------------------------------- image/handlers.go | 161 ++++++++++++++++++++++++++++++++++ remotes/handlers.go | 66 ++++++++++++++ 3 files changed, 288 insertions(+), 148 deletions(-) create mode 100644 image/handlers.go create mode 100644 remotes/handlers.go diff --git a/cmd/dist/fetch.go b/cmd/dist/fetch.go index f210cfb..820d8d8 100644 --- a/cmd/dist/fetch.go +++ b/cmd/dist/fetch.go @@ -1,19 +1,16 @@ package main import ( - "bytes" "context" - "encoding/json" "fmt" - "io/ioutil" + "io" "os" "sync" "text/tabwriter" "time" - "github.com/Sirupsen/logrus" contentapi "github.com/docker/containerd/api/services/content" - "github.com/docker/containerd/content" + "github.com/docker/containerd/image" "github.com/docker/containerd/log" "github.com/docker/containerd/progress" "github.com/docker/containerd/remotes" @@ -58,9 +55,13 @@ Most of this is experimental and there are few leaps to make this work.`, if err != nil { return err } - ctx = 
withJobsContext(ctx) + + ongoing := newJobs() ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)) + provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) + + // TODO(stevvooe): Need to replace this with content store client. cs, err := resolveContentStore(clicontext) if err != nil { return err @@ -70,7 +71,7 @@ Most of this is experimental and there are few leaps to make this work.`, resolved := make(chan struct{}) eg.Go(func() error { - addJob(ctx, ref) + ongoing.add(ref) name, desc, fetcher, err := resolver.Resolve(ctx, ref) if err != nil { return err @@ -78,7 +79,15 @@ Most of this is experimental and there are few leaps to make this work.`, log.G(ctx).WithField("image", name).Debug("fetching") close(resolved) - return dispatch(ctx, ingester, fetcher, desc) + return image.Dispatch(ctx, + image.Handlers(image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + ongoing.add(remotes.MakeRefKey(ctx, desc)) + return nil, nil + }), + remotes.FetchHandler(ingester, fetcher), + image.ChildrenHandler(provider), + ), + desc) }) errs := make(chan error) @@ -99,17 +108,7 @@ Most of this is experimental and there are few leaps to make this work.`, fw.Flush() tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0) - var total int64 - js := getJobs(ctx) - type statusInfo struct { - Ref string - Status string - Offset int64 - Total int64 - StartedAt time.Time - UpdatedAt time.Time - } statuses := map[string]statusInfo{} activeSeen := map[string]struct{}{} @@ -133,6 +132,7 @@ Most of this is experimental and there are few leaps to make this work.`, } } + js := ongoing.jobs() // now, update the items in jobs that are not in active for _, j := range js { if _, ok := activeSeen[j]; ok { @@ -155,42 +155,12 @@ Most of this is experimental and there are few leaps to make this work.`, } } + var ordered []statusInfo for _, j := range js { - status := statuses[j] - - total += status.Offset - 
switch status.Status { - case "downloading": - bar := progress.Bar(float64(status.Offset) / float64(status.Total)) - fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\t\n", - status.Ref, - status.Status, - bar, - progress.Bytes(status.Offset), progress.Bytes(status.Total)) - case "resolving": - bar := progress.Bar(0.0) - fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n", - status.Ref, - status.Status, - bar) - default: - bar := progress.Bar(1.0) - fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n", - status.Ref, - status.Status, - bar) - } + ordered = append(ordered, statuses[j]) } - fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n", - time.Since(start).Seconds(), - // TODO(stevvooe): These calculations are actually way off. - // Need to account for previously downloaded data. These - // will basically be right for a download the first time - // but will be skewed if restarting, as it includes the - // data into the start time before. - progress.Bytes(total), - progress.NewBytesPerSecond(total, time.Since(start))) + display(tw, ordered, start) tw.Flush() if done { @@ -222,24 +192,6 @@ type jobs struct { mu sync.Mutex } -// jobsKeys let's us store the jobs instance in the context. -// -// This is a very cute way to do things but not ideal. 
-type jobsKey struct{} - -func getJobs(ctx context.Context) []string { - return ctx.Value(jobsKey{}).(*jobs).jobs() -} - -func addJob(ctx context.Context, job string) { - ctx.Value(jobsKey{}).(*jobs).add(job) -} - -func withJobsContext(ctx context.Context) context.Context { - jobs := newJobs() - return context.WithValue(ctx, jobsKey{}, jobs) -} - func newJobs() *jobs { return &jobs{added: make(map[string]struct{})} } @@ -267,88 +219,49 @@ func (j *jobs) jobs() []string { return jobs } -func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error { - ref := "manifest-" + desc.Digest.String() - addJob(ctx, ref) - - rc, err := fetcher.Fetch(ctx, desc) - if err != nil { - return err - } - defer rc.Close() - - // it would be better to read the content back from the content store in this case. - p, err := ioutil.ReadAll(rc) - if err != nil { - return err - } - - if err := content.WriteBlob(ctx, ingester, ref, bytes.NewReader(p), 0, ""); err != nil { - return err - } - - // TODO(stevvooe): This assumption that we get a manifest is unfortunate. - // Need to provide way to resolve what the type of the target is. - var manifest ocispec.Manifest - if err := json.Unmarshal(p, &manifest); err != nil { - return err - } - - var descs []ocispec.Descriptor - - descs = append(descs, manifest.Config) - for _, desc := range manifest.Layers { - descs = append(descs, desc) - } - - return dispatch(ctx, ingester, fetcher, descs...) 
+type statusInfo struct { + Ref string + Status string + Offset int64 + Total int64 + StartedAt time.Time + UpdatedAt time.Time } -func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error { - ref := "fetch-" + desc.Digest.String() - addJob(ctx, ref) - log.G(ctx).Debug("fetch") - rc, err := fetcher.Fetch(ctx, desc) - if err != nil { - log.G(ctx).WithError(err).Error("fetch error") - return err - } - defer rc.Close() - - // TODO(stevvooe): Need better remote key selection here. Should be a - // product of the fetcher. We may need more narrow infomation on fetcher or - // just pull from env/context. - return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest) -} - -// dispatch blocks until all content in `descs` is retrieved. -func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, descs ...ocispec.Descriptor) error { - eg, ctx := errgroup.WithContext(ctx) - for _, desc := range descs { - if err := func(desc ocispec.Descriptor) error { - ctx := log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ - "digest": desc.Digest, - "mediatype": desc.MediaType, - "size": desc.Size, - })) - switch desc.MediaType { - case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: - eg.Go(func() error { - return fetchManifest(ctx, ingester, fetcher, desc) - }) - case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: - return fmt.Errorf("%v not yet supported", desc.MediaType) - default: - eg.Go(func() error { - return fetch(ctx, ingester, fetcher, desc) - }) - } - - return nil - }(desc); err != nil { - return err +func display(w io.Writer, statuses []statusInfo, start time.Time) { + var total int64 + for _, status := range statuses { + total += status.Offset + switch status.Status { + case "downloading": + bar := progress.Bar(float64(status.Offset) / float64(status.Total)) + fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n", + status.Ref, + status.Status, + bar, 
+ progress.Bytes(status.Offset), progress.Bytes(status.Total)) + case "resolving": + bar := progress.Bar(0.0) + fmt.Fprintf(w, "%s:\t%s\t%40r\t\n", + status.Ref, + status.Status, + bar) + default: + bar := progress.Bar(1.0) + fmt.Fprintf(w, "%s:\t%s\t%40r\t\n", + status.Ref, + status.Status, + bar) } } - return eg.Wait() + fmt.Fprintf(w, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n", + time.Since(start).Seconds(), + // TODO(stevvooe): These calculations are actually way off. + // Need to account for previously downloaded data. These + // will basically be right for a download the first time + // but will be skewed if restarting, as it includes the + // data into the start time before. + progress.Bytes(total), + progress.NewBytesPerSecond(total, time.Since(start))) } diff --git a/image/handlers.go b/image/handlers.go new file mode 100644 index 0000000..44c7563 --- /dev/null +++ b/image/handlers.go @@ -0,0 +1,161 @@ +package image + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + + "github.com/docker/containerd/content" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" +) + +const ( + MediaTypeDockerSchema2Layer = "application/vnd.docker.image.rootfs.diff.tar" + MediaTypeDockerSchema2LayerGzip = "application/vnd.docker.image.rootfs.diff.tar.gzip" + MediaTypeDockerSchema2Config = "application/vnd.docker.container.image.v1+json" + MediaTypeDockerSchema2Manifest = "application/vnd.docker.distribution.manifest.v2+json" + MediaTypeDockerSchema2ManifestList = "application/vnd.docker.distribution.manifest.list.v2+json" +) + +var SkipDesc = fmt.Errorf("skip descriptor") + +type Handler interface { + Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) +} + +type HandlerFunc func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) + +func (fn HandlerFunc) Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs 
[]ocispec.Descriptor, err error) { + return fn(ctx, desc) +} + +// Handlers returns a handler that will run the handlers in sequence. +func Handlers(handlers ...Handler) HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { + var children []ocispec.Descriptor + for _, handler := range handlers { + ch, err := handler.Handle(ctx, desc) + if err != nil { + return nil, err + } + + children = append(children, ch...) + } + + return children, nil + } +} + +// Walk the resources of an image and call the handler for each. If the handler +// decodes the sub-resources for each image, they will be walked, as well. +// +// This differs from dispatch in that each sibling resource is considered +// synchronously. +func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error { + for _, desc := range descs { + + children, err := handler.Handle(ctx, desc) + if err != nil { + if errors.Cause(err) == SkipDesc { + return nil // don't traverse the children. + } + return err + } + + if len(children) > 0 { + if err := Walk(ctx, handler, children...); err != nil { + return err + } + } + } + + return nil +} + +// Dispatch runs the provided handler for content specified by the descriptors. +// If the handler decodes subresources, they will be visited, as well. +// +// Handlers for siblings are run in parallel on the provided descriptors. A +// handler may return `SkipDesc` to signal to the dispatcher to not traverse +// any children. +// +// Typically, this function will be used with `FetchHandler`, often composed +// with other handlers. +// +// If any handler returns an error, the dispatch session will be canceled. 
+func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error { + eg, ctx := errgroup.WithContext(ctx) + for _, desc := range descs { + desc := desc + + eg.Go(func() error { + desc := desc + + children, err := handler.Handle(ctx, desc) + if err != nil { + if errors.Cause(err) == SkipDesc { + return nil // don't traverse the children. + } + return err + } + + if len(children) > 0 { + return Dispatch(ctx, handler, children...) + } + + return nil + }) + } + + return eg.Wait() +} + +// ChildrenHandler decodes well-known manifests types and returns their children. +// +// This is useful for supporting recursive fetch and other use cases where you +// want to do a full walk of resources. +// +// One can also replace this with another implementation to allow descending of +// arbitrary types. +func ChildrenHandler(provider content.Provider) HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + switch desc.MediaType { + case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: + case MediaTypeDockerSchema2Layer, MediaTypeDockerSchema2LayerGzip, + MediaTypeDockerSchema2Config: + return nil, nil + default: + return nil, fmt.Errorf("%v not yet supported", desc.MediaType) + } + + rc, err := provider.Reader(ctx, desc.Digest) + if err != nil { + return nil, err + } + defer rc.Close() + + p, err := ioutil.ReadAll(rc) + if err != nil { + return nil, err + } + + // TODO(stevvooe): We just assume oci manifest, for now. There may be + // subtle differences from the docker version. 
+ var manifest ocispec.Manifest + if err := json.Unmarshal(p, &manifest); err != nil { + return nil, err + } + + var descs []ocispec.Descriptor + + descs = append(descs, manifest.Config) + for _, desc := range manifest.Layers { + descs = append(descs, desc) + } + + return descs, nil + } +} diff --git a/remotes/handlers.go b/remotes/handlers.go new file mode 100644 index 0000000..f621171 --- /dev/null +++ b/remotes/handlers.go @@ -0,0 +1,66 @@ +package remotes + +import ( + "context" + "fmt" + + "github.com/Sirupsen/logrus" + "github.com/docker/containerd/content" + "github.com/docker/containerd/image" + "github.com/docker/containerd/log" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// MakeRefKey returns a unique reference for the descriptor. This reference can be +// used to lookup ongoing processes related to the descriptor. This function +// may look to the context to namespace the reference appropriately. +func MakeRefKey(ctx context.Context, desc ocispec.Descriptor) string { + // TODO(stevvooe): Need better remote key selection here. Should be a + // product of the context, which may include information about the ongoing + // fetch process. + switch desc.MediaType { + case image.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest, + image.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + return "manifest-" + desc.Digest.String() + case image.MediaTypeDockerSchema2Layer, image.MediaTypeDockerSchema2LayerGzip: + return "layer-" + desc.Digest.String() + case "application/vnd.docker.container.image.v1+json": + return "config-" + desc.Digest.String() + default: + log.G(ctx).Warnf("reference for unknown type: %s", desc.MediaType) + return "unknown-" + desc.Digest.String() + } +} + +// FetchHandler returns a handler that will fetch all content into the ingester +// discovered in a call to Dispatch. Use with ChildrenHandler to do a full +// recursive fetch. 
+func FetchHandler(ingester content.Ingester, fetcher Fetcher) image.HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ + "digest": desc.Digest, + "mediatype": desc.MediaType, + "size": desc.Size, + })) + + switch desc.MediaType { + case image.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + return nil, fmt.Errorf("%v not yet supported", desc.MediaType) + default: + err := fetch(ctx, ingester, fetcher, desc) + return nil, err + } + } +} + +func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { + log.G(ctx).Debug("fetch") + ref := MakeRefKey(ctx, desc) + rc, err := fetcher.Fetch(ctx, desc) + if err != nil { + return err + } + defer rc.Close() + + return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest) +}