From 19eecaab126205b3e2d0d508fdd7ecd3dadeb5e6 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 19 Jan 2017 19:03:44 -0800 Subject: [PATCH] cmd/dist: POC implementation of dist fetch With this changeset we introduce several new things. The first is the top-level dist command. This is a toolkit that implements various distribution primitives, such as fetching, unpacking and ingesting. The first component to this is a simple `fetch` command. It is a low-level command that takes a "remote", identified by a `locator`, and an object identifier. Keyed by the locator, this tool can identify a remote implementation to fetch the content and write it back to standard out. By allowing this to be the unit of pluggability in fetching content, we can have quite a bit of flexibility in how we retrieve images. The current `fetch` implementation provides anonymous access to docker hub images, through the namespace `docker.io`. As an example, one can fetch the manifest for `redis` with the following command: ``` $ ./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json ``` Note that we have provided a mediatype "hint", nudging the fetch implementation to grab the correct endpoint. We can hash the output of that to fetch the same content by digest: ``` $ ./dist fetch docker.io/library/redis sha256:$(./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json | shasum -a256) ``` Note that the hint is now elided, since we have affixed the content to a particular hash. If you are not yet entertained, let's bring `jq` and `xargs` into the mix for maximum fun. The following incantation fetches the same manifest and downloads all layers into the convenience of `/dev/null`: ``` $ ./dist fetch docker.io/library/redis sha256:a027a470aa2b9b41cc2539847a97b8a14794ebd0a4c7c5d64e390df6bde56c73 | jq -r '.layers[] | .digest' | xargs -n1 -P10 ./dist fetch docker.io/library/redis > /dev/null ``` This is just the beginning. We should be able to centralize configuration around fetch to implement a number of distribution methodologies that have been challenging or impossible up to this point. The `locator`, mentioned earlier, is a schemaless URL that provides a host and path that can be used to resolve the remote. By dispatching on this common identifier, we should be able to support almost any protocol and discovery mechanism imaginable. When this is more solidified, we can roll these up into higher-level operations that can be orchestrated through the `dist` tool or via GRPC. What a time to be alive! Signed-off-by: Stephen J Day --- cmd/dist/fetch.go | 252 ++++++++++++++++++ cmd/dist/main.go | 44 +++ remotes/hints.go | 31 +++ remotes/remote.go | 39 +++ remotes/resolver.go | 19 ++ .../x/net/context/ctxhttp/ctxhttp.go | 74 +++++ .../x/net/context/ctxhttp/ctxhttp_pre17.go | 147 ++++++++++ 7 files changed, 606 insertions(+) create mode 100644 cmd/dist/fetch.go create mode 100644 cmd/dist/main.go create mode 100644 remotes/hints.go create mode 100644 remotes/remote.go create mode 100644 remotes/resolver.go create mode 100644 vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go create mode 100644 vendor/golang.org/x/net/context/ctxhttp/ctxhttp_pre17.go diff --git a/cmd/dist/fetch.go b/cmd/dist/fetch.go new file mode 100644 index 0000000..bfab3cd --- /dev/null +++ b/cmd/dist/fetch.go @@ -0,0 +1,252 @@ +package main + +import ( + contextpkg "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "path" + "strings" + + "github.com/Sirupsen/logrus" + "github.com/docker/containerd/log" + "github.com/docker/containerd/remotes" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" + "github.com/urfave/cli" + "golang.org/x/net/context/ctxhttp" +) + +// TODO(stevvooe): Create "multi-fetch" mode that just takes a remote +// then receives object/hint lines on stdin, returning content as +// needed. + +var fetchCommand = cli.Command{ + Name: "fetch", + Usage: "retrieve objects from a remote", + ArgsUsage: "[flags] [, ...]", + Description: `Fetch objects by identifier from a remote.`, + Flags: []cli.Flag{ + cli.DurationFlag{ + Name: "timeout", + Usage: "total timeout for fetch", + EnvVar: "CONTAINERD_FETCH_TIMEOUT", + }, + }, + Action: func(context *cli.Context) error { + var ( + ctx = contextpkg.Background() + timeout = context.Duration("timeout") + locator = context.Args().First() + args = context.Args().Tail() + ) + + if timeout > 0 { + var cancel func() + ctx, cancel = contextpkg.WithTimeout(ctx, timeout) + defer cancel() + } + + if locator == "" { + return fmt.Errorf("containerd: remote required") + } + + if len(args) < 1 { + return fmt.Errorf("containerd: object required") + } + + object := args[0] + hints := args[1:] + + resolver, err := getResolver(ctx) + if err != nil { + return err + } + + remote, err := resolver.Resolve(ctx, locator) + if err != nil { + return err + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithFields( + logrus.Fields{ + "remote": locator, + "object": object, + })) + + log.G(ctx).Infof("fetching") + rc, err := remote.Fetch(ctx, object, hints...) + if err != nil { + return err + } + defer rc.Close() + + if _, err := io.Copy(os.Stdout, rc); err != nil { + return err + } + + return nil + }, +} + +// NOTE(stevvooe): Most of the code below this point is prototype code to +// demonstrate a very simplified docker.io fetcher. We have a lot of hard coded +// values but we leave many of the details down to the fetcher, creating a lot +// of room for ways to fetch content. + +// getResolver prepares the resolver from the environment and options. +func getResolver(ctx contextpkg.Context) (remotes.Resolver, error) { + return remotes.ResolverFunc(func(ctx contextpkg.Context, locator string) (remotes.Remote, error) { + if !strings.HasPrefix(locator, "docker.io") { + return nil, errors.Errorf("unsupported locator: %q", locator) + } + + var ( + base = url.URL{ + Scheme: "https", + Host: "registry-1.docker.io", + } + prefix = strings.TrimPrefix(locator, "docker.io/") + ) + + token, err := getToken(ctx, "repository:"+prefix+":pull") + if err != nil { + return nil, err + } + + return remotes.RemoteFunc(func(ctx contextpkg.Context, object string, hints ...string) (io.ReadCloser, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields( + logrus.Fields{ + "prefix": prefix, // or repo? + "base": base.String(), + "hints": hints, + }, + )) + + paths, err := getV2URLPaths(prefix, object, hints...) + if err != nil { + return nil, err + } + + for _, path := range paths { + base.Path = path + url := base.String() + log.G(ctx).WithField("url", url).Debug("fetch content") + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + for _, mediatype := range remotes.HintValues("mediatype", hints...) { + req.Header.Set("Accept", mediatype) + } + + resp, err := ctxhttp.Do(ctx, http.DefaultClient, req) + if err != nil { + return nil, err + } + + if resp.StatusCode > 299 { + if resp.StatusCode == http.StatusNotFound { + continue // try one of the other urls. + } + resp.Body.Close() + return nil, errors.Errorf("unexpected status code %v: %v", url, resp.Status) + } + + return resp.Body, nil + } + + return nil, errors.New("not found") + }), nil + }), nil +} + +func getToken(ctx contextpkg.Context, scopes ...string) (string, error) { + var ( + u = url.URL{ + Scheme: "https", + Host: "auth.docker.io", + Path: "/token", + } + + q = url.Values{ + "scope": scopes, + "service": []string{"registry.docker.io"}, // usually comes from auth challenge + } + ) + + u.RawQuery = q.Encode() + + log.G(ctx).WithField("token.url", u.String()).Debug("requesting token") + resp, err := ctxhttp.Get(ctx, http.DefaultClient, u.String()) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode > 299 { + return "", errors.Errorf("unexpected status code: %v %v", resp.StatusCode, resp.Status) + } + + p, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + + var tokenResponse struct { + Token string `json:"token"` + } + + if err := json.Unmarshal(p, &tokenResponse); err != nil { + return "", err + } + + return tokenResponse.Token, nil +} + +// getV2URLPaths generates the canidate urls paths for the object based on the +// set of hints and the provided object id. URLs are returned in the order of +// most to least likely succeed. +func getV2URLPaths(prefix, object string, hints ...string) ([]string, error) { + var urls []string + + // TODO(stevvooe): We can probably define a higher-level "type" hint to + // avoid having to do extra round trips to resolve content, as well as + // avoid the tedium of providing media types. + + if remotes.HintExists("mediatype", "application/vnd.docker.distribution.manifest.v2+json", hints...) { // TODO(stevvooe): make this handle oci types, as well. + // fast path out if we know we are getting a manifest. Arguably, we + // should fallback to blobs, just in case. + urls = append(urls, path.Join("/v2", prefix, "manifests", object)) + } + + _, err := digest.Parse(object) + if err == nil { + // we have a digest, use blob or manifest path, depending on hints, may + // need to try both. + urls = append(urls, path.Join("/v2", prefix, "blobs", object)) + } + + // probably a take, so we go through the manifests endpoint + urls = append(urls, path.Join("/v2", prefix, "manifests", object)) + + var ( + noduplicates []string + seen = map[string]struct{}{} + ) + for _, u := range urls { + if _, ok := seen[u]; !ok { + seen[u] = struct{}{} + noduplicates = append(noduplicates, u) + } + } + + return noduplicates, nil +} diff --git a/cmd/dist/main.go b/cmd/dist/main.go new file mode 100644 index 0000000..4a36166 --- /dev/null +++ b/cmd/dist/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + "os" + + "github.com/Sirupsen/logrus" + "github.com/docker/containerd" + "github.com/urfave/cli" +) + +func main() { + app := cli.NewApp() + app.Name = "dist" + app.Version = containerd.Version + app.Usage = ` + ___ __ + ____/ (_)____/ /_ + / __ / / ___/ __/ + / /_/ / (__ ) /_ + \__,_/_/____/\__/ + +distribution tool +` + app.Flags = []cli.Flag{ + cli.BoolFlag{ + Name: "debug", + Usage: "enable debug output in logs", + }, + } + app.Commands = []cli.Command{ + fetchCommand, + } + app.Before = func(context *cli.Context) error { + if context.GlobalBool("debug") { + logrus.SetLevel(logrus.DebugLevel) + } + return nil + } + if err := app.Run(os.Args); err != nil { + fmt.Fprintf(os.Stderr, "dist: %s\n", err) + os.Exit(1) + } +} diff --git a/remotes/hints.go b/remotes/hints.go new file mode 100644 index 0000000..1694477 --- /dev/null +++ b/remotes/hints.go @@ -0,0 +1,31 @@ +package remotes + +import "strings" + +// HintExists returns true if a hint of the provided kind and values exists in +// the set of provided hints. +func HintExists(kind, value string, hints ...string) bool { + for _, hint := range hints { + if strings.HasPrefix(hint, kind) && strings.HasSuffix(hint, value) { + return true + } + } + + return false +} + +// HintValues returns a slice of the values of the hints that match kind. +func HintValues(kind string, hints ...string) []string { + var values []string + for _, hint := range hints { + if strings.HasPrefix(hint, kind) { + parts := strings.SplitN(hint, ":", 2) + if len(parts) < 2 { + continue + } + values = append(values, parts[1]) + } + } + + return values +} diff --git a/remotes/remote.go b/remotes/remote.go new file mode 100644 index 0000000..616d36b --- /dev/null +++ b/remotes/remote.go @@ -0,0 +1,39 @@ +package remotes + +import ( + "context" + "io" +) + +type Remote interface { + // Fetch the resource identified by id. The id is opaque to the remote, but + // may typically be a tag or a digest. + // + // Hints are provided to give instruction on how the resource may be + // fetched. They may provide information about the expected type or size. + // They may be protocol specific or help a protocol to identify the most + // efficient fetch methodology. + // + // Hints are the format of `:` where `` is the type + // of the hint and `` can be pretty much anything. For example, a + // media type hint would be the following: + // + // mediatype:application/vnd.docker.distribution.manifest.v2+json + // + // The following hint names are must be honored across all remote + // implementations: + // + // size: specify the expected size in bytes + // mediatype: specify the expected mediatype + // + // The caller should never expect the hints to be honored and should + // validate that returned content is as expected. They are only provided to + // help the remote retrieve the content. + Fetch(ctx context.Context, id string, hints ...string) (io.ReadCloser, error) +} + +type RemoteFunc func(context.Context, string, ...string) (io.ReadCloser, error) + +func (fn RemoteFunc) Fetch(ctx context.Context, object string, hints ...string) (io.ReadCloser, error) { + return fn(ctx, object, hints...) +} diff --git a/remotes/resolver.go b/remotes/resolver.go new file mode 100644 index 0000000..089bbc6 --- /dev/null +++ b/remotes/resolver.go @@ -0,0 +1,19 @@ +package remotes + +import "context" + +// Resolver provides a remote based on a locator. +type Resolver interface { + // Resolve returns a remote from the locator. + // + // A locator is a scheme-less URI representing the remote. Structurally, it + // has a host and path. The "host" can be used to directly reference a + // specific host or be matched against a specific handler. + Resolve(ctx context.Context, locator string) (Remote, error) +} + +type ResolverFunc func(context.Context, string) (Remote, error) + +func (fn ResolverFunc) Resolve(ctx context.Context, locator string) (Remote, error) { + return fn(ctx, locator) +} diff --git a/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go b/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go new file mode 100644 index 0000000..606cf1f --- /dev/null +++ b/vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go @@ -0,0 +1,74 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.7 + +// Package ctxhttp provides helper functions for performing context-aware HTTP requests. +package ctxhttp // import "golang.org/x/net/context/ctxhttp" + +import ( + "io" + "net/http" + "net/url" + "strings" + + "golang.org/x/net/context" +) + +// Do sends an HTTP request with the provided http.Client and returns +// an HTTP response. +// +// If the client is nil, http.DefaultClient is used. +// +// The provided ctx must be non-nil. If it is canceled or times out, +// ctx.Err() will be returned. +func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { + if client == nil { + client = http.DefaultClient + } + resp, err := client.Do(req.WithContext(ctx)) + // If we got an error, and the context has been canceled, + // the context's error is probably more useful. + if err != nil { + select { + case <-ctx.Done(): + err = ctx.Err() + default: + } + } + return resp, err +} + +// Get issues a GET request via the Do function. +func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + return Do(ctx, client, req) +} + +// Head issues a HEAD request via the Do function. +func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) { + req, err := http.NewRequest("HEAD", url, nil) + if err != nil { + return nil, err + } + return Do(ctx, client, req) +} + +// Post issues a POST request via the Do function. +func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequest("POST", url, body) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", bodyType) + return Do(ctx, client, req) +} + +// PostForm issues a POST request via the Do function. +func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) { + return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) +} diff --git a/vendor/golang.org/x/net/context/ctxhttp/ctxhttp_pre17.go b/vendor/golang.org/x/net/context/ctxhttp/ctxhttp_pre17.go new file mode 100644 index 0000000..926870c --- /dev/null +++ b/vendor/golang.org/x/net/context/ctxhttp/ctxhttp_pre17.go @@ -0,0 +1,147 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !go1.7 + +package ctxhttp // import "golang.org/x/net/context/ctxhttp" + +import ( + "io" + "net/http" + "net/url" + "strings" + + "golang.org/x/net/context" +) + +func nop() {} + +var ( + testHookContextDoneBeforeHeaders = nop + testHookDoReturned = nop + testHookDidBodyClose = nop +) + +// Do sends an HTTP request with the provided http.Client and returns an HTTP response. +// If the client is nil, http.DefaultClient is used. +// If the context is canceled or times out, ctx.Err() will be returned. +func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { + if client == nil { + client = http.DefaultClient + } + + // TODO(djd): Respect any existing value of req.Cancel. + cancel := make(chan struct{}) + req.Cancel = cancel + + type responseAndError struct { + resp *http.Response + err error + } + result := make(chan responseAndError, 1) + + // Make local copies of test hooks closed over by goroutines below. + // Prevents data races in tests. + testHookDoReturned := testHookDoReturned + testHookDidBodyClose := testHookDidBodyClose + + go func() { + resp, err := client.Do(req) + testHookDoReturned() + result <- responseAndError{resp, err} + }() + + var resp *http.Response + + select { + case <-ctx.Done(): + testHookContextDoneBeforeHeaders() + close(cancel) + // Clean up after the goroutine calling client.Do: + go func() { + if r := <-result; r.resp != nil { + testHookDidBodyClose() + r.resp.Body.Close() + } + }() + return nil, ctx.Err() + case r := <-result: + var err error + resp, err = r.resp, r.err + if err != nil { + return resp, err + } + } + + c := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + close(cancel) + case <-c: + // The response's Body is closed. + } + }() + resp.Body = ¬ifyingReader{resp.Body, c} + + return resp, nil +} + +// Get issues a GET request via the Do function. +func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + return Do(ctx, client, req) +} + +// Head issues a HEAD request via the Do function. +func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) { + req, err := http.NewRequest("HEAD", url, nil) + if err != nil { + return nil, err + } + return Do(ctx, client, req) +} + +// Post issues a POST request via the Do function. +func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequest("POST", url, body) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", bodyType) + return Do(ctx, client, req) +} + +// PostForm issues a POST request via the Do function. +func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) { + return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) +} + +// notifyingReader is an io.ReadCloser that closes the notify channel after +// Close is called or a Read fails on the underlying ReadCloser. +type notifyingReader struct { + io.ReadCloser + notify chan<- struct{} +} + +func (r *notifyingReader) Read(p []byte) (int, error) { + n, err := r.ReadCloser.Read(p) + if err != nil && r.notify != nil { + close(r.notify) + r.notify = nil + } + return n, err +} + +func (r *notifyingReader) Close() error { + err := r.ReadCloser.Close() + if r.notify != nil { + close(r.notify) + r.notify = nil + } + return err +}