diff --git a/cmd/dist/fetch.go b/cmd/dist/fetch.go new file mode 100644 index 0000000..a586cf2 --- /dev/null +++ b/cmd/dist/fetch.go @@ -0,0 +1,348 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "sync" + "text/tabwriter" + "time" + + "github.com/Sirupsen/logrus" + contentapi "github.com/docker/containerd/api/services/content" + "github.com/docker/containerd/content" + "github.com/docker/containerd/log" + "github.com/docker/containerd/progress" + "github.com/docker/containerd/remotes" + contentservice "github.com/docker/containerd/services/content" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/urfave/cli" + "golang.org/x/sync/errgroup" +) + +var fetchCommand = cli.Command{ + Name: "fetch", + Usage: "fetch all content for an image into containerd", + ArgsUsage: "[flags] ", + Description: `Fetch an image into containerd. + +This command ensures that containerd has all the necessary resources to build +an image's rootfs and convert the configuration to a runtime format supported +by containerd. + +This command uses the same syntax, of remote and object, as 'dist +fetch-object'. We may want to make this nicer, but agnostism is preferred for +the moment. + +Right now, the responsibility of the daemon and the cli aren't quite clear. Do +not use this implementation as a guide. The end goal should be having metadata, +content and snapshots ready for a direct use via the 'ctr run'. + +Most of this is experimental and there are few leaps to make this work.`, + Flags: []cli.Flag{}, + Action: func(clicontext *cli.Context) error { + var ( + ctx = background + locator = clicontext.Args().First() + object = clicontext.Args().Get(1) + ) + + conn, err := connectGRPC(clicontext) + if err != nil { + return err + } + + resolver, err := getResolver(ctx) + if err != nil { + return err + } + + fetcher, err := resolver.Resolve(ctx, locator) + if err != nil { + return err + } + + ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)) + cs, err := resolveContentStore(clicontext) + if err != nil { + return err + } + + eg, ctx := errgroup.WithContext(ctx) + + ctx = withJobsContext(ctx) + + eg.Go(func() error { + return fetchManifest(ctx, ingester, fetcher, object) + }) + + errs := make(chan error) + go func() { + defer close(errs) + errs <- eg.Wait() + }() + + ticker := time.NewTicker(100 * time.Millisecond) + fw := progress.NewWriter(os.Stdout) + start := time.Now() + defer ticker.Stop() + var done bool + + for { + select { + case <-ticker.C: + fw.Flush() + + tw := tabwriter.NewWriter(fw, 1, 8, 1, '\t', 0) + // fmt.Fprintln(tw, "REF\tSIZE\tAGE") + var total int64 + + js := getJobs(ctx) + type statusInfo struct { + Ref string + Status string + Offset int64 + Total int64 + StartedAt time.Time + UpdatedAt time.Time + } + statuses := map[string]statusInfo{} + + activeSeen := map[string]struct{}{} + if !done { + active, err := cs.Active() + if err != nil { + log.G(ctx).WithError(err).Error("active check failed") + continue + } + // update status of active entries! + for _, active := range active { + statuses[active.Ref] = statusInfo{ + Ref: active.Ref, + Status: "downloading", + Offset: active.Offset, + Total: active.Total, + StartedAt: active.StartedAt, + UpdatedAt: active.UpdatedAt, + } + activeSeen[active.Ref] = struct{}{} + } + } + + // now, update the items in jobs that are not in active + for _, j := range js { + if _, ok := activeSeen[j]; ok { + continue + } + + statuses[j] = statusInfo{ + Ref: j, + Status: "done", // for now! + } + } + + for _, j := range js { + status := statuses[j] + + total += status.Offset + switch status.Status { + case "downloading": + bar := progress.Bar(float64(status.Offset) / float64(status.Total)) + fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\n", + status.Ref, + status.Status, + bar, + progress.Bytes(status.Offset), progress.Bytes(status.Total)) + case "done": + bar := progress.Bar(1.0) + fmt.Fprintf(tw, "%s:\t%s\t%40r\n", + status.Ref, + status.Status, + bar) + } + } + + fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\n", + time.Since(start).Seconds(), + // TODO(stevvooe): These calculations are actually way off. + // Need to account for previously downloaded data. These + // will basically be right for a download the first time + // but will be skewed if restarting, as it includes the + // data into the start time before. + progress.Bytes(total), + progress.NewBytesPerSecond(total, time.Since(start))) + tw.Flush() + + if done { + fw.Flush() + return nil + } + case err := <-errs: + if err != nil { + return err + } + done = true + case <-ctx.Done(): + done = true // allow ui to update once more + } + } + + return nil + }, +} + +// jobs provides a way of identifying the download keys for a particular task +// encountering during the pull walk. +// +// This is very minimal and will probably be replaced with something more +// featured. +type jobs struct { + added map[string]struct{} + refs []string + mu sync.Mutex +} + +// jobsKeys let's us store the jobs instance in the context. +// +// This is a very cute way to do things but not ideal. +type jobsKey struct{} + +func getJobs(ctx context.Context) []string { + return ctx.Value(jobsKey{}).(*jobs).jobs() +} + +func addJob(ctx context.Context, job string) { + ctx.Value(jobsKey{}).(*jobs).add(job) +} + +func withJobsContext(ctx context.Context) context.Context { + jobs := newJobs() + return context.WithValue(ctx, jobsKey{}, jobs) +} + +func newJobs() *jobs { + return &jobs{added: make(map[string]struct{})} +} + +func (j *jobs) add(ref string) { + j.mu.Lock() + defer j.mu.Unlock() + + if _, ok := j.added[ref]; ok { + return + } + j.refs = append(j.refs, ref) + j.added[ref] = struct{}{} +} + +func (j *jobs) jobs() []string { + j.mu.Lock() + defer j.mu.Unlock() + + var jobs []string + for _, j := range j.refs { + jobs = append(jobs, j) + } + + return jobs +} + +func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, object string, hints ...string) error { + const manifestMediaType = "application/vnd.docker.distribution.manifest.v2+json" + hints = append(hints, "mediatype:"+manifestMediaType) + + ref := "manifest-" + object + addJob(ctx, ref) + + rc, err := fetcher.Fetch(ctx, object, hints...) + if err != nil { + return err + } + defer rc.Close() + + // it would be better to read the content back from the content store in this case. + p, err := ioutil.ReadAll(rc) + if err != nil { + return err + } + + if err := content.WriteBlob(ctx, ingester, ref, bytes.NewReader(p), 0, ""); err != nil { + return err + } + + // TODO(stevvooe): This assumption that we get a manifest is unfortunate. + // Need to provide way to resolve what the type of the target is. + var manifest ocispec.Manifest + if err := json.Unmarshal(p, &manifest); err != nil { + return err + } + + var descs []ocispec.Descriptor + + descs = append(descs, manifest.Config) + for _, desc := range manifest.Layers { + descs = append(descs, desc) + } + + return dispatch(ctx, ingester, fetcher, descs...) +} + +func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error { + var ( + hints []string + object = desc.Digest.String() + ) + if desc.MediaType != "" { + hints = append(hints, "mediatype:"+desc.MediaType) + } + + ref := "fetch-" + object + addJob(ctx, ref) + log.G(ctx).Debug("fetch") + rc, err := fetcher.Fetch(ctx, object, hints...) + if err != nil { + log.G(ctx).WithError(err).Error("fetch error") + return err + } + defer rc.Close() + + // TODO(stevvooe): Need better remote key selection here. Should be a + // product of the fetcher. We may need more narrow infomation on fetcher or + // just pull from env/context. + return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest) +} + +// dispatch blocks until all content in `descs` is retrieved. +func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, descs ...ocispec.Descriptor) error { + eg, ctx := errgroup.WithContext(ctx) + for _, desc := range descs { + if err := func(desc ocispec.Descriptor) error { + ctx := log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ + "digest": desc.Digest, + "mediatype": desc.MediaType, + "size": desc.Size, + })) + switch desc.MediaType { + case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: + eg.Go(func() error { + return fetchManifest(ctx, ingester, fetcher, desc.Digest.String(), "mediatype:"+desc.MediaType) + }) + case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + return fmt.Errorf("%v not yet supported", desc.MediaType) + default: + eg.Go(func() error { + return fetch(ctx, ingester, fetcher, desc) + }) + } + + return nil + }(desc); err != nil { + return err + } + } + + return eg.Wait() +} diff --git a/cmd/dist/main.go b/cmd/dist/main.go index 2172a37..cf96ad5 100644 --- a/cmd/dist/main.go +++ b/cmd/dist/main.go @@ -61,6 +61,7 @@ distribution tool }, } app.Commands = []cli.Command{ + fetchCommand, fetchObjectCommand, ingestCommand, activeCommand, diff --git a/progress/bar.go b/progress/bar.go new file mode 100644 index 0000000..fa916ef --- /dev/null +++ b/progress/bar.go @@ -0,0 +1,65 @@ +package progress + +import ( + "bytes" + "fmt" +) + +// TODO(stevvooe): We may want to support more interesting parameterization of +// the bar. For now, it is very simple. + +// Bar provides a very simple progress bar implementation. +// +// Use with fmt.Printf and "r" to format the progress bar. A "-" flag makes it +// progress from right to left. +type Bar float64 + +var _ fmt.Formatter = Bar(1.0) + +func (h Bar) Format(state fmt.State, r rune) { + switch r { + case 'r': + default: + panic(fmt.Sprintf("%v: unexpected format character", float64(h))) + } + + if h > 1.0 { + h = 1.0 + } + + if h < 0.0 { + h = 0.0 + } + + if state.Flag('-') { + h = 1.0 - h + } + + width, ok := state.Width() + if !ok { + // default width of 40 + width = 40 + } + + var pad int + + extra := len([]byte(green)) + len([]byte(reset)) + + p := make([]byte, width+extra) + p[0], p[len(p)-1] = '|', '|' + pad += 2 + + positive := int(Bar(width-pad) * h) + negative := width - pad - positive + + n := 1 + n += copy(p[n:], []byte(green)) + n += copy(p[n:], bytes.Repeat([]byte("+"), positive)) + n += copy(p[n:], []byte(reset)) + + if negative > 0 { + n += copy(p[n:len(p)-1], bytes.Repeat([]byte("-"), negative)) + } + + state.Write(p) +} diff --git a/progress/doc.go b/progress/doc.go new file mode 100644 index 0000000..7f580af --- /dev/null +++ b/progress/doc.go @@ -0,0 +1,2 @@ +// Package progress assists in displaying human readable progress information. +package progress diff --git a/progress/escape.go b/progress/escape.go new file mode 100644 index 0000000..5889efe --- /dev/null +++ b/progress/escape.go @@ -0,0 +1,8 @@ +package progress + +const ( + escape = "\x1b" + reset = escape + "[0m" + red = escape + "[31m" + green = escape + "[32m" +) diff --git a/progress/humaans.go b/progress/humaans.go new file mode 100644 index 0000000..8775899 --- /dev/null +++ b/progress/humaans.go @@ -0,0 +1,25 @@ +package progress + +import ( + "fmt" + "time" + + units "github.com/docker/go-units" +) + +// Bytes converts a regular int64 to human readable type. +type Bytes int64 + +func (b Bytes) String() string { + return units.CustomSize("%02.1f %s", float64(b), 1024.0, []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"}) +} + +type BytesPerSecond int64 + +func NewBytesPerSecond(n int64, duration time.Duration) BytesPerSecond { + return BytesPerSecond(float64(n) / duration.Seconds()) +} + +func (bps BytesPerSecond) String() string { + return fmt.Sprintf("%v/s", Bytes(bps)) +} diff --git a/progress/writer.go b/progress/writer.go new file mode 100644 index 0000000..a3f90b8 --- /dev/null +++ b/progress/writer.go @@ -0,0 +1,60 @@ +package progress + +import ( + "bytes" + "fmt" + "io" +) + +// Writer buffers writes until flush, at which time the last screen is cleared +// and the current buffer contents are written. This is useful for +// implementing progress displays, such as those implemented in docker and +// git. +type Writer struct { + buf bytes.Buffer + w io.Writer + lines int +} + +func NewWriter(w io.Writer) *Writer { + return &Writer{ + w: w, + } +} + +func (w *Writer) Write(p []byte) (n int, err error) { + return w.buf.Write(p) +} + +// Flush should be called when refreshing the current display. +func (w *Writer) Flush() error { + if w.buf.Len() == 0 { + return nil + } + + if err := w.clear(); err != nil { + return err + } + + w.lines = bytes.Count(w.buf.Bytes(), []byte("\n")) + + if _, err := w.w.Write(w.buf.Bytes()); err != nil { + return err + } + + w.buf.Reset() + return nil +} + +// TODO(stevvooe): The following are system specific. Break these out if we +// decide to build this package further. + +func (w *Writer) clear() error { + for i := 0; i < w.lines; i++ { + if _, err := fmt.Fprintf(w.w, "\x1b[0A\x1b[2K\r"); err != nil { + return err + } + } + + return nil +}