Merge pull request #638 from stevvooe/refactor-fetch

cmd/dist, image, remotes: introduce image handlers
This commit is contained in:
Michael Crosby 2017-03-17 16:07:25 -07:00 committed by GitHub
commit ffbe36e118
3 changed files with 288 additions and 148 deletions

207
cmd/dist/fetch.go vendored
View file

@ -1,19 +1,16 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"os"
"sync"
"text/tabwriter"
"time"
"github.com/Sirupsen/logrus"
contentapi "github.com/docker/containerd/api/services/content"
"github.com/docker/containerd/content"
"github.com/docker/containerd/image"
"github.com/docker/containerd/log"
"github.com/docker/containerd/progress"
"github.com/docker/containerd/remotes"
@ -58,9 +55,13 @@ Most of this is experimental and there are few leaps to make this work.`,
if err != nil {
return err
}
ctx = withJobsContext(ctx)
ongoing := newJobs()
ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
// TODO(stevvooe): Need to replace this with content store client.
cs, err := resolveContentStore(clicontext)
if err != nil {
return err
@ -70,7 +71,7 @@ Most of this is experimental and there are few leaps to make this work.`,
resolved := make(chan struct{})
eg.Go(func() error {
addJob(ctx, ref)
ongoing.add(ref)
name, desc, fetcher, err := resolver.Resolve(ctx, ref)
if err != nil {
return err
@ -78,7 +79,15 @@ Most of this is experimental and there are few leaps to make this work.`,
log.G(ctx).WithField("image", name).Debug("fetching")
close(resolved)
return dispatch(ctx, ingester, fetcher, desc)
return image.Dispatch(ctx,
image.Handlers(image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
}),
remotes.FetchHandler(ingester, fetcher),
image.ChildrenHandler(provider),
),
desc)
})
errs := make(chan error)
@ -99,17 +108,7 @@ Most of this is experimental and there are few leaps to make this work.`,
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
var total int64
js := getJobs(ctx)
type statusInfo struct {
Ref string
Status string
Offset int64
Total int64
StartedAt time.Time
UpdatedAt time.Time
}
statuses := map[string]statusInfo{}
activeSeen := map[string]struct{}{}
@ -133,6 +132,7 @@ Most of this is experimental and there are few leaps to make this work.`,
}
}
js := ongoing.jobs()
// now, update the items in jobs that are not in active
for _, j := range js {
if _, ok := activeSeen[j]; ok {
@ -155,42 +155,12 @@ Most of this is experimental and there are few leaps to make this work.`,
}
}
var ordered []statusInfo
for _, j := range js {
status := statuses[j]
total += status.Offset
switch status.Status {
case "downloading":
bar := progress.Bar(float64(status.Offset) / float64(status.Total))
fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
status.Ref,
status.Status,
bar,
progress.Bytes(status.Offset), progress.Bytes(status.Total))
case "resolving":
bar := progress.Bar(0.0)
fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n",
status.Ref,
status.Status,
bar)
default:
bar := progress.Bar(1.0)
fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n",
status.Ref,
status.Status,
bar)
}
ordered = append(ordered, statuses[j])
}
fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
time.Since(start).Seconds(),
// TODO(stevvooe): These calculations are actually way off.
// Need to account for previously downloaded data. These
// will basically be right for a download the first time
// but will be skewed if restarting, as it includes the
// data into the start time before.
progress.Bytes(total),
progress.NewBytesPerSecond(total, time.Since(start)))
display(tw, ordered, start)
tw.Flush()
if done {
@ -222,24 +192,6 @@ type jobs struct {
mu sync.Mutex
}
// jobsKeys let's us store the jobs instance in the context.
//
// This is a very cute way to do things but not ideal.
type jobsKey struct{}
func getJobs(ctx context.Context) []string {
return ctx.Value(jobsKey{}).(*jobs).jobs()
}
func addJob(ctx context.Context, job string) {
ctx.Value(jobsKey{}).(*jobs).add(job)
}
func withJobsContext(ctx context.Context) context.Context {
jobs := newJobs()
return context.WithValue(ctx, jobsKey{}, jobs)
}
func newJobs() *jobs {
return &jobs{added: make(map[string]struct{})}
}
@ -267,88 +219,49 @@ func (j *jobs) jobs() []string {
return jobs
}
func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
ref := "manifest-" + desc.Digest.String()
addJob(ctx, ref)
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
return err
}
defer rc.Close()
// it would be better to read the content back from the content store in this case.
p, err := ioutil.ReadAll(rc)
if err != nil {
return err
type statusInfo struct {
Ref string
Status string
Offset int64
Total int64
StartedAt time.Time
UpdatedAt time.Time
}
if err := content.WriteBlob(ctx, ingester, ref, bytes.NewReader(p), 0, ""); err != nil {
return err
}
// TODO(stevvooe): This assumption that we get a manifest is unfortunate.
// Need to provide way to resolve what the type of the target is.
var manifest ocispec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return err
}
var descs []ocispec.Descriptor
descs = append(descs, manifest.Config)
for _, desc := range manifest.Layers {
descs = append(descs, desc)
}
return dispatch(ctx, ingester, fetcher, descs...)
}
func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
ref := "fetch-" + desc.Digest.String()
addJob(ctx, ref)
log.G(ctx).Debug("fetch")
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
log.G(ctx).WithError(err).Error("fetch error")
return err
}
defer rc.Close()
// TODO(stevvooe): Need better remote key selection here. Should be a
// product of the fetcher. We may need more narrow infomation on fetcher or
// just pull from env/context.
return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest)
}
// dispatch blocks until all content in `descs` is retrieved.
func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, descs ...ocispec.Descriptor) error {
eg, ctx := errgroup.WithContext(ctx)
for _, desc := range descs {
if err := func(desc ocispec.Descriptor) error {
ctx := log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
eg.Go(func() error {
return fetchManifest(ctx, ingester, fetcher, desc)
})
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
return fmt.Errorf("%v not yet supported", desc.MediaType)
func display(w io.Writer, statuses []statusInfo, start time.Time) {
var total int64
for _, status := range statuses {
total += status.Offset
switch status.Status {
case "downloading":
bar := progress.Bar(float64(status.Offset) / float64(status.Total))
fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
status.Ref,
status.Status,
bar,
progress.Bytes(status.Offset), progress.Bytes(status.Total))
case "resolving":
bar := progress.Bar(0.0)
fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
status.Ref,
status.Status,
bar)
default:
eg.Go(func() error {
return fetch(ctx, ingester, fetcher, desc)
})
}
return nil
}(desc); err != nil {
return err
bar := progress.Bar(1.0)
fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
status.Ref,
status.Status,
bar)
}
}
return eg.Wait()
fmt.Fprintf(w, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
time.Since(start).Seconds(),
// TODO(stevvooe): These calculations are actually way off.
// Need to account for previously downloaded data. These
// will basically be right for a download the first time
// but will be skewed if restarting, as it includes the
// data into the start time before.
progress.Bytes(total),
progress.NewBytesPerSecond(total, time.Since(start)))
}

161
image/handlers.go Normal file
View file

@ -0,0 +1,161 @@
package image
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"github.com/docker/containerd/content"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
const (
MediaTypeDockerSchema2Layer = "application/vnd.docker.image.rootfs.diff.tar"
MediaTypeDockerSchema2LayerGzip = "application/vnd.docker.image.rootfs.diff.tar.gzip"
MediaTypeDockerSchema2Config = "application/vnd.docker.container.image.v1+json"
MediaTypeDockerSchema2Manifest = "application/vnd.docker.distribution.manifest.v2+json"
MediaTypeDockerSchema2ManifestList = "application/vnd.docker.distribution.manifest.list.v2+json"
)
var SkipDesc = fmt.Errorf("skip descriptor")
type Handler interface {
Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error)
}
type HandlerFunc func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error)
func (fn HandlerFunc) Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
return fn(ctx, desc)
}
// Handlers returns a handler that will run the handlers in sequence.
func Handlers(handlers ...Handler) HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
var children []ocispec.Descriptor
for _, handler := range handlers {
ch, err := handler.Handle(ctx, desc)
if err != nil {
return nil, err
}
children = append(children, ch...)
}
return children, nil
}
}
// Walk the resources of an image and call the handler for each. If the handler
// decodes the sub-resources for each image,
//
// This differs from dispatch in that each sibling resource is considered
// synchronously.
func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error {
for _, desc := range descs {
children, err := handler.Handle(ctx, desc)
if err != nil {
if errors.Cause(err) == SkipDesc {
return nil // don't traverse the children.
}
return err
}
if len(children) > 0 {
if err := Walk(ctx, handler, children...); err != nil {
return err
}
}
}
return nil
}
// Dispatch runs the provided handler for content specified by the descriptors.
// If the handler decode subresources, they will be visited, as well.
//
// Handlers for siblings are run in parallel on the provided descriptors. A
// handler may return `SkipDesc` to signal to the dispatcher to not traverse
// any children.
//
// Typically, this function will be used with `FetchHandler`, often composed
// with other handlers.
//
// If any handler returns an error, the dispatch session will be canceled.
func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error {
eg, ctx := errgroup.WithContext(ctx)
for _, desc := range descs {
desc := desc
eg.Go(func() error {
desc := desc
children, err := handler.Handle(ctx, desc)
if err != nil {
if errors.Cause(err) == SkipDesc {
return nil // don't traverse the children.
}
return err
}
if len(children) > 0 {
return Dispatch(ctx, handler, children...)
}
return nil
})
}
return eg.Wait()
}
// ChildrenHandler decodes well-known manifests types and returns their children.
//
// This is useful for supporting recursive fetch and other use cases where you
// want to do a full walk of resources.
//
// One can also replace this with another implementation to allow descending of
// arbitrary types.
func ChildrenHandler(provider content.Provider) HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
case MediaTypeDockerSchema2Layer, MediaTypeDockerSchema2LayerGzip,
MediaTypeDockerSchema2Config:
return nil, nil
default:
return nil, fmt.Errorf("%v not yet supported", desc.MediaType)
}
rc, err := provider.Reader(ctx, desc.Digest)
if err != nil {
return nil, err
}
defer rc.Close()
p, err := ioutil.ReadAll(rc)
if err != nil {
return nil, err
}
// TODO(stevvooe): We just assume oci manifest, for now. There may be
// subtle differences from the docker version.
var manifest ocispec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, err
}
var descs []ocispec.Descriptor
descs = append(descs, manifest.Config)
for _, desc := range manifest.Layers {
descs = append(descs, desc)
}
return descs, nil
}
}

66
remotes/handlers.go Normal file
View file

@ -0,0 +1,66 @@
package remotes
import (
"context"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/content"
"github.com/docker/containerd/image"
"github.com/docker/containerd/log"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// MakeRef returns a unique reference for the descriptor. This reference can be
// used to lookup ongoing processes related to the descriptor. This function
// may look to the context to namespace the reference appropriately.
func MakeRefKey(ctx context.Context, desc ocispec.Descriptor) string {
// TODO(stevvooe): Need better remote key selection here. Should be a
// product of the context, which may include information about the ongoing
// fetch process.
switch desc.MediaType {
case image.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
image.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
return "manifest-" + desc.Digest.String()
case image.MediaTypeDockerSchema2Layer, image.MediaTypeDockerSchema2LayerGzip:
return "layer-" + desc.Digest.String()
case "application/vnd.docker.container.image.v1+json":
return "config-" + desc.Digest.String()
default:
log.G(ctx).Warnf("reference for unknown type: %s", desc.MediaType)
return "unknown-" + desc.Digest.String()
}
}
// FetchHandler returns a handler that will fetch all content into the ingester
// discovered in a call to Dispatch. Use with ChildrenHandler to do a full
// recursive fetch.
func FetchHandler(ingester content.Ingester, fetcher Fetcher) image.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
switch desc.MediaType {
case image.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
return nil, fmt.Errorf("%v not yet supported", desc.MediaType)
default:
err := fetch(ctx, ingester, fetcher, desc)
return nil, err
}
}
}
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch")
ref := MakeRefKey(ctx, desc)
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
return err
}
defer rc.Close()
return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest)
}