cmd/dist, cmd/ctr: end to end image pull

With this changeset, we now have a proof of concept of end to end pull.
Up to this point, the relationship between subsystems has been somewhat
theoretical. We now leverage fetching, the snapshot drivers, the rootfs
service, image metadata and the execution service, validating the proposed
model for containerd. There are a few caveats, including the need to move some
of the access into GRPC services, but the basic components are there.

The first command we will cover here is `dist pull`. This is the analog
of `docker pull` and `git pull`. It performs a full resource fetch for
an image and unpacks the root filesystem into the snapshot drivers. An
example follows:

``` console
$ sudo ./bin/dist pull docker.io/library/redis:latest
docker.io/library/redis:latest:                                                   resolved       |++++++++++++++++++++++++++++++++++++++|
manifest-sha256:4c8fb09e8d634ab823b1c125e64f0e1ceaf216025aa38283ea1b42997f1e8059: done           |++++++++++++++++++++++++++++++++++++++|
layer-sha256:3b281f2bcae3b25c701d53a219924fffe79bdb74385340b73a539ed4020999c4:    done           |++++++++++++++++++++++++++++++++++++++|
config-sha256:e4a35914679d05d25e2fccfd310fde1aa59ffbbf1b0b9d36f7b03db5ca0311b0:   done           |++++++++++++++++++++++++++++++++++++++|
layer-sha256:4b7726832aec75f0a742266c7190c4d2217492722dfd603406208eaa902648d8:    done           |++++++++++++++++++++++++++++++++++++++|
layer-sha256:338a7133395941c85087522582af182d2f6477dbf54ba769cb24ec4fd91d728f:    done           |++++++++++++++++++++++++++++++++++++++|
layer-sha256:83f12ff60ff1132d1e59845e26c41968406b4176c1a85a50506c954696b21570:    done           |++++++++++++++++++++++++++++++++++++++|
layer-sha256:693502eb7dfbc6b94964ae66ebc72d3e32facd981c72995b09794f1e87bac184:    done           |++++++++++++++++++++++++++++++++++++++|
layer-sha256:622732cddc347afc9360b4b04b46c6f758191a1dc73d007f95548658847ee67e:    done           |++++++++++++++++++++++++++++++++++++++|
layer-sha256:19a7e34366a6f558336c364693df538c38307484b729a36fede76432789f084f:    done           |++++++++++++++++++++++++++++++++++++++|
elapsed: 1.6 s                                                                    total:   0.0 B (0.0 B/s)
INFO[0001] unpacking rootfs
```

Note that we haven't integrated rootfs unpacking into the status output, but we
pretty much have what is in docker today (:P). We can see the result of our pull
with the following:

```console
$ sudo ./bin/dist images
REF                            TYPE                                                 DIGEST                                                                  SIZE
docker.io/library/redis:latest application/vnd.docker.distribution.manifest.v2+json sha256:4c8fb09e8d634ab823b1c125e64f0e1ceaf216025aa38283ea1b42997f1e8059 1.8 kB
```

The above shows that we have an image called "docker.io/library/redis:latest"
mapped to the given digest marked with a specific format. We get the size of
the manifest right now, not the full image, but we can add more as we need it.
For the most part, this is all that is needed, but a few tweaks to the model
for naming may need to be added. Specifically, we may want to index under a few
different names, including those qualified by hash or matched by tag versions.
We can do more work in this area as we develop the metadata store.

The name shown above can then be used to run the actual container image. We can
do this with the following command:

```console
$ sudo ./bin/ctr run --id foo docker.io/library/redis:latest /usr/local/bin/redis-server
1:C 17 Mar 17:20:25.316 # Warning: no config file specified, using the default config. In order to specify a config file use /usr/local/bin/redis-server /path/to/redis.conf
1:M 17 Mar 17:20:25.317 * Increased maximum number of open files to 10032 (it was originally set to 1024).
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 3.2.8 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 1
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

1:M 17 Mar 17:20:25.326 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
1:M 17 Mar 17:20:25.326 # Server started, Redis version 3.2.8
1:M 17 Mar 17:20:25.326 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
1:M 17 Mar 17:20:25.326 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
1:M 17 Mar 17:20:25.326 * The server is now ready to accept connections on port 6379
```

Wow! So, now we are running `redis`!

There are still a few things to work out. Notice that we have to specify the
command as part of the arguments to `ctr run`. This is because are not yet
reading the image config and converting it to an OCI runtime config. With the
base laid in this PR, adding such functionality should be straightforward.

While this is a _little_ messy, this is great progress. It should be easy
sailing from here.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2017-03-17 17:00:52 -07:00
parent fa71fc91fa
commit e53539c58f
No known key found for this signature in database
GPG Key ID: 67B3DED84EDC823F
11 changed files with 710 additions and 22 deletions

View File

@ -38,6 +38,13 @@ containerd client
Usage: "socket path for containerd's GRPC server",
Value: "/run/containerd/containerd.sock",
},
cli.StringFlag{
// TODO(stevvooe): for now, we allow circumventing the GRPC. Once
// we have clear separation, this will likely go away.
Name: "root",
Usage: "path to content store root",
Value: "/var/lib/containerd",
},
}
app.Commands = []cli.Command{
runCommand,

View File

@ -7,11 +7,16 @@ import (
"path/filepath"
"runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/Sirupsen/logrus"
"github.com/crosbymichael/console"
"github.com/docker/containerd/api/services/execution"
"github.com/docker/containerd/api/types/mount"
rootfsapi "github.com/docker/containerd/api/services/rootfs"
"github.com/docker/containerd/image"
protobuf "github.com/gogo/protobuf/types"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/urfave/cli"
@ -176,6 +181,7 @@ var runCommand = cli.Command{
},
},
Action: func(context *cli.Context) error {
ctx := gocontext.Background()
id := context.String("id")
if id == "" {
return errors.New("container id must be provided")
@ -189,29 +195,69 @@ var runCommand = cli.Command{
if err != nil {
return err
}
events, err := containers.Events(gocontext.Background(), &execution.EventsRequest{})
events, err := containers.Events(ctx, &execution.EventsRequest{})
if err != nil {
return err
}
abs, err := filepath.Abs(context.String("rootfs"))
provider, err := getContentProvider(context)
if err != nil {
return err
}
// for ctr right now just do a bind mount
rootfs := []*mount.Mount{
{
Type: "bind",
Source: abs,
Options: []string{
"rw",
"rbind",
},
},
rootfsClient, err := getRootFSService(context)
if err != nil {
return err
}
db, err := getDB(context, false)
if err != nil {
return errors.Wrap(err, "failed opening database")
}
defer db.Close()
tx, err := db.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()
ref := context.Args().First()
im, err := image.Get(tx, ref)
if err != nil {
return errors.Wrapf(err, "could not resolve %q", ref)
}
// let's close out our db and tx so we don't hold the lock whilst running.
tx.Rollback()
db.Close()
diffIDs, err := im.RootFS(ctx, provider)
if err != nil {
return err
}
if _, err := rootfsClient.Prepare(gocontext.TODO(), &rootfsapi.PrepareRequest{
Name: id,
ChainID: identity.ChainID(diffIDs),
}); err != nil {
if grpc.Code(err) != codes.AlreadyExists {
return err
}
}
resp, err := rootfsClient.Mounts(gocontext.TODO(), &rootfsapi.MountsRequest{
Name: id,
})
if err != nil {
return err
}
rootfs := resp.Mounts
var s *specs.Spec
if config := context.String("runtime-config"); config == "" {
s = spec(id, []string(context.Args()), context.Bool("tty"))
s = spec(id, []string(context.Args().Tail()), context.Bool("tty"))
} else {
s, err = customSpec(config)
if err != nil {
@ -251,6 +297,7 @@ var runCommand = cli.Command{
if err != nil {
return err
}
if _, err := containers.Start(gocontext.Background(), &execution.StartRequest{
ID: response.ID,
}); err != nil {

View File

@ -14,8 +14,14 @@ import (
gocontext "context"
"github.com/boltdb/bolt"
contentapi "github.com/docker/containerd/api/services/content"
"github.com/docker/containerd/api/services/execution"
rootfsapi "github.com/docker/containerd/api/services/rootfs"
"github.com/docker/containerd/api/types/container"
"github.com/docker/containerd/content"
"github.com/docker/containerd/image"
contentservice "github.com/docker/containerd/services/content"
"github.com/pkg/errors"
"github.com/tonistiigi/fifo"
"github.com/urfave/cli"
@ -112,6 +118,43 @@ func getExecutionService(context *cli.Context) (execution.ContainerServiceClient
return execution.NewContainerServiceClient(conn), nil
}
func getContentProvider(context *cli.Context) (content.Provider, error) {
conn, err := getGRPCConnection(context)
if err != nil {
return nil, err
}
return contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), nil
}
func getRootFSService(context *cli.Context) (rootfsapi.RootFSClient, error) {
conn, err := getGRPCConnection(context)
if err != nil {
return nil, err
}
return rootfsapi.NewRootFSClient(conn), nil
}
func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) {
// TODO(stevvooe): For now, we operate directly on the database. We will
// replace this with a GRPC service when the details are more concrete.
path := filepath.Join(ctx.GlobalString("root"), "meta.db")
db, err := bolt.Open(path, 0644, &bolt.Options{
ReadOnly: readonly,
})
if err != nil {
return nil, err
}
if !readonly {
if err := image.InitDB(db); err != nil {
return nil, err
}
}
return db, nil
}
func getTempDir(id string) (string, error) {
err := os.MkdirAll(filepath.Join(os.TempDir(), "ctr"), 0700)
if err != nil {

23
cmd/dist/common.go vendored
View File

@ -5,7 +5,9 @@ import (
"path/filepath"
"time"
"github.com/boltdb/bolt"
"github.com/docker/containerd/content"
"github.com/docker/containerd/image"
"github.com/urfave/cli"
"google.golang.org/grpc"
)
@ -34,3 +36,24 @@ func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) {
}),
)
}
func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) {
// TODO(stevvooe): For now, we operate directly on the database. We will
// replace this with a GRPC service when the details are more concrete.
path := filepath.Join(ctx.GlobalString("root"), "meta.db")
db, err := bolt.Open(path, 0644, &bolt.Options{
ReadOnly: readonly,
})
if err != nil {
return nil, err
}
if !readonly {
if err := image.InitDB(db); err != nil {
return nil, err
}
}
return db, nil
}

46
cmd/dist/images.go vendored Normal file
View File

@ -0,0 +1,46 @@
package main
import (
"fmt"
"os"
"text/tabwriter"
"github.com/docker/containerd/image"
"github.com/docker/containerd/progress"
"github.com/pkg/errors"
"github.com/urfave/cli"
)
var imagesCommand = cli.Command{
Name: "images",
Usage: "list images known to containerd",
ArgsUsage: "[flags] <ref>",
Description: `List images registered with containerd.`,
Flags: []cli.Flag{},
Action: func(clicontext *cli.Context) error {
db, err := getDB(clicontext, true)
if err != nil {
return errors.Wrap(err, "failed to open database")
}
tx, err := db.Begin(false)
if err != nil {
return errors.Wrap(err, "could not start transaction")
}
defer tx.Rollback()
images, err := image.List(tx)
if err != nil {
return errors.Wrap(err, "failed to list images")
}
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, ' ', 0)
fmt.Fprintln(tw, "REF\tTYPE\tDIGEST\tSIZE\t")
for _, image := range images {
fmt.Fprintf(tw, "%v\t%v\t%v\t%v\t\n", image.Name, image.Descriptor.MediaType, image.Descriptor.Digest, progress.Bytes(image.Descriptor.Size))
}
tw.Flush()
return nil
},
}

2
cmd/dist/main.go vendored
View File

@ -61,6 +61,8 @@ distribution tool
},
}
app.Commands = []cli.Command{
imagesCommand,
pullCommand,
fetchCommand,
fetchObjectCommand,
ingestCommand,

245
cmd/dist/pull.go vendored Normal file
View File

@ -0,0 +1,245 @@
package main
import (
"context"
"encoding/json"
"os"
"text/tabwriter"
"time"
contentapi "github.com/docker/containerd/api/services/content"
rootfsapi "github.com/docker/containerd/api/services/rootfs"
"github.com/docker/containerd/content"
"github.com/docker/containerd/image"
"github.com/docker/containerd/log"
"github.com/docker/containerd/progress"
"github.com/docker/containerd/remotes"
contentservice "github.com/docker/containerd/services/content"
rootfsservice "github.com/docker/containerd/services/rootfs"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"
)
var pullCommand = cli.Command{
Name: "pull",
Usage: "pull an image from a remote",
ArgsUsage: "[flags] <ref>",
Description: `Fetch and prepare an image for use in containerd.
After pulling an image, it should be ready to use the same reference in a run
command. As part of this process, we do the following:
1. Fetch all resources into containerd.
2. Prepare the snapshot filesystem with the pulled resources.
3. Register metadata for the image.
`,
Flags: []cli.Flag{},
Action: func(clicontext *cli.Context) error {
var (
ctx = background
ref = clicontext.Args().First()
)
conn, err := connectGRPC(clicontext)
if err != nil {
return err
}
db, err := getDB(clicontext, false)
if err != nil {
return err
}
defer db.Close()
tx, err := db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
resolver, err := getResolver(ctx)
if err != nil {
return err
}
ongoing := newJobs()
ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
cs, err := resolveContentStore(clicontext)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
var resolvedImageName string
resolved := make(chan struct{})
eg.Go(func() error {
ongoing.add(ref)
name, desc, fetcher, err := resolver.Resolve(ctx, ref)
if err != nil {
return err
}
log.G(ctx).WithField("image", name).Debug("fetching")
resolvedImageName = name
close(resolved)
eg.Go(func() error {
return image.Register(tx, name, desc)
})
defer func() {
if err := tx.Commit(); err != nil {
log.G(ctx).WithError(err).Error("commit failed")
}
}()
return image.Dispatch(ctx,
image.Handlers(image.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
}),
remotes.FetchHandler(ingester, fetcher),
image.ChildrenHandler(provider)),
desc)
})
errs := make(chan error)
go func() {
defer close(errs)
errs <- eg.Wait()
}()
defer func() {
ctx := context.Background()
tx, err := db.Begin(false)
if err != nil {
log.G(ctx).Fatal(err)
}
// TODO(stevvooe): This section unpacks the layers and resolves the
// root filesystem chainid for the image. For now, we just print
// it, but we should keep track of this in the metadata storage.
image, err := image.Get(tx, resolvedImageName)
if err != nil {
log.G(ctx).Fatal(err)
}
provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn))
p, err := content.ReadBlob(ctx, provider, image.Descriptor.Digest)
if err != nil {
log.G(ctx).Fatal(err)
}
var manifest ocispec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
log.G(ctx).Fatal(err)
}
rootfs := rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn))
log.G(ctx).Info("unpacking rootfs")
chainID, err := rootfs.Unpack(ctx, manifest.Layers)
if err != nil {
log.G(ctx).Fatal(err)
}
diffIDs, err := image.RootFS(ctx, provider)
if err != nil {
log.G(ctx).WithError(err).Fatal("failed resolving rootfs")
}
expectedChainID := identity.ChainID(diffIDs)
if expectedChainID != chainID {
log.G(ctx).Fatal("rootfs service did not match chainid")
}
}()
var (
ticker = time.NewTicker(100 * time.Millisecond)
fw = progress.NewWriter(os.Stdout)
start = time.Now()
done bool
)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
js := ongoing.jobs()
statuses := map[string]statusInfo{}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.Active()
if err != nil {
log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
// now, update the items in jobs that are not in active
for _, j := range js {
if _, ok := activeSeen[j]; ok {
continue
}
status := "done"
if j == ref {
select {
case <-resolved:
status = "resolved"
default:
status = "resolving"
}
}
statuses[j] = statusInfo{
Ref: j,
Status: status, // for now!
}
}
var ordered []statusInfo
for _, j := range js {
ordered = append(ordered, statuses[j])
}
display(tw, ordered, start)
tw.Flush()
if done {
fw.Flush()
return nil
}
case err := <-errs:
if err != nil {
return err
}
done = true
case <-ctx.Done():
done = true // allow ui to update once more
}
}
},
}

View File

@ -10,6 +10,19 @@ import (
"github.com/pkg/errors"
)
// ReadBlob retrieves the entire contents of the blob from the provider.
//
// Avoid using this for large blobs, such as layers.
func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) {
rc, err := provider.Reader(ctx, dgst)
if err != nil {
return nil, err
}
defer rc.Close()
return ioutil.ReadAll(rc)
}
// WriteBlob writes data with the expected digest into the content store. If
// expected already exists, the method returns immediately and the reader will
// not be consumed.

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"github.com/docker/containerd/content"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -131,13 +130,7 @@ func ChildrenHandler(provider content.Provider) HandlerFunc {
return nil, fmt.Errorf("%v not yet supported", desc.MediaType)
}
rc, err := provider.Reader(ctx, desc.Digest)
if err != nil {
return nil, err
}
defer rc.Close()
p, err := ioutil.ReadAll(rc)
p, err := content.ReadBlob(ctx, provider, desc.Digest)
if err != nil {
return nil, err
}

88
image/image.go Normal file
View File

@ -0,0 +1,88 @@
package image
import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"github.com/docker/containerd/content"
"github.com/docker/containerd/log"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// Image provides the model for how containerd views container images.
type Image struct {
Name string
Descriptor ocispec.Descriptor
}
// TODO(stevvooe): Many of these functions make strong platform assumptions,
// which are untrue in a lot of cases. More refactoring must be done here to
// make this work in all cases.
// Config resolves the image configuration descriptor.
//
// The caller can then use the descriptor to resolve and process the
// configuration of the image.
func (image *Image) Config(ctx context.Context, provider content.Provider) (ocispec.Descriptor, error) {
var configDesc ocispec.Descriptor
return configDesc, Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
switch image.Descriptor.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
rc, err := provider.Reader(ctx, image.Descriptor.Digest)
if err != nil {
return nil, err
}
defer rc.Close()
p, err := ioutil.ReadAll(rc)
if err != nil {
return nil, err
}
var manifest ocispec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, err
}
configDesc = manifest.Config
return nil, nil
default:
return nil, errors.New("could not resolve config")
}
}), image.Descriptor)
}
// RootFS returns the unpacked diffids that make up and images rootfs.
//
// These are used to verify that a set of layers unpacked to the expected
// values.
func (image *Image) RootFS(ctx context.Context, provider content.Provider) ([]digest.Digest, error) {
desc, err := image.Config(ctx, provider)
if err != nil {
return nil, err
}
p, err := content.ReadBlob(ctx, provider, desc.Digest)
if err != nil {
log.G(ctx).Fatal(err)
}
var config ocispec.Image
if err := json.Unmarshal(p, &config); err != nil {
log.G(ctx).Fatal(err)
}
// TODO(stevvooe): Remove this bit when OCI structure uses correct type for
// rootfs.DiffIDs.
var diffIDs []digest.Digest
for _, diffID := range config.RootFS.DiffIDs {
diffIDs = append(diffIDs, digest.Digest(diffID))
}
return diffIDs, nil
}

181
image/storage.go Normal file
View File

@ -0,0 +1,181 @@
package image
import (
"encoding/binary"
"fmt"
"github.com/boltdb/bolt"
"github.com/docker/containerd/log"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
var (
errImageUnknown = fmt.Errorf("image: unknown")
errNoTx = fmt.Errorf("no transaction available")
)
var (
bucketKeyStorageVersion = []byte("v1")
bucketKeyImages = []byte("images")
bucketKeyDigest = []byte("digest")
bucketKeyMediaType = []byte("mediatype")
bucketKeySize = []byte("size")
)
// TODO(stevvooe): This file comprises the data required to implement the
// "metadata" store. For now, it is bound tightly to the local machine and bolt
// but we can take this and use it to define a service interface.
func InitDB(db *bolt.DB) error {
log.L.Debug("init db")
return db.Update(func(tx *bolt.Tx) error {
_, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyImages)
return err
})
}
func Register(tx *bolt.Tx, name string, desc ocispec.Descriptor) error {
return withImagesBucket(tx, func(bkt *bolt.Bucket) error {
ibkt, err := bkt.CreateBucketIfNotExists([]byte(name))
if err != nil {
return err
}
var (
buf [binary.MaxVarintLen64]byte
sizeEncoded []byte = buf[:]
)
sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, desc.Size)]
if len(sizeEncoded) == 0 {
return fmt.Errorf("failed encoding size = %v", desc.Size)
}
for _, v := range [][2][]byte{
{bucketKeyDigest, []byte(desc.Digest)},
{bucketKeyMediaType, []byte(desc.MediaType)},
{bucketKeySize, sizeEncoded},
} {
if err := ibkt.Put(v[0], v[1]); err != nil {
return err
}
}
return nil
})
}
func Get(tx *bolt.Tx, name string) (Image, error) {
var image Image
if err := withImageBucket(tx, name, func(bkt *bolt.Bucket) error {
image.Name = name
return readImage(&image, bkt)
}); err != nil {
return Image{}, err
}
return image, nil
}
func List(tx *bolt.Tx) ([]Image, error) {
var images []Image
if err := withImagesBucket(tx, func(bkt *bolt.Bucket) error {
return bkt.ForEach(func(k, v []byte) error {
var (
image = Image{
Name: string(k),
}
kbkt = bkt.Bucket(k)
)
if err := readImage(&image, kbkt); err != nil {
return err
}
images = append(images, image)
return nil
})
}); err != nil {
return nil, err
}
return images, nil
}
func readImage(image *Image, bkt *bolt.Bucket) error {
return bkt.ForEach(func(k, v []byte) error {
if v == nil {
return nil // skip it? a bkt maybe?
}
// TODO(stevvooe): This is why we need to use byte values for
// keys, rather than full arrays.
switch string(k) {
case string(bucketKeyDigest):
image.Descriptor.Digest = digest.Digest(v)
case string(bucketKeyMediaType):
image.Descriptor.MediaType = string(v)
case string(bucketKeySize):
image.Descriptor.Size, _ = binary.Varint(v)
}
return nil
})
}
func createBucketIfNotExists(tx *bolt.Tx, keys ...[]byte) (*bolt.Bucket, error) {
bkt, err := tx.CreateBucketIfNotExists(keys[0])
if err != nil {
return nil, err
}
for _, key := range keys[1:] {
bkt, err = bkt.CreateBucketIfNotExists(key)
if err != nil {
return nil, err
}
}
return bkt, nil
}
func withImagesBucket(tx *bolt.Tx, fn func(bkt *bolt.Bucket) error) error {
bkt := getImagesBucket(tx)
if bkt == nil {
return errImageUnknown
}
return fn(bkt)
}
func withImageBucket(tx *bolt.Tx, name string, fn func(bkt *bolt.Bucket) error) error {
bkt := getImageBucket(tx, name)
if bkt == nil {
return errImageUnknown
}
return fn(bkt)
}
func getImagesBucket(tx *bolt.Tx) *bolt.Bucket {
return getBucket(tx, bucketKeyStorageVersion, bucketKeyImages)
}
func getImageBucket(tx *bolt.Tx, name string) *bolt.Bucket {
return getBucket(tx, bucketKeyStorageVersion, bucketKeyImages, []byte(name))
}
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
bkt := tx.Bucket(keys[0])
for _, key := range keys[1:] {
if bkt == nil {
break
}
bkt = bkt.Bucket(key)
}
return bkt
}