cmd/dist: implement fetch prototype
With the rename of fetch to fetch-object, we now introduce the `fetch` command. It will fetch all of the resources required for an image into the content store. We'll still need to follow this up with metadata registration but this is a good start. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
971b9ca29a
commit
55a1b4eff8
7 changed files with 509 additions and 0 deletions
348
cmd/dist/fetch.go
vendored
Normal file
348
cmd/dist/fetch.go
vendored
Normal file
|
@ -0,0 +1,348 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"text/tabwriter"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
contentapi "github.com/docker/containerd/api/services/content"
|
||||||
|
"github.com/docker/containerd/content"
|
||||||
|
"github.com/docker/containerd/log"
|
||||||
|
"github.com/docker/containerd/progress"
|
||||||
|
"github.com/docker/containerd/remotes"
|
||||||
|
contentservice "github.com/docker/containerd/services/content"
|
||||||
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
|
"github.com/urfave/cli"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
var fetchCommand = cli.Command{
|
||||||
|
Name: "fetch",
|
||||||
|
Usage: "fetch all content for an image into containerd",
|
||||||
|
ArgsUsage: "[flags] <remote> <object>",
|
||||||
|
Description: `Fetch an image into containerd.
|
||||||
|
|
||||||
|
This command ensures that containerd has all the necessary resources to build
|
||||||
|
an image's rootfs and convert the configuration to a runtime format supported
|
||||||
|
by containerd.
|
||||||
|
|
||||||
|
This command uses the same syntax, of remote and object, as 'dist
|
||||||
|
fetch-object'. We may want to make this nicer, but agnostism is preferred for
|
||||||
|
the moment.
|
||||||
|
|
||||||
|
Right now, the responsibility of the daemon and the cli aren't quite clear. Do
|
||||||
|
not use this implementation as a guide. The end goal should be having metadata,
|
||||||
|
content and snapshots ready for a direct use via the 'ctr run'.
|
||||||
|
|
||||||
|
Most of this is experimental and there are few leaps to make this work.`,
|
||||||
|
Flags: []cli.Flag{},
|
||||||
|
Action: func(clicontext *cli.Context) error {
|
||||||
|
var (
|
||||||
|
ctx = background
|
||||||
|
locator = clicontext.Args().First()
|
||||||
|
object = clicontext.Args().Get(1)
|
||||||
|
)
|
||||||
|
|
||||||
|
conn, err := connectGRPC(clicontext)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resolver, err := getResolver(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fetcher, err := resolver.Resolve(ctx, locator)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
|
||||||
|
cs, err := resolveContentStore(clicontext)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
|
ctx = withJobsContext(ctx)
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
return fetchManifest(ctx, ingester, fetcher, object)
|
||||||
|
})
|
||||||
|
|
||||||
|
errs := make(chan error)
|
||||||
|
go func() {
|
||||||
|
defer close(errs)
|
||||||
|
errs <- eg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
ticker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
fw := progress.NewWriter(os.Stdout)
|
||||||
|
start := time.Now()
|
||||||
|
defer ticker.Stop()
|
||||||
|
var done bool
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
fw.Flush()
|
||||||
|
|
||||||
|
tw := tabwriter.NewWriter(fw, 1, 8, 1, '\t', 0)
|
||||||
|
// fmt.Fprintln(tw, "REF\tSIZE\tAGE")
|
||||||
|
var total int64
|
||||||
|
|
||||||
|
js := getJobs(ctx)
|
||||||
|
type statusInfo struct {
|
||||||
|
Ref string
|
||||||
|
Status string
|
||||||
|
Offset int64
|
||||||
|
Total int64
|
||||||
|
StartedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
}
|
||||||
|
statuses := map[string]statusInfo{}
|
||||||
|
|
||||||
|
activeSeen := map[string]struct{}{}
|
||||||
|
if !done {
|
||||||
|
active, err := cs.Active()
|
||||||
|
if err != nil {
|
||||||
|
log.G(ctx).WithError(err).Error("active check failed")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// update status of active entries!
|
||||||
|
for _, active := range active {
|
||||||
|
statuses[active.Ref] = statusInfo{
|
||||||
|
Ref: active.Ref,
|
||||||
|
Status: "downloading",
|
||||||
|
Offset: active.Offset,
|
||||||
|
Total: active.Total,
|
||||||
|
StartedAt: active.StartedAt,
|
||||||
|
UpdatedAt: active.UpdatedAt,
|
||||||
|
}
|
||||||
|
activeSeen[active.Ref] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// now, update the items in jobs that are not in active
|
||||||
|
for _, j := range js {
|
||||||
|
if _, ok := activeSeen[j]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
statuses[j] = statusInfo{
|
||||||
|
Ref: j,
|
||||||
|
Status: "done", // for now!
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, j := range js {
|
||||||
|
status := statuses[j]
|
||||||
|
|
||||||
|
total += status.Offset
|
||||||
|
switch status.Status {
|
||||||
|
case "downloading":
|
||||||
|
bar := progress.Bar(float64(status.Offset) / float64(status.Total))
|
||||||
|
fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\n",
|
||||||
|
status.Ref,
|
||||||
|
status.Status,
|
||||||
|
bar,
|
||||||
|
progress.Bytes(status.Offset), progress.Bytes(status.Total))
|
||||||
|
case "done":
|
||||||
|
bar := progress.Bar(1.0)
|
||||||
|
fmt.Fprintf(tw, "%s:\t%s\t%40r\n",
|
||||||
|
status.Ref,
|
||||||
|
status.Status,
|
||||||
|
bar)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\n",
|
||||||
|
time.Since(start).Seconds(),
|
||||||
|
// TODO(stevvooe): These calculations are actually way off.
|
||||||
|
// Need to account for previously downloaded data. These
|
||||||
|
// will basically be right for a download the first time
|
||||||
|
// but will be skewed if restarting, as it includes the
|
||||||
|
// data into the start time before.
|
||||||
|
progress.Bytes(total),
|
||||||
|
progress.NewBytesPerSecond(total, time.Since(start)))
|
||||||
|
tw.Flush()
|
||||||
|
|
||||||
|
if done {
|
||||||
|
fw.Flush()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case err := <-errs:
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
done = true
|
||||||
|
case <-ctx.Done():
|
||||||
|
done = true // allow ui to update once more
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// jobs provides a way of identifying the download keys for a particular task
|
||||||
|
// encountering during the pull walk.
|
||||||
|
//
|
||||||
|
// This is very minimal and will probably be replaced with something more
|
||||||
|
// featured.
|
||||||
|
type jobs struct {
|
||||||
|
added map[string]struct{}
|
||||||
|
refs []string
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// jobsKeys let's us store the jobs instance in the context.
|
||||||
|
//
|
||||||
|
// This is a very cute way to do things but not ideal.
|
||||||
|
type jobsKey struct{}
|
||||||
|
|
||||||
|
func getJobs(ctx context.Context) []string {
|
||||||
|
return ctx.Value(jobsKey{}).(*jobs).jobs()
|
||||||
|
}
|
||||||
|
|
||||||
|
func addJob(ctx context.Context, job string) {
|
||||||
|
ctx.Value(jobsKey{}).(*jobs).add(job)
|
||||||
|
}
|
||||||
|
|
||||||
|
func withJobsContext(ctx context.Context) context.Context {
|
||||||
|
jobs := newJobs()
|
||||||
|
return context.WithValue(ctx, jobsKey{}, jobs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newJobs() *jobs {
|
||||||
|
return &jobs{added: make(map[string]struct{})}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *jobs) add(ref string) {
|
||||||
|
j.mu.Lock()
|
||||||
|
defer j.mu.Unlock()
|
||||||
|
|
||||||
|
if _, ok := j.added[ref]; ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
j.refs = append(j.refs, ref)
|
||||||
|
j.added[ref] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *jobs) jobs() []string {
|
||||||
|
j.mu.Lock()
|
||||||
|
defer j.mu.Unlock()
|
||||||
|
|
||||||
|
var jobs []string
|
||||||
|
for _, j := range j.refs {
|
||||||
|
jobs = append(jobs, j)
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobs
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, object string, hints ...string) error {
|
||||||
|
const manifestMediaType = "application/vnd.docker.distribution.manifest.v2+json"
|
||||||
|
hints = append(hints, "mediatype:"+manifestMediaType)
|
||||||
|
|
||||||
|
ref := "manifest-" + object
|
||||||
|
addJob(ctx, ref)
|
||||||
|
|
||||||
|
rc, err := fetcher.Fetch(ctx, object, hints...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer rc.Close()
|
||||||
|
|
||||||
|
// it would be better to read the content back from the content store in this case.
|
||||||
|
p, err := ioutil.ReadAll(rc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := content.WriteBlob(ctx, ingester, ref, bytes.NewReader(p), 0, ""); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): This assumption that we get a manifest is unfortunate.
|
||||||
|
// Need to provide way to resolve what the type of the target is.
|
||||||
|
var manifest ocispec.Manifest
|
||||||
|
if err := json.Unmarshal(p, &manifest); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var descs []ocispec.Descriptor
|
||||||
|
|
||||||
|
descs = append(descs, manifest.Config)
|
||||||
|
for _, desc := range manifest.Layers {
|
||||||
|
descs = append(descs, desc)
|
||||||
|
}
|
||||||
|
|
||||||
|
return dispatch(ctx, ingester, fetcher, descs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
|
||||||
|
var (
|
||||||
|
hints []string
|
||||||
|
object = desc.Digest.String()
|
||||||
|
)
|
||||||
|
if desc.MediaType != "" {
|
||||||
|
hints = append(hints, "mediatype:"+desc.MediaType)
|
||||||
|
}
|
||||||
|
|
||||||
|
ref := "fetch-" + object
|
||||||
|
addJob(ctx, ref)
|
||||||
|
log.G(ctx).Debug("fetch")
|
||||||
|
rc, err := fetcher.Fetch(ctx, object, hints...)
|
||||||
|
if err != nil {
|
||||||
|
log.G(ctx).WithError(err).Error("fetch error")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer rc.Close()
|
||||||
|
|
||||||
|
// TODO(stevvooe): Need better remote key selection here. Should be a
|
||||||
|
// product of the fetcher. We may need more narrow infomation on fetcher or
|
||||||
|
// just pull from env/context.
|
||||||
|
return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest)
|
||||||
|
}
|
||||||
|
|
||||||
|
// dispatch blocks until all content in `descs` is retrieved.
|
||||||
|
func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, descs ...ocispec.Descriptor) error {
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
for _, desc := range descs {
|
||||||
|
if err := func(desc ocispec.Descriptor) error {
|
||||||
|
ctx := log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
|
||||||
|
"digest": desc.Digest,
|
||||||
|
"mediatype": desc.MediaType,
|
||||||
|
"size": desc.Size,
|
||||||
|
}))
|
||||||
|
switch desc.MediaType {
|
||||||
|
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
||||||
|
eg.Go(func() error {
|
||||||
|
return fetchManifest(ctx, ingester, fetcher, desc.Digest.String(), "mediatype:"+desc.MediaType)
|
||||||
|
})
|
||||||
|
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
||||||
|
return fmt.Errorf("%v not yet supported", desc.MediaType)
|
||||||
|
default:
|
||||||
|
eg.Go(func() error {
|
||||||
|
return fetch(ctx, ingester, fetcher, desc)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}(desc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return eg.Wait()
|
||||||
|
}
|
1
cmd/dist/main.go
vendored
1
cmd/dist/main.go
vendored
|
@ -61,6 +61,7 @@ distribution tool
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
app.Commands = []cli.Command{
|
app.Commands = []cli.Command{
|
||||||
|
fetchCommand,
|
||||||
fetchObjectCommand,
|
fetchObjectCommand,
|
||||||
ingestCommand,
|
ingestCommand,
|
||||||
activeCommand,
|
activeCommand,
|
||||||
|
|
65
progress/bar.go
Normal file
65
progress/bar.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package progress
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO(stevvooe): We may want to support more interesting parameterization of
|
||||||
|
// the bar. For now, it is very simple.
|
||||||
|
|
||||||
|
// Bar provides a very simple progress bar implementation.
|
||||||
|
//
|
||||||
|
// Use with fmt.Printf and "r" to format the progress bar. A "-" flag makes it
|
||||||
|
// progress from right to left.
|
||||||
|
type Bar float64
|
||||||
|
|
||||||
|
var _ fmt.Formatter = Bar(1.0)
|
||||||
|
|
||||||
|
func (h Bar) Format(state fmt.State, r rune) {
|
||||||
|
switch r {
|
||||||
|
case 'r':
|
||||||
|
default:
|
||||||
|
panic(fmt.Sprintf("%v: unexpected format character", float64(h)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if h > 1.0 {
|
||||||
|
h = 1.0
|
||||||
|
}
|
||||||
|
|
||||||
|
if h < 0.0 {
|
||||||
|
h = 0.0
|
||||||
|
}
|
||||||
|
|
||||||
|
if state.Flag('-') {
|
||||||
|
h = 1.0 - h
|
||||||
|
}
|
||||||
|
|
||||||
|
width, ok := state.Width()
|
||||||
|
if !ok {
|
||||||
|
// default width of 40
|
||||||
|
width = 40
|
||||||
|
}
|
||||||
|
|
||||||
|
var pad int
|
||||||
|
|
||||||
|
extra := len([]byte(green)) + len([]byte(reset))
|
||||||
|
|
||||||
|
p := make([]byte, width+extra)
|
||||||
|
p[0], p[len(p)-1] = '|', '|'
|
||||||
|
pad += 2
|
||||||
|
|
||||||
|
positive := int(Bar(width-pad) * h)
|
||||||
|
negative := width - pad - positive
|
||||||
|
|
||||||
|
n := 1
|
||||||
|
n += copy(p[n:], []byte(green))
|
||||||
|
n += copy(p[n:], bytes.Repeat([]byte("+"), positive))
|
||||||
|
n += copy(p[n:], []byte(reset))
|
||||||
|
|
||||||
|
if negative > 0 {
|
||||||
|
n += copy(p[n:len(p)-1], bytes.Repeat([]byte("-"), negative))
|
||||||
|
}
|
||||||
|
|
||||||
|
state.Write(p)
|
||||||
|
}
|
2
progress/doc.go
Normal file
2
progress/doc.go
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
// Package progress assists in displaying human readable progress information.
|
||||||
|
package progress
|
8
progress/escape.go
Normal file
8
progress/escape.go
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
package progress
|
||||||
|
|
||||||
|
const (
|
||||||
|
escape = "\x1b"
|
||||||
|
reset = escape + "[0m"
|
||||||
|
red = escape + "[31m"
|
||||||
|
green = escape + "[32m"
|
||||||
|
)
|
25
progress/humaans.go
Normal file
25
progress/humaans.go
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
package progress
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
units "github.com/docker/go-units"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Bytes converts a regular int64 to human readable type.
|
||||||
|
type Bytes int64
|
||||||
|
|
||||||
|
func (b Bytes) String() string {
|
||||||
|
return units.CustomSize("%02.1f %s", float64(b), 1024.0, []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"})
|
||||||
|
}
|
||||||
|
|
||||||
|
type BytesPerSecond int64
|
||||||
|
|
||||||
|
func NewBytesPerSecond(n int64, duration time.Duration) BytesPerSecond {
|
||||||
|
return BytesPerSecond(float64(n) / duration.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bps BytesPerSecond) String() string {
|
||||||
|
return fmt.Sprintf("%v/s", Bytes(bps))
|
||||||
|
}
|
60
progress/writer.go
Normal file
60
progress/writer.go
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
package progress
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Writer buffers writes until flush, at which time the last screen is cleared
|
||||||
|
// and the current buffer contents are written. This is useful for
|
||||||
|
// implementing progress displays, such as those implemented in docker and
|
||||||
|
// git.
|
||||||
|
type Writer struct {
|
||||||
|
buf bytes.Buffer
|
||||||
|
w io.Writer
|
||||||
|
lines int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWriter(w io.Writer) *Writer {
|
||||||
|
return &Writer{
|
||||||
|
w: w,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) Write(p []byte) (n int, err error) {
|
||||||
|
return w.buf.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush should be called when refreshing the current display.
|
||||||
|
func (w *Writer) Flush() error {
|
||||||
|
if w.buf.Len() == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.clear(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.lines = bytes.Count(w.buf.Bytes(), []byte("\n"))
|
||||||
|
|
||||||
|
if _, err := w.w.Write(w.buf.Bytes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.buf.Reset()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(stevvooe): The following are system specific. Break these out if we
|
||||||
|
// decide to build this package further.
|
||||||
|
|
||||||
|
func (w *Writer) clear() error {
|
||||||
|
for i := 0; i < w.lines; i++ {
|
||||||
|
if _, err := fmt.Fprintf(w.w, "\x1b[0A\x1b[2K\r"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in a new issue