cmd/dist: POC implementation of dist fetch

With this changeset we introduce several new things. The first is the
top-level dist command. This is a toolkit that implements various
distribution primitives, such as fetching, unpacking and ingesting.

The first component to this is a simple `fetch` command. It is a
low-level command that takes a "remote", identified by a `locator`, and
an object identifier. Keyed by the locator, this tool can identify a
remote implementation to fetch the content and write it back to standard
out. By allowing this to be the unit of pluggability in fetching
content, we can have quite a bit of flexibility in how we retrieve
images.

The current `fetch` implementation provides anonymous access to docker
hub images, through the namespace `docker.io`. As an example, one can
fetch the manifest for `redis` with the following command:

```
$ ./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json
```

Note that we have provided a mediatype "hint", nudging the fetch
implementation to grab the correct endpoint. We can hash the output of
that to fetch the same content by digest:

```
$ ./dist fetch docker.io/library/redis sha256:$(./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json | shasum -a256)
```

Note that the hint is now elided, since we have affixed the content to a
particular hash.

If you are not yet entertained, let's bring `jq` and `xargs` into the
mix for maximum fun. The following incantation fetches the same manifest
and downloads all layers into the convenience of `/dev/null`:

```
$ ./dist fetch docker.io/library/redis sha256:a027a470aa2b9b41cc2539847a97b8a14794ebd0a4c7c5d64e390df6bde56c73 | jq -r '.layers[] | .digest' | xargs -n1 -P10 ./dist fetch docker.io/library/redis > /dev/null
```

This is just the beginning. We should be able to centralize
configuration around fetch to implement a number of distribution
methodologies that have been challenging or impossible up to this point.
The `locator`, mentioned earlier, is a schemaless URL that provides a
host and path that can be used to resolve the remote. By dispatching on
this common identifier, we should be able to support almost any protocol
and discovery mechanism imaginable.

When this is more solidified, we can roll these up into higher-level
operations that can be orchestrated through the `dist` tool or via GRPC.

What a time to be alive!

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2017-01-19 19:03:44 -08:00
parent 4180eebf27
commit 19eecaab12
No known key found for this signature in database
GPG Key ID: 67B3DED84EDC823F
7 changed files with 606 additions and 0 deletions

252
cmd/dist/fetch.go vendored Normal file
View File

@ -0,0 +1,252 @@
package main
import (
contextpkg "context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/log"
"github.com/docker/containerd/remotes"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/urfave/cli"
"golang.org/x/net/context/ctxhttp"
)
// TODO(stevvooe): Create "multi-fetch" mode that just takes a remote
// then receives object/hint lines on stdin, returning content as
// needed.
var fetchCommand = cli.Command{
Name: "fetch",
Usage: "retrieve objects from a remote",
ArgsUsage: "[flags] <remote> <object> [<hint>, ...]",
Description: `Fetch objects by identifier from a remote.`,
Flags: []cli.Flag{
cli.DurationFlag{
Name: "timeout",
Usage: "total timeout for fetch",
EnvVar: "CONTAINERD_FETCH_TIMEOUT",
},
},
Action: func(context *cli.Context) error {
var (
ctx = contextpkg.Background()
timeout = context.Duration("timeout")
locator = context.Args().First()
args = context.Args().Tail()
)
if timeout > 0 {
var cancel func()
ctx, cancel = contextpkg.WithTimeout(ctx, timeout)
defer cancel()
}
if locator == "" {
return fmt.Errorf("containerd: remote required")
}
if len(args) < 1 {
return fmt.Errorf("containerd: object required")
}
object := args[0]
hints := args[1:]
resolver, err := getResolver(ctx)
if err != nil {
return err
}
remote, err := resolver.Resolve(ctx, locator)
if err != nil {
return err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
logrus.Fields{
"remote": locator,
"object": object,
}))
log.G(ctx).Infof("fetching")
rc, err := remote.Fetch(ctx, object, hints...)
if err != nil {
return err
}
defer rc.Close()
if _, err := io.Copy(os.Stdout, rc); err != nil {
return err
}
return nil
},
}
// NOTE(stevvooe): Most of the code below this point is prototype code to
// demonstrate a very simplified docker.io fetcher. We have a lot of hard coded
// values but we leave many of the details down to the fetcher, creating a lot
// of room for ways to fetch content.
// getResolver prepares the resolver from the environment and options.
func getResolver(ctx contextpkg.Context) (remotes.Resolver, error) {
return remotes.ResolverFunc(func(ctx contextpkg.Context, locator string) (remotes.Remote, error) {
if !strings.HasPrefix(locator, "docker.io") {
return nil, errors.Errorf("unsupported locator: %q", locator)
}
var (
base = url.URL{
Scheme: "https",
Host: "registry-1.docker.io",
}
prefix = strings.TrimPrefix(locator, "docker.io/")
)
token, err := getToken(ctx, "repository:"+prefix+":pull")
if err != nil {
return nil, err
}
return remotes.RemoteFunc(func(ctx contextpkg.Context, object string, hints ...string) (io.ReadCloser, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(
logrus.Fields{
"prefix": prefix, // or repo?
"base": base.String(),
"hints": hints,
},
))
paths, err := getV2URLPaths(prefix, object, hints...)
if err != nil {
return nil, err
}
for _, path := range paths {
base.Path = path
url := base.String()
log.G(ctx).WithField("url", url).Debug("fetch content")
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
for _, mediatype := range remotes.HintValues("mediatype", hints...) {
req.Header.Set("Accept", mediatype)
}
resp, err := ctxhttp.Do(ctx, http.DefaultClient, req)
if err != nil {
return nil, err
}
if resp.StatusCode > 299 {
if resp.StatusCode == http.StatusNotFound {
continue // try one of the other urls.
}
resp.Body.Close()
return nil, errors.Errorf("unexpected status code %v: %v", url, resp.Status)
}
return resp.Body, nil
}
return nil, errors.New("not found")
}), nil
}), nil
}
func getToken(ctx contextpkg.Context, scopes ...string) (string, error) {
var (
u = url.URL{
Scheme: "https",
Host: "auth.docker.io",
Path: "/token",
}
q = url.Values{
"scope": scopes,
"service": []string{"registry.docker.io"}, // usually comes from auth challenge
}
)
u.RawQuery = q.Encode()
log.G(ctx).WithField("token.url", u.String()).Debug("requesting token")
resp, err := ctxhttp.Get(ctx, http.DefaultClient, u.String())
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode > 299 {
return "", errors.Errorf("unexpected status code: %v %v", resp.StatusCode, resp.Status)
}
p, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
var tokenResponse struct {
Token string `json:"token"`
}
if err := json.Unmarshal(p, &tokenResponse); err != nil {
return "", err
}
return tokenResponse.Token, nil
}
// getV2URLPaths generates the canidate urls paths for the object based on the
// set of hints and the provided object id. URLs are returned in the order of
// most to least likely succeed.
func getV2URLPaths(prefix, object string, hints ...string) ([]string, error) {
var urls []string
// TODO(stevvooe): We can probably define a higher-level "type" hint to
// avoid having to do extra round trips to resolve content, as well as
// avoid the tedium of providing media types.
if remotes.HintExists("mediatype", "application/vnd.docker.distribution.manifest.v2+json", hints...) { // TODO(stevvooe): make this handle oci types, as well.
// fast path out if we know we are getting a manifest. Arguably, we
// should fallback to blobs, just in case.
urls = append(urls, path.Join("/v2", prefix, "manifests", object))
}
_, err := digest.Parse(object)
if err == nil {
// we have a digest, use blob or manifest path, depending on hints, may
// need to try both.
urls = append(urls, path.Join("/v2", prefix, "blobs", object))
}
// probably a take, so we go through the manifests endpoint
urls = append(urls, path.Join("/v2", prefix, "manifests", object))
var (
noduplicates []string
seen = map[string]struct{}{}
)
for _, u := range urls {
if _, ok := seen[u]; !ok {
seen[u] = struct{}{}
noduplicates = append(noduplicates, u)
}
}
return noduplicates, nil
}

44
cmd/dist/main.go vendored Normal file
View File

@ -0,0 +1,44 @@
package main
import (
"fmt"
"os"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd"
"github.com/urfave/cli"
)
func main() {
app := cli.NewApp()
app.Name = "dist"
app.Version = containerd.Version
app.Usage = `
___ __
____/ (_)____/ /_
/ __ / / ___/ __/
/ /_/ / (__ ) /_
\__,_/_/____/\__/
distribution tool
`
app.Flags = []cli.Flag{
cli.BoolFlag{
Name: "debug",
Usage: "enable debug output in logs",
},
}
app.Commands = []cli.Command{
fetchCommand,
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
logrus.SetLevel(logrus.DebugLevel)
}
return nil
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "dist: %s\n", err)
os.Exit(1)
}
}

31
remotes/hints.go Normal file
View File

@ -0,0 +1,31 @@
package remotes
import "strings"
// HintExists returns true if a hint of the provided kind and values exists in
// the set of provided hints.
func HintExists(kind, value string, hints ...string) bool {
for _, hint := range hints {
if strings.HasPrefix(hint, kind) && strings.HasSuffix(hint, value) {
return true
}
}
return false
}
// HintValues returns a slice of the values of the hints that match kind.
func HintValues(kind string, hints ...string) []string {
var values []string
for _, hint := range hints {
if strings.HasPrefix(hint, kind) {
parts := strings.SplitN(hint, ":", 2)
if len(parts) < 2 {
continue
}
values = append(values, parts[1])
}
}
return values
}

39
remotes/remote.go Normal file
View File

@ -0,0 +1,39 @@
package remotes
import (
"context"
"io"
)
type Remote interface {
// Fetch the resource identified by id. The id is opaque to the remote, but
// may typically be a tag or a digest.
//
// Hints are provided to give instruction on how the resource may be
// fetched. They may provide information about the expected type or size.
// They may be protocol specific or help a protocol to identify the most
// efficient fetch methodology.
//
// Hints are the format of `<type>:<content>` where `<type>` is the type
// of the hint and `<content>` can be pretty much anything. For example, a
// media type hint would be the following:
//
// mediatype:application/vnd.docker.distribution.manifest.v2+json
//
// The following hint names are must be honored across all remote
// implementations:
//
// size: specify the expected size in bytes
// mediatype: specify the expected mediatype
//
// The caller should never expect the hints to be honored and should
// validate that returned content is as expected. They are only provided to
// help the remote retrieve the content.
Fetch(ctx context.Context, id string, hints ...string) (io.ReadCloser, error)
}
type RemoteFunc func(context.Context, string, ...string) (io.ReadCloser, error)
func (fn RemoteFunc) Fetch(ctx context.Context, object string, hints ...string) (io.ReadCloser, error) {
return fn(ctx, object, hints...)
}

19
remotes/resolver.go Normal file
View File

@ -0,0 +1,19 @@
package remotes
import "context"
// Resolver provides a remote based on a locator.
type Resolver interface {
// Resolve returns a remote from the locator.
//
// A locator is a scheme-less URI representing the remote. Structurally, it
// has a host and path. The "host" can be used to directly reference a
// specific host or be matched against a specific handler.
Resolve(ctx context.Context, locator string) (Remote, error)
}
type ResolverFunc func(context.Context, string) (Remote, error)
func (fn ResolverFunc) Resolve(ctx context.Context, locator string) (Remote, error) {
return fn(ctx, locator)
}

74
vendor/golang.org/x/net/context/ctxhttp/ctxhttp.go generated vendored Normal file
View File

@ -0,0 +1,74 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build go1.7
// Package ctxhttp provides helper functions for performing context-aware HTTP requests.
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
// Do sends an HTTP request with the provided http.Client and returns
// an HTTP response.
//
// If the client is nil, http.DefaultClient is used.
//
// The provided ctx must be non-nil. If it is canceled or times out,
// ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
resp, err := client.Do(req.WithContext(ctx))
// If we got an error, and the context has been canceled,
// the context's error is probably more useful.
if err != nil {
select {
case <-ctx.Done():
err = ctx.Err()
default:
}
}
return resp, err
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}

View File

@ -0,0 +1,147 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !go1.7
package ctxhttp // import "golang.org/x/net/context/ctxhttp"
import (
"io"
"net/http"
"net/url"
"strings"
"golang.org/x/net/context"
)
func nop() {}
var (
testHookContextDoneBeforeHeaders = nop
testHookDoReturned = nop
testHookDidBodyClose = nop
)
// Do sends an HTTP request with the provided http.Client and returns an HTTP response.
// If the client is nil, http.DefaultClient is used.
// If the context is canceled or times out, ctx.Err() will be returned.
func Do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
// TODO(djd): Respect any existing value of req.Cancel.
cancel := make(chan struct{})
req.Cancel = cancel
type responseAndError struct {
resp *http.Response
err error
}
result := make(chan responseAndError, 1)
// Make local copies of test hooks closed over by goroutines below.
// Prevents data races in tests.
testHookDoReturned := testHookDoReturned
testHookDidBodyClose := testHookDidBodyClose
go func() {
resp, err := client.Do(req)
testHookDoReturned()
result <- responseAndError{resp, err}
}()
var resp *http.Response
select {
case <-ctx.Done():
testHookContextDoneBeforeHeaders()
close(cancel)
// Clean up after the goroutine calling client.Do:
go func() {
if r := <-result; r.resp != nil {
testHookDidBodyClose()
r.resp.Body.Close()
}
}()
return nil, ctx.Err()
case r := <-result:
var err error
resp, err = r.resp, r.err
if err != nil {
return resp, err
}
}
c := make(chan struct{})
go func() {
select {
case <-ctx.Done():
close(cancel)
case <-c:
// The response's Body is closed.
}
}()
resp.Body = &notifyingReader{resp.Body, c}
return resp, nil
}
// Get issues a GET request via the Do function.
func Get(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Head issues a HEAD request via the Do function.
func Head(ctx context.Context, client *http.Client, url string) (*http.Response, error) {
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return Do(ctx, client, req)
}
// Post issues a POST request via the Do function.
func Post(ctx context.Context, client *http.Client, url string, bodyType string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return Do(ctx, client, req)
}
// PostForm issues a POST request via the Do function.
func PostForm(ctx context.Context, client *http.Client, url string, data url.Values) (*http.Response, error) {
return Post(ctx, client, url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// notifyingReader is an io.ReadCloser that closes the notify channel after
// Close is called or a Read fails on the underlying ReadCloser.
type notifyingReader struct {
io.ReadCloser
notify chan<- struct{}
}
func (r *notifyingReader) Read(p []byte) (int, error) {
n, err := r.ReadCloser.Read(p)
if err != nil && r.notify != nil {
close(r.notify)
r.notify = nil
}
return n, err
}
func (r *notifyingReader) Close() error {
err := r.ReadCloser.Close()
if r.notify != nil {
close(r.notify)
r.notify = nil
}
return err
}