Merge pull request #597 from stevvooe/pull-prototype

cmd/dist: implement fetch prototype
This commit is contained in:
Derek McGowan 2017-03-06 16:39:26 -08:00 committed by GitHub
commit 7359fd0f25
7 changed files with 509 additions and 0 deletions

348
cmd/dist/fetch.go vendored Normal file
View file

@ -0,0 +1,348 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"
"text/tabwriter"
"time"
"github.com/Sirupsen/logrus"
contentapi "github.com/docker/containerd/api/services/content"
"github.com/docker/containerd/content"
"github.com/docker/containerd/log"
"github.com/docker/containerd/progress"
"github.com/docker/containerd/remotes"
contentservice "github.com/docker/containerd/services/content"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"
)
var fetchCommand = cli.Command{
Name: "fetch",
Usage: "fetch all content for an image into containerd",
ArgsUsage: "[flags] <remote> <object>",
Description: `Fetch an image into containerd.
This command ensures that containerd has all the necessary resources to build
an image's rootfs and convert the configuration to a runtime format supported
by containerd.
This command uses the same syntax, of remote and object, as 'dist
fetch-object'. We may want to make this nicer, but agnostism is preferred for
the moment.
Right now, the responsibility of the daemon and the cli aren't quite clear. Do
not use this implementation as a guide. The end goal should be having metadata,
content and snapshots ready for a direct use via the 'ctr run'.
Most of this is experimental and there are few leaps to make this work.`,
Flags: []cli.Flag{},
Action: func(clicontext *cli.Context) error {
var (
ctx = background
locator = clicontext.Args().First()
object = clicontext.Args().Get(1)
)
conn, err := connectGRPC(clicontext)
if err != nil {
return err
}
resolver, err := getResolver(ctx)
if err != nil {
return err
}
fetcher, err := resolver.Resolve(ctx, locator)
if err != nil {
return err
}
ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
cs, err := resolveContentStore(clicontext)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
ctx = withJobsContext(ctx)
eg.Go(func() error {
return fetchManifest(ctx, ingester, fetcher, object)
})
errs := make(chan error)
go func() {
defer close(errs)
errs <- eg.Wait()
}()
ticker := time.NewTicker(100 * time.Millisecond)
fw := progress.NewWriter(os.Stdout)
start := time.Now()
defer ticker.Stop()
var done bool
for {
select {
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, '\t', 0)
// fmt.Fprintln(tw, "REF\tSIZE\tAGE")
var total int64
js := getJobs(ctx)
type statusInfo struct {
Ref string
Status string
Offset int64
Total int64
StartedAt time.Time
UpdatedAt time.Time
}
statuses := map[string]statusInfo{}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.Active()
if err != nil {
log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
// now, update the items in jobs that are not in active
for _, j := range js {
if _, ok := activeSeen[j]; ok {
continue
}
statuses[j] = statusInfo{
Ref: j,
Status: "done", // for now!
}
}
for _, j := range js {
status := statuses[j]
total += status.Offset
switch status.Status {
case "downloading":
bar := progress.Bar(float64(status.Offset) / float64(status.Total))
fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\n",
status.Ref,
status.Status,
bar,
progress.Bytes(status.Offset), progress.Bytes(status.Total))
case "done":
bar := progress.Bar(1.0)
fmt.Fprintf(tw, "%s:\t%s\t%40r\n",
status.Ref,
status.Status,
bar)
}
}
fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\n",
time.Since(start).Seconds(),
// TODO(stevvooe): These calculations are actually way off.
// Need to account for previously downloaded data. These
// will basically be right for a download the first time
// but will be skewed if restarting, as it includes the
// data into the start time before.
progress.Bytes(total),
progress.NewBytesPerSecond(total, time.Since(start)))
tw.Flush()
if done {
fw.Flush()
return nil
}
case err := <-errs:
if err != nil {
return err
}
done = true
case <-ctx.Done():
done = true // allow ui to update once more
}
}
return nil
},
}
// jobs provides a way of identifying the download keys for a particular task
// encountering during the pull walk.
//
// This is very minimal and will probably be replaced with something more
// featured.
type jobs struct {
added map[string]struct{}
refs []string
mu sync.Mutex
}
// jobsKeys let's us store the jobs instance in the context.
//
// This is a very cute way to do things but not ideal.
type jobsKey struct{}
func getJobs(ctx context.Context) []string {
return ctx.Value(jobsKey{}).(*jobs).jobs()
}
func addJob(ctx context.Context, job string) {
ctx.Value(jobsKey{}).(*jobs).add(job)
}
func withJobsContext(ctx context.Context) context.Context {
jobs := newJobs()
return context.WithValue(ctx, jobsKey{}, jobs)
}
func newJobs() *jobs {
return &jobs{added: make(map[string]struct{})}
}
func (j *jobs) add(ref string) {
j.mu.Lock()
defer j.mu.Unlock()
if _, ok := j.added[ref]; ok {
return
}
j.refs = append(j.refs, ref)
j.added[ref] = struct{}{}
}
func (j *jobs) jobs() []string {
j.mu.Lock()
defer j.mu.Unlock()
var jobs []string
for _, j := range j.refs {
jobs = append(jobs, j)
}
return jobs
}
func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, object string, hints ...string) error {
const manifestMediaType = "application/vnd.docker.distribution.manifest.v2+json"
hints = append(hints, "mediatype:"+manifestMediaType)
ref := "manifest-" + object
addJob(ctx, ref)
rc, err := fetcher.Fetch(ctx, object, hints...)
if err != nil {
return err
}
defer rc.Close()
// it would be better to read the content back from the content store in this case.
p, err := ioutil.ReadAll(rc)
if err != nil {
return err
}
if err := content.WriteBlob(ctx, ingester, ref, bytes.NewReader(p), 0, ""); err != nil {
return err
}
// TODO(stevvooe): This assumption that we get a manifest is unfortunate.
// Need to provide way to resolve what the type of the target is.
var manifest ocispec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return err
}
var descs []ocispec.Descriptor
descs = append(descs, manifest.Config)
for _, desc := range manifest.Layers {
descs = append(descs, desc)
}
return dispatch(ctx, ingester, fetcher, descs...)
}
func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
var (
hints []string
object = desc.Digest.String()
)
if desc.MediaType != "" {
hints = append(hints, "mediatype:"+desc.MediaType)
}
ref := "fetch-" + object
addJob(ctx, ref)
log.G(ctx).Debug("fetch")
rc, err := fetcher.Fetch(ctx, object, hints...)
if err != nil {
log.G(ctx).WithError(err).Error("fetch error")
return err
}
defer rc.Close()
// TODO(stevvooe): Need better remote key selection here. Should be a
// product of the fetcher. We may need more narrow infomation on fetcher or
// just pull from env/context.
return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest)
}
// dispatch blocks until all content in `descs` is retrieved.
func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, descs ...ocispec.Descriptor) error {
eg, ctx := errgroup.WithContext(ctx)
for _, desc := range descs {
if err := func(desc ocispec.Descriptor) error {
ctx := log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
eg.Go(func() error {
return fetchManifest(ctx, ingester, fetcher, desc.Digest.String(), "mediatype:"+desc.MediaType)
})
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
return fmt.Errorf("%v not yet supported", desc.MediaType)
default:
eg.Go(func() error {
return fetch(ctx, ingester, fetcher, desc)
})
}
return nil
}(desc); err != nil {
return err
}
}
return eg.Wait()
}

1
cmd/dist/main.go vendored
View file

@ -61,6 +61,7 @@ distribution tool
},
}
app.Commands = []cli.Command{
fetchCommand,
fetchObjectCommand,
ingestCommand,
activeCommand,

65
progress/bar.go Normal file
View file

@ -0,0 +1,65 @@
package progress
import (
"bytes"
"fmt"
)
// TODO(stevvooe): We may want to support more interesting parameterization of
// the bar. For now, it is very simple.
// Bar provides a very simple progress bar implementation.
//
// Use with fmt.Printf and "r" to format the progress bar. A "-" flag makes it
// progress from right to left.
type Bar float64
var _ fmt.Formatter = Bar(1.0)
func (h Bar) Format(state fmt.State, r rune) {
switch r {
case 'r':
default:
panic(fmt.Sprintf("%v: unexpected format character", float64(h)))
}
if h > 1.0 {
h = 1.0
}
if h < 0.0 {
h = 0.0
}
if state.Flag('-') {
h = 1.0 - h
}
width, ok := state.Width()
if !ok {
// default width of 40
width = 40
}
var pad int
extra := len([]byte(green)) + len([]byte(reset))
p := make([]byte, width+extra)
p[0], p[len(p)-1] = '|', '|'
pad += 2
positive := int(Bar(width-pad) * h)
negative := width - pad - positive
n := 1
n += copy(p[n:], []byte(green))
n += copy(p[n:], bytes.Repeat([]byte("+"), positive))
n += copy(p[n:], []byte(reset))
if negative > 0 {
n += copy(p[n:len(p)-1], bytes.Repeat([]byte("-"), negative))
}
state.Write(p)
}

2
progress/doc.go Normal file
View file

@ -0,0 +1,2 @@
// Package progress assists in displaying human readable progress information.
package progress

8
progress/escape.go Normal file
View file

@ -0,0 +1,8 @@
package progress
const (
escape = "\x1b"
reset = escape + "[0m"
red = escape + "[31m"
green = escape + "[32m"
)

25
progress/humaans.go Normal file
View file

@ -0,0 +1,25 @@
package progress
import (
"fmt"
"time"
units "github.com/docker/go-units"
)
// Bytes converts a regular int64 to human readable type.
type Bytes int64
func (b Bytes) String() string {
return units.CustomSize("%02.1f %s", float64(b), 1024.0, []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"})
}
type BytesPerSecond int64
func NewBytesPerSecond(n int64, duration time.Duration) BytesPerSecond {
return BytesPerSecond(float64(n) / duration.Seconds())
}
func (bps BytesPerSecond) String() string {
return fmt.Sprintf("%v/s", Bytes(bps))
}

60
progress/writer.go Normal file
View file

@ -0,0 +1,60 @@
package progress
import (
"bytes"
"fmt"
"io"
)
// Writer buffers writes until flush, at which time the last screen is cleared
// and the current buffer contents are written. This is useful for
// implementing progress displays, such as those implemented in docker and
// git.
type Writer struct {
buf bytes.Buffer
w io.Writer
lines int
}
func NewWriter(w io.Writer) *Writer {
return &Writer{
w: w,
}
}
func (w *Writer) Write(p []byte) (n int, err error) {
return w.buf.Write(p)
}
// Flush should be called when refreshing the current display.
func (w *Writer) Flush() error {
if w.buf.Len() == 0 {
return nil
}
if err := w.clear(); err != nil {
return err
}
w.lines = bytes.Count(w.buf.Bytes(), []byte("\n"))
if _, err := w.w.Write(w.buf.Bytes()); err != nil {
return err
}
w.buf.Reset()
return nil
}
// TODO(stevvooe): The following are system specific. Break these out if we
// decide to build this package further.
func (w *Writer) clear() error {
for i := 0; i < w.lines; i++ {
if _, err := fmt.Fprintf(w.w, "\x1b[0A\x1b[2K\r"); err != nil {
return err
}
}
return nil
}