cmd/dist, remotes: simplify resolution flow

After receiving feedback during containerd summit walk through of the
pull POC, we found that the resolution flow for names was out of place.
We could see this present in awkward places where we were trying to
re-resolve whether something was a digest or a tag and extra retries to
various endpoints.

By centering this problem around, "what do we write in the metadata
store?", the following interface comes about:

```
Resolve(ctx context.Context, ref string) (name string, desc ocispec.Descriptor, fetcher Fetcher, err error)
```

The above takes an "opaque" reference (we'll get to this later) and
returns the canonical name for the object, a content description of the
object and a `Fetcher` that can be used to retrieve the object and its
child resources. We can write `name` into the metadata store, pointing
at the descriptor. Descisions about discovery, trust, provenance,
distribution are completely abstracted away from the pulling code.

A first response to such a monstrosity is "that is a lot of return
arguments". When we look at the actual, we can see that in practice, the
usage pattern works well, albeit we don't quite demonstrate the utility
of `name`, which will be more apparent later. Designs that allowed
separate resolution of the `Fetcher` and the return of a collected
object were considered. Let's give this a chance before we go
refactoring this further.

With this change, we introduce a reference package with helps for
remotes to decompose "docker-esque" references into consituent
components, without arbitrarily enforcing those opinions on the backend.
Utlimately, the name and the reference used to qualify that name are
completely opaque to containerd. Obviously, implementors will need to
show some candor in following some conventions, but the possibilities
are fairly wide. Structurally, we still maintain the concept of the
locator and object but the interpretation is up to the resolver.

For the most part, the `dist` tool operates exactly the same, except
objects can be fetched with a reference:

```
dist fetch docker.io/library/redis:latest
```

The above should work well with a running containerd instance. I
recommend giving this a try with `fetch-object`, as well. With
`fetch-object`, it is easy for one to better understand the intricacies
of the OCI/Docker image formats.

Ultimately, this serves the main purpose of the elusive "metadata
store".

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2017-03-07 16:51:08 -08:00
parent 27a99400e8
commit 831f68fd71
No known key found for this signature in database
GPG key ID: 67B3DED84EDC823F
6 changed files with 593 additions and 201 deletions

76
cmd/dist/fetch.go vendored
View file

@ -45,9 +45,8 @@ Most of this is experimental and there are few leaps to make this work.`,
Flags: []cli.Flag{},
Action: func(clicontext *cli.Context) error {
var (
ctx = background
locator = clicontext.Args().First()
object = clicontext.Args().Get(1)
ctx = background
ref = clicontext.Args().First()
)
conn, err := connectGRPC(clicontext)
@ -59,11 +58,7 @@ Most of this is experimental and there are few leaps to make this work.`,
if err != nil {
return err
}
fetcher, err := resolver.Resolve(ctx, locator)
if err != nil {
return err
}
ctx = withJobsContext(ctx)
ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
cs, err := resolveContentStore(clicontext)
@ -73,10 +68,17 @@ Most of this is experimental and there are few leaps to make this work.`,
eg, ctx := errgroup.WithContext(ctx)
ctx = withJobsContext(ctx)
resolved := make(chan struct{})
eg.Go(func() error {
return fetchManifest(ctx, ingester, fetcher, object)
addJob(ctx, ref)
name, desc, fetcher, err := resolver.Resolve(ctx, ref)
if err != nil {
return err
}
log.G(ctx).WithField("image", name).Debug("fetching")
close(resolved)
return dispatch(ctx, ingester, fetcher, desc)
})
errs := make(chan error)
@ -96,8 +98,7 @@ Most of this is experimental and there are few leaps to make this work.`,
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, '\t', 0)
// fmt.Fprintln(tw, "REF\tSIZE\tAGE")
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
var total int64
js := getJobs(ctx)
@ -137,10 +138,20 @@ Most of this is experimental and there are few leaps to make this work.`,
if _, ok := activeSeen[j]; ok {
continue
}
status := "done"
if j == ref {
select {
case <-resolved:
status = "resolved"
default:
status = "resolving"
}
}
statuses[j] = statusInfo{
Ref: j,
Status: "done", // for now!
Status: status, // for now!
}
}
@ -151,21 +162,27 @@ Most of this is experimental and there are few leaps to make this work.`,
switch status.Status {
case "downloading":
bar := progress.Bar(float64(status.Offset) / float64(status.Total))
fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\n",
fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
status.Ref,
status.Status,
bar,
progress.Bytes(status.Offset), progress.Bytes(status.Total))
case "done":
case "resolving":
bar := progress.Bar(0.0)
fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n",
status.Ref,
status.Status,
bar)
default:
bar := progress.Bar(1.0)
fmt.Fprintf(tw, "%s:\t%s\t%40r\n",
fmt.Fprintf(tw, "%s:\t%s\t%40r\t\n",
status.Ref,
status.Status,
bar)
}
}
fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\n",
fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
time.Since(start).Seconds(),
// TODO(stevvooe): These calculations are actually way off.
// Need to account for previously downloaded data. These
@ -250,14 +267,11 @@ func (j *jobs) jobs() []string {
return jobs
}
func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, object string, hints ...string) error {
const manifestMediaType = "application/vnd.docker.distribution.manifest.v2+json"
hints = append(hints, "mediatype:"+manifestMediaType)
ref := "manifest-" + object
func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
ref := "manifest-" + desc.Digest.String()
addJob(ctx, ref)
rc, err := fetcher.Fetch(ctx, object, hints...)
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
return err
}
@ -291,18 +305,10 @@ func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remot
}
func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
var (
hints []string
object = desc.Digest.String()
)
if desc.MediaType != "" {
hints = append(hints, "mediatype:"+desc.MediaType)
}
ref := "fetch-" + object
ref := "fetch-" + desc.Digest.String()
addJob(ctx, ref)
log.G(ctx).Debug("fetch")
rc, err := fetcher.Fetch(ctx, object, hints...)
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
log.G(ctx).WithError(err).Error("fetch error")
return err
@ -328,7 +334,7 @@ func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fe
switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
eg.Go(func() error {
return fetchManifest(ctx, ingester, fetcher, desc.Digest.String(), "mediatype:"+desc.MediaType)
return fetchManifest(ctx, ingester, fetcher, desc)
})
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
return fmt.Errorf("%v not yet supported", desc.MediaType)

View file

@ -10,10 +10,12 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/log"
"github.com/docker/containerd/reference"
"github.com/docker/containerd/remotes"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -47,8 +49,7 @@ var fetchObjectCommand = cli.Command{
var (
ctx = background
timeout = context.Duration("timeout")
locator = context.Args().First()
args = context.Args().Tail()
ref = context.Args().First()
)
if timeout > 0 {
@ -57,35 +58,21 @@ var fetchObjectCommand = cli.Command{
defer cancel()
}
if locator == "" {
return errors.New("containerd: remote required")
}
if len(args) < 1 {
return errors.New("containerd: object required")
}
object := args[0]
hints := args[1:]
resolver, err := getResolver(ctx)
if err != nil {
return err
}
remote, err := resolver.Resolve(ctx, locator)
ctx = log.WithLogger(ctx, log.G(ctx).WithField("ref", ref))
log.G(ctx).Infof("resolving")
_, desc, fetcher, err := resolver.Resolve(ctx, ref)
if err != nil {
return err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
logrus.Fields{
"remote": locator,
"object": object,
}))
log.G(ctx).Infof("fetching")
rc, err := remote.Fetch(ctx, object, hints...)
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
return err
}
@ -99,91 +86,217 @@ var fetchObjectCommand = cli.Command{
},
}
// getResolver prepares the resolver from the environment and options.
func getResolver(ctx contextpkg.Context) (remotes.Resolver, error) {
return newDockerResolver(), nil
}
// NOTE(stevvooe): Most of the code below this point is prototype code to
// demonstrate a very simplified docker.io fetcher. We have a lot of hard coded
// values but we leave many of the details down to the fetcher, creating a lot
// of room for ways to fetch content.
// getResolver prepares the resolver from the environment and options.
func getResolver(ctx contextpkg.Context) (remotes.Resolver, error) {
return remotes.ResolverFunc(func(ctx contextpkg.Context, locator string) (remotes.Fetcher, error) {
if !strings.HasPrefix(locator, "docker.io") && !strings.HasPrefix(locator, "localhost:5000") {
return nil, errors.Errorf("unsupported locator: %q", locator)
}
type dockerFetcher struct {
base url.URL
token string
}
var (
base = url.URL{
Scheme: "https",
Host: "registry-1.docker.io",
}
prefix = strings.TrimPrefix(locator, "docker.io/")
)
func (r *dockerFetcher) url(ps ...string) string {
url := r.base
url.Path = path.Join(url.Path, path.Join(ps...))
return url.String()
}
if strings.HasPrefix(locator, "localhost:5000") {
base.Scheme = "http"
base.Host = "localhost:5000"
prefix = strings.TrimPrefix(locator, "localhost:5000/")
}
func (r *dockerFetcher) Fetch(ctx contextpkg.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
logrus.Fields{
"base": r.base.String(),
"digest": desc.Digest,
},
))
token, err := getToken(ctx, "repository:"+prefix+":pull")
paths, err := getV2URLPaths(desc)
if err != nil {
return nil, err
}
for _, path := range paths {
u := r.url(path)
req, err := http.NewRequest(http.MethodGet, u, nil)
if err != nil {
return nil, err
}
return remotes.FetcherFunc(func(ctx contextpkg.Context, object string, hints ...string) (io.ReadCloser, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
logrus.Fields{
"prefix": prefix, // or repo?
"base": base.String(),
"hints": hints,
},
))
req.Header.Set("Accept", strings.Join([]string{desc.MediaType, `*`}, ", "))
resp, err := r.doRequest(ctx, req)
if err != nil {
return nil, err
}
if _, err := digest.Parse(object); err != nil {
// in this case, we are seeking a manifest by a tag. Let's add some hints.
hints = append(hints, "mediatype:"+MediaTypeDockerSchema2Manifest)
hints = append(hints, "mediatype:"+MediaTypeDockerSchema2ManifestList)
hints = append(hints, "mediatype:"+ocispec.MediaTypeImageManifest)
hints = append(hints, "mediatype:"+ocispec.MediaTypeImageIndex)
if resp.StatusCode > 299 {
if resp.StatusCode == http.StatusNotFound {
continue // try one of the other urls.
}
resp.Body.Close()
return nil, errors.Errorf("unexpected status code %v: %v", u, resp.Status)
}
paths, err := getV2URLPaths(prefix, object, hints...)
if err != nil {
return nil, err
return resp.Body, nil
}
return nil, errors.New("not found")
}
func (r *dockerFetcher) doRequest(ctx contextpkg.Context, req *http.Request) (*http.Response, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", req.URL.String()))
log.G(ctx).WithField("request.headers", req.Header).Debug("fetch content")
if r.token != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", r.token))
}
resp, err := ctxhttp.Do(ctx, http.DefaultClient, req)
if err != nil {
return nil, err
}
log.G(ctx).WithFields(logrus.Fields{
"status": resp.Status,
"response.headers": resp.Header,
}).Debug("fetch response received")
return resp, err
}
type dockerResolver struct{}
var _ remotes.Resolver = &dockerResolver{}
func newDockerResolver() remotes.Resolver {
return &dockerResolver{}
}
func (r *dockerResolver) Resolve(ctx contextpkg.Context, ref string) (string, ocispec.Descriptor, remotes.Fetcher, error) {
refspec, err := reference.Parse(ref)
if err != nil {
return "", ocispec.Descriptor{}, nil, err
}
var (
base url.URL
token string
)
switch refspec.Hostname() {
case "docker.io":
base.Scheme = "https"
base.Host = "registry-1.docker.io"
prefix := strings.TrimPrefix(refspec.Locator, "docker.io/")
base.Path = path.Join("/v2", prefix)
token, err = getToken(ctx, "repository:"+prefix+":pull")
if err != nil {
return "", ocispec.Descriptor{}, nil, err
}
case "localhost:5000":
base.Scheme = "http"
base.Host = "localhost:5000"
base.Path = path.Join("/v2", strings.TrimPrefix(refspec.Locator, "localhost:5000/"))
default:
return "", ocispec.Descriptor{}, nil, errors.Errorf("unsupported locator: %q", refspec.Locator)
}
fetcher := &dockerFetcher{
base: base,
token: token,
}
var (
urls []string
dgst = refspec.Digest()
)
if dgst != "" {
if err := dgst.Validate(); err != nil {
// need to fail here, since we can't actually resolve the invalid
// digest.
return "", ocispec.Descriptor{}, nil, err
}
// turns out, we have a valid digest, make a url.
urls = append(urls, fetcher.url("manifests", dgst.String()))
} else {
urls = append(urls, fetcher.url("manifests", refspec.Object))
}
// fallback to blobs on not found.
urls = append(urls, fetcher.url("blobs", dgst.String()))
for _, u := range urls {
req, err := http.NewRequest(http.MethodHead, u, nil)
if err != nil {
return "", ocispec.Descriptor{}, nil, err
}
// set headers for all the types we support for resolution.
req.Header.Set("Accept", strings.Join([]string{
MediaTypeDockerSchema2Manifest,
MediaTypeDockerSchema2ManifestList,
ocispec.MediaTypeImageManifest,
ocispec.MediaTypeImageIndex, "*"}, ", "))
log.G(ctx).Debug("resolving")
resp, err := fetcher.doRequest(ctx, req)
if err != nil {
return "", ocispec.Descriptor{}, nil, err
}
resp.Body.Close() // don't care about body contents.
if resp.StatusCode > 299 {
if resp.StatusCode == http.StatusNotFound {
continue
}
return "", ocispec.Descriptor{}, nil, errors.Errorf("unexpected status code %v: %v", u, resp.Status)
}
for _, path := range paths {
url := base
url.Path = path
log.G(ctx).WithField("url", url).Debug("fetch content")
// this is the only point at which we trust the registry. we use the
// content headers to assemble a descriptor for the name. when this becomes
// more robust, we mostly get this information from a secure trust store.
dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest"))
req, err := http.NewRequest(http.MethodGet, url.String(), nil)
if err != nil {
return nil, err
if dgstHeader != "" {
if err := dgstHeader.Validate(); err != nil {
if err == nil {
return "", ocispec.Descriptor{}, nil, errors.Errorf("%q in header not a valid digest", dgstHeader)
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
req.Header.Set("Accept", strings.Join(remotes.HintValues("mediatype", hints...), ", "))
resp, err := ctxhttp.Do(ctx, http.DefaultClient, req)
if err != nil {
return nil, err
}
if resp.StatusCode > 299 {
if resp.StatusCode == http.StatusNotFound {
continue // try one of the other urls.
}
resp.Body.Close()
return nil, errors.Errorf("unexpected status code %v: %v", url, resp.Status)
}
return resp.Body, nil
return "", ocispec.Descriptor{}, nil, errors.Wrapf(err, "%q in header not a valid digest", dgstHeader)
}
dgst = dgstHeader
}
return nil, errors.New("not found")
}), nil
}), nil
if dgst == "" {
return "", ocispec.Descriptor{}, nil, errors.Wrapf(err, "could not resolve digest for %v", ref)
}
var (
size int64
sizeHeader = resp.Header.Get("Content-Length")
)
size, err = strconv.ParseInt(sizeHeader, 10, 64)
if err != nil || size < 0 {
return "", ocispec.Descriptor{}, nil, errors.Wrapf(err, "%q in header not a valid size", sizeHeader)
}
desc := ocispec.Descriptor{
Digest: dgst,
MediaType: resp.Header.Get("Content-Type"), // need to strip disposition?
Size: size,
}
log.G(ctx).WithField("desc.digest", desc.Digest).Debug("resolved")
return ref, desc, fetcher, nil
}
return "", ocispec.Descriptor{}, nil, errors.Errorf("%v not found", ref)
}
func getToken(ctx contextpkg.Context, scopes ...string) (string, error) {
@ -232,39 +345,17 @@ func getToken(ctx contextpkg.Context, scopes ...string) (string, error) {
// getV2URLPaths generates the candidate urls paths for the object based on the
// set of hints and the provided object id. URLs are returned in the order of
// most to least likely succeed.
func getV2URLPaths(prefix, object string, hints ...string) ([]string, error) {
func getV2URLPaths(desc ocispec.Descriptor) ([]string, error) {
var urls []string
// TODO(stevvooe): We can probably define a higher-level "type" hint to
// avoid having to do extra round trips to resolve content, as well as
// avoid the tedium of providing media types.
if remotes.HintExists("mediatype", MediaTypeDockerSchema2Manifest, hints...) ||
remotes.HintExists("mediatype", MediaTypeDockerSchema2ManifestList, hints...) ||
remotes.HintExists("mediatype", ocispec.MediaTypeImageManifest, hints...) ||
remotes.HintExists("mediatype", ocispec.MediaTypeImageIndex, hints...) {
// fast path out if we know we are getting a manifest. Arguably, we
// should fallback to blobs, just in case.
urls = append(urls, path.Join("/v2", prefix, "manifests", object))
switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, MediaTypeDockerSchema2ManifestList,
ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex:
urls = append(urls, path.Join("manifests", desc.Digest.String()))
}
// we have a digest, use blob or manifest path, depending on hints, may
// need to try both.
urls = append(urls, path.Join("/v2", prefix, "blobs", object))
// always fallback to attempting to get the object out of the blobs store.
urls = append(urls, path.Join("blobs", desc.Digest.String()))
// probably a take, so we go through the manifests endpoint
urls = append(urls, path.Join("/v2", prefix, "manifests", object))
var (
noduplicates []string
seen = map[string]struct{}{}
)
for _, u := range urls {
if _, ok := seen[u]; !ok {
seen[u] = struct{}{}
noduplicates = append(noduplicates, u)
}
}
return noduplicates, nil
return urls, nil
}