diff --git a/cmd/dist/fetch.go b/cmd/dist/fetch.go index f210cfb..820d8d8 100644 --- a/cmd/dist/fetch.go +++ b/cmd/dist/fetch.go @@ -1,19 +1,16 @@ package main import ( - "bytes" "context" - "encoding/json" "fmt" - "io/ioutil" + "io" "os" "sync" "text/tabwriter" "time" - "github.com/Sirupsen/logrus" contentapi "github.com/docker/containerd/api/services/content" - "github.com/docker/containerd/content" + "github.com/docker/containerd/image" "github.com/docker/containerd/log" "github.com/docker/containerd/progress" "github.com/docker/containerd/remotes" @@ -58,9 +55,13 @@ Most of this is experimental and there are few leaps to make this work.`, if err != nil { return err } - ctx = withJobsContext(ctx) + + ongoing := newJobs() ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)) + provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) + + // TODO(stevvooe): Need to replace this with content store client. cs, err := resolveContentStore(clicontext) if err != nil { return err @@ -70,7 +71,7 @@ Most of this is experimental and there are few leaps to make this work.`, resolved := make(chan struct{}) eg.Go(func() error { - addJob(ctx, ref) + ongoing.add(ref) name, desc, fetcher, err := resolver.Resolve(ctx, ref) if err != nil { return err @@ -78,7 +79,15 @@ Most of this is experimental and there are few leaps to make this work.`, log.G(ctx).WithField("image", name).Debug("fetching") close(resolved) - return dispatch(ctx, ingester, fetcher, desc) + return image.Dispatch(ctx, + image.Handlers(image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + ongoing.add(remotes.MakeRefKey(ctx, desc)) + return nil, nil + }), + remotes.FetchHandler(ingester, fetcher), + image.ChildrenHandler(provider), + ), + desc) }) errs := make(chan error) @@ -99,17 +108,7 @@ Most of this is experimental and there are few leaps to make this work.`, fw.Flush() tw := tabwriter.NewWriter(fw, 1, 8, 1, 
' ', 0) - var total int64 - js := getJobs(ctx) - type statusInfo struct { - Ref string - Status string - Offset int64 - Total int64 - StartedAt time.Time - UpdatedAt time.Time - } statuses := map[string]statusInfo{} activeSeen := map[string]struct{}{} @@ -133,6 +132,7 @@ Most of this is experimental and there are few leaps to make this work.`, } } + js := ongoing.jobs() // now, update the items in jobs that are not in active for _, j := range js { if _, ok := activeSeen[j]; ok { @@ -155,42 +155,12 @@ Most of this is experimental and there are few leaps to make this work.`, } } + var ordered []statusInfo for _, j := range js { - status := statuses[j] - - total += status.Offset - switch status.Status { - case "downloading": - bar := progress.Bar(float64(status.Offset) / float64(status.Total)) - fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\t\n", - status.Ref, - status.Status, - bar, - progress.Bytes(status.Offset), progress.Bytes(status.Total)) - case "resolving": - bar := progress.Bar(0.0) - fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n", - status.Ref, - status.Status, - bar) - default: - bar := progress.Bar(1.0) - fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n", - status.Ref, - status.Status, - bar) - } + ordered = append(ordered, statuses[j]) } - fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n", - time.Since(start).Seconds(), - // TODO(stevvooe): These calculations are actually way off. - // Need to account for previously downloaded data. These - // will basically be right for a download the first time - // but will be skewed if restarting, as it includes the - // data into the start time before. - progress.Bytes(total), - progress.NewBytesPerSecond(total, time.Since(start))) + display(tw, ordered, start) tw.Flush() if done { @@ -222,24 +192,6 @@ type jobs struct { mu sync.Mutex } -// jobsKeys let's us store the jobs instance in the context. -// -// This is a very cute way to do things but not ideal. 
-type jobsKey struct{} - -func getJobs(ctx context.Context) []string { - return ctx.Value(jobsKey{}).(*jobs).jobs() -} - -func addJob(ctx context.Context, job string) { - ctx.Value(jobsKey{}).(*jobs).add(job) -} - -func withJobsContext(ctx context.Context) context.Context { - jobs := newJobs() - return context.WithValue(ctx, jobsKey{}, jobs) -} - func newJobs() *jobs { return &jobs{added: make(map[string]struct{})} } @@ -267,88 +219,49 @@ func (j *jobs) jobs() []string { return jobs } -func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error { - ref := "manifest-" + desc.Digest.String() - addJob(ctx, ref) - - rc, err := fetcher.Fetch(ctx, desc) - if err != nil { - return err - } - defer rc.Close() - - // it would be better to read the content back from the content store in this case. - p, err := ioutil.ReadAll(rc) - if err != nil { - return err - } - - if err := content.WriteBlob(ctx, ingester, ref, bytes.NewReader(p), 0, ""); err != nil { - return err - } - - // TODO(stevvooe): This assumption that we get a manifest is unfortunate. - // Need to provide way to resolve what the type of the target is. - var manifest ocispec.Manifest - if err := json.Unmarshal(p, &manifest); err != nil { - return err - } - - var descs []ocispec.Descriptor - - descs = append(descs, manifest.Config) - for _, desc := range manifest.Layers { - descs = append(descs, desc) - } - - return dispatch(ctx, ingester, fetcher, descs...) 
+type statusInfo struct { + Ref string + Status string + Offset int64 + Total int64 + StartedAt time.Time + UpdatedAt time.Time } -func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error { - ref := "fetch-" + desc.Digest.String() - addJob(ctx, ref) - log.G(ctx).Debug("fetch") - rc, err := fetcher.Fetch(ctx, desc) - if err != nil { - log.G(ctx).WithError(err).Error("fetch error") - return err - } - defer rc.Close() - - // TODO(stevvooe): Need better remote key selection here. Should be a - // product of the fetcher. We may need more narrow infomation on fetcher or - // just pull from env/context. - return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest) -} - -// dispatch blocks until all content in `descs` is retrieved. -func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, descs ...ocispec.Descriptor) error { - eg, ctx := errgroup.WithContext(ctx) - for _, desc := range descs { - if err := func(desc ocispec.Descriptor) error { - ctx := log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ - "digest": desc.Digest, - "mediatype": desc.MediaType, - "size": desc.Size, - })) - switch desc.MediaType { - case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: - eg.Go(func() error { - return fetchManifest(ctx, ingester, fetcher, desc) - }) - case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: - return fmt.Errorf("%v not yet supported", desc.MediaType) - default: - eg.Go(func() error { - return fetch(ctx, ingester, fetcher, desc) - }) - } - - return nil - }(desc); err != nil { - return err +func display(w io.Writer, statuses []statusInfo, start time.Time) { + var total int64 + for _, status := range statuses { + total += status.Offset + switch status.Status { + case "downloading": + bar := progress.Bar(float64(status.Offset) / float64(status.Total)) + fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n", + status.Ref, + status.Status, + bar, 
+ progress.Bytes(status.Offset), progress.Bytes(status.Total)) + case "resolving": + bar := progress.Bar(0.0) + fmt.Fprintf(w, "%s:\t%s\t%40r\t\n", + status.Ref, + status.Status, + bar) + default: + bar := progress.Bar(1.0) + fmt.Fprintf(w, "%s:\t%s\t%40r\t\n", + status.Ref, + status.Status, + bar) } } - return eg.Wait() + fmt.Fprintf(w, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n", + time.Since(start).Seconds(), + // TODO(stevvooe): These calculations are actually way off. + // Need to account for previously downloaded data. These + // will basically be right for a download the first time + // but will be skewed if restarting, as it includes the + // data into the start time before. + progress.Bytes(total), + progress.NewBytesPerSecond(total, time.Since(start))) } diff --git a/image/handlers.go b/image/handlers.go new file mode 100644 index 0000000..44c7563 --- /dev/null +++ b/image/handlers.go @@ -0,0 +1,161 @@ +package image + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + + "github.com/docker/containerd/content" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" +) + +const ( + MediaTypeDockerSchema2Layer = "application/vnd.docker.image.rootfs.diff.tar" + MediaTypeDockerSchema2LayerGzip = "application/vnd.docker.image.rootfs.diff.tar.gzip" + MediaTypeDockerSchema2Config = "application/vnd.docker.container.image.v1+json" + MediaTypeDockerSchema2Manifest = "application/vnd.docker.distribution.manifest.v2+json" + MediaTypeDockerSchema2ManifestList = "application/vnd.docker.distribution.manifest.list.v2+json" +) + +var SkipDesc = fmt.Errorf("skip descriptor") + +type Handler interface { + Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) +} + +type HandlerFunc func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) + +func (fn HandlerFunc) Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs 
[]ocispec.Descriptor, err error) {
+	return fn(ctx, desc)
+}
+
+// Handlers returns a handler that will run the handlers in sequence.
+func Handlers(handlers ...Handler) HandlerFunc {
+	return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
+		var children []ocispec.Descriptor
+		for _, handler := range handlers {
+			ch, err := handler.Handle(ctx, desc)
+			if err != nil {
+				return nil, err
+			}
+
+			children = append(children, ch...)
+		}
+
+		return children, nil
+	}
+}
+
+// Walk the resources of an image and call the handler for each. If the handler
+// decodes the children of a resource, they will be walked, as well.
+//
+// This differs from dispatch in that each sibling resource is considered
+// synchronously.
+func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error {
+	for _, desc := range descs {
+
+		children, err := handler.Handle(ctx, desc)
+		if err != nil {
+			if errors.Cause(err) == SkipDesc {
+				return nil // don't traverse the children.
+			}
+			return err
+		}
+
+		if len(children) > 0 {
+			if err := Walk(ctx, handler, children...); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// Dispatch runs the provided handler for content specified by the descriptors.
+// If the handler decodes subresources, they will be visited, as well.
+//
+// Handlers for siblings are run in parallel on the provided descriptors. A
+// handler may return `SkipDesc` to signal to the dispatcher to not traverse
+// any children.
+//
+// Typically, this function will be used with `FetchHandler`, often composed
+// with other handlers.
+//
+// If any handler returns an error, the dispatch session will be canceled.
+func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error {
+	eg, ctx := errgroup.WithContext(ctx)
+	for _, desc := range descs {
+		desc := desc
+
+		eg.Go(func() error {
+			desc := desc
+
+			children, err := handler.Handle(ctx, desc)
+			if err != nil {
+				if errors.Cause(err) == SkipDesc {
+					return nil // don't traverse the children.
+				}
+				return err
+			}
+
+			if len(children) > 0 {
+				return Dispatch(ctx, handler, children...)
+			}
+
+			return nil
+		})
+	}
+
+	return eg.Wait()
+}
+
+// ChildrenHandler decodes well-known manifest types and returns their children.
+//
+// This is useful for supporting recursive fetch and other use cases where you
+// want to do a full walk of resources.
+//
+// One can also replace this with another implementation to allow descending of
+// arbitrary types.
+func ChildrenHandler(provider content.Provider) HandlerFunc {
+	return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
+		switch desc.MediaType {
+		case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
+		case MediaTypeDockerSchema2Layer, MediaTypeDockerSchema2LayerGzip,
+			MediaTypeDockerSchema2Config:
+			return nil, nil
+		default:
+			return nil, fmt.Errorf("%v not yet supported", desc.MediaType)
+		}
+
+		rc, err := provider.Reader(ctx, desc.Digest)
+		if err != nil {
+			return nil, err
+		}
+		defer rc.Close()
+
+		p, err := ioutil.ReadAll(rc)
+		if err != nil {
+			return nil, err
+		}
+
+		// TODO(stevvooe): We just assume oci manifest, for now. There may be
+		// subtle differences from the docker version.
+		var manifest ocispec.Manifest
+		if err := json.Unmarshal(p, &manifest); err != nil {
+			return nil, err
+		}
+
+		var descs []ocispec.Descriptor
+
+		descs = append(descs, manifest.Config)
+		for _, desc := range manifest.Layers {
+			descs = append(descs, desc)
+		}
+
+		return descs, nil
+	}
+}
diff --git a/remotes/handlers.go b/remotes/handlers.go
new file mode 100644
index 0000000..f621171
--- /dev/null
+++ b/remotes/handlers.go
@@ -0,0 +1,66 @@
+package remotes
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/containerd/content"
+	"github.com/docker/containerd/image"
+	"github.com/docker/containerd/log"
+	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
+)
+
+// MakeRefKey returns a unique reference for the descriptor. This reference can
+// be used to look up ongoing processes related to the descriptor. This function
+// may look to the context to namespace the reference appropriately.
+func MakeRefKey(ctx context.Context, desc ocispec.Descriptor) string {
+	// TODO(stevvooe): Need better remote key selection here. Should be a
+	// product of the context, which may include information about the ongoing
+	// fetch process.
+	switch desc.MediaType {
+	case image.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
+		image.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
+		return "manifest-" + desc.Digest.String()
+	case image.MediaTypeDockerSchema2Layer, image.MediaTypeDockerSchema2LayerGzip:
+		return "layer-" + desc.Digest.String()
+	case "application/vnd.docker.container.image.v1+json":
+		return "config-" + desc.Digest.String()
+	default:
+		log.G(ctx).Warnf("reference for unknown type: %s", desc.MediaType)
+		return "unknown-" + desc.Digest.String()
+	}
+}
+
+// FetchHandler returns a handler that will fetch all content into the ingester
+// discovered in a call to Dispatch. Use with ChildrenHandler to do a full
+// recursive fetch.
+func FetchHandler(ingester content.Ingester, fetcher Fetcher) image.HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ + "digest": desc.Digest, + "mediatype": desc.MediaType, + "size": desc.Size, + })) + + switch desc.MediaType { + case image.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + return nil, fmt.Errorf("%v not yet supported", desc.MediaType) + default: + err := fetch(ctx, ingester, fetcher, desc) + return nil, err + } + } +} + +func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { + log.G(ctx).Debug("fetch") + ref := MakeRefKey(ctx, desc) + rc, err := fetcher.Fetch(ctx, desc) + if err != nil { + return err + } + defer rc.Close() + + return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest) +}