2016-11-02 01:43:15 +00:00
|
|
|
package content
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
|
|
|
"bytes"
|
2017-02-17 08:07:02 +00:00
|
|
|
"context"
|
2016-11-02 01:43:15 +00:00
|
|
|
"crypto/rand"
|
2016-11-04 00:02:34 +00:00
|
|
|
_ "crypto/sha256" // required for digest package
|
2016-11-02 01:43:15 +00:00
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
2016-12-02 05:37:58 +00:00
|
|
|
mrand "math/rand"
|
2016-11-02 01:43:15 +00:00
|
|
|
"os"
|
|
|
|
"path/filepath"
|
2016-11-16 04:29:43 +00:00
|
|
|
"reflect"
|
2016-12-02 05:37:58 +00:00
|
|
|
"runtime"
|
2016-11-02 01:43:15 +00:00
|
|
|
"testing"
|
dist: expand functionality of the dist tool
With this change, we add the following commands to the dist tool:
- `ingest`: verify and accept content into storage
- `active`: display active ingest processes
- `list`: list content in storage
- `path`: provide a path to a blob by digest
- `delete`: remove a piece of content from storage
We demonstrate the utility with the following shell pipeline:
```
$ ./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json | \
jq -r '.layers[] | "./dist fetch docker.io/library/redis "+.digest + "| ./dist ingest --expected-digest "+.digest+" --expected-size "+(.size | tostring) +" docker.io/library/redis@"+.digest' | xargs -I{} -P10 -n1 sh -c "{}"
```
The above fetches a manifest, pipes it to jq, which assembles a shell
pipeline to ingest each layer into the content store. Because the
transactions are keyed by their digest, concurrent downloads and
downloads of repeated content are ignored. Each process is then executed
parallel using xargs.
Put shortly, this is a parallel layer download.
In a separate shell session, could monitor the active downloads with the
following:
```
$ watch -n0.2 ./dist active
```
For now, the content is downloaded into `.content` in the current
working directory. To watch the contents of this directory, you can use
the following:
```
$ watch -n0.2 tree .content
```
This will help to understand what is going on internally.
To get access to the layers, you can use the path command:
```
$./dist path sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa
sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa /home/sjd/go/src/github.com/docker/containerd/.content/blobs/sha256/010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa
```
When you are done, you can clear out the content with the classic xargs
pipeline:
```
$ ./dist list -q | xargs ./dist delete
```
Note that this is mostly a POC. Things like failed downloads and
abandoned download cleanup aren't quite handled. We'll probably make
adjustments around how content store transactions are handled to address
this.
From here, we'll build out full image pull and create tooling to get
runtime bundles from the fetched content.
Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-01-26 22:08:56 +00:00
|
|
|
"time"
|
2016-11-02 01:43:15 +00:00
|
|
|
|
2017-02-01 02:04:13 +00:00
|
|
|
"github.com/docker/containerd/testutil"
|
2017-01-09 23:10:52 +00:00
|
|
|
"github.com/opencontainers/go-digest"
|
2016-11-02 01:43:15 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
func TestContentWriter(t *testing.T) {
|
2017-02-17 08:07:02 +00:00
|
|
|
ctx, tmpdir, cs, cleanup := contentStoreEnv(t)
|
2016-12-02 05:37:58 +00:00
|
|
|
defer cleanup()
|
2017-02-01 02:04:13 +00:00
|
|
|
defer testutil.DumpDir(t, tmpdir)
|
2016-11-02 01:43:15 +00:00
|
|
|
|
|
|
|
if _, err := os.Stat(filepath.Join(tmpdir, "ingest")); os.IsNotExist(err) {
|
|
|
|
t.Fatal("ingest dir should be created", err)
|
|
|
|
}
|
|
|
|
|
2017-02-17 08:07:02 +00:00
|
|
|
cw, err := cs.Writer(ctx, "myref")
|
2016-11-02 01:43:15 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
if err := cw.Close(); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// reopen, so we can test things
|
2017-02-17 08:07:02 +00:00
|
|
|
cw, err = cs.Writer(ctx, "myref")
|
2016-11-02 01:43:15 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2016-11-16 03:46:24 +00:00
|
|
|
// make sure that second resume also fails
|
2017-02-17 08:07:02 +00:00
|
|
|
if _, err = cs.Writer(ctx, "myref"); err == nil {
|
2016-11-16 03:46:24 +00:00
|
|
|
// TODO(stevvooe): This also works across processes. Need to find a way
|
|
|
|
// to test that, as well.
|
|
|
|
t.Fatal("no error on second resume")
|
|
|
|
}
|
|
|
|
|
2016-11-16 04:29:43 +00:00
|
|
|
// we should also see this as an active ingestion
|
|
|
|
ingestions, err := cs.Active()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
dist: expand functionality of the dist tool
With this change, we add the following commands to the dist tool:
- `ingest`: verify and accept content into storage
- `active`: display active ingest processes
- `list`: list content in storage
- `path`: provide a path to a blob by digest
- `delete`: remove a piece of content from storage
We demonstrate the utility with the following shell pipeline:
```
$ ./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json | \
jq -r '.layers[] | "./dist fetch docker.io/library/redis "+.digest + "| ./dist ingest --expected-digest "+.digest+" --expected-size "+(.size | tostring) +" docker.io/library/redis@"+.digest' | xargs -I{} -P10 -n1 sh -c "{}"
```
The above fetches a manifest, pipes it to jq, which assembles a shell
pipeline to ingest each layer into the content store. Because the
transactions are keyed by their digest, concurrent downloads and
downloads of repeated content are ignored. Each process is then executed
parallel using xargs.
Put shortly, this is a parallel layer download.
In a separate shell session, could monitor the active downloads with the
following:
```
$ watch -n0.2 ./dist active
```
For now, the content is downloaded into `.content` in the current
working directory. To watch the contents of this directory, you can use
the following:
```
$ watch -n0.2 tree .content
```
This will help to understand what is going on internally.
To get access to the layers, you can use the path command:
```
$./dist path sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa
sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa /home/sjd/go/src/github.com/docker/containerd/.content/blobs/sha256/010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa
```
When you are done, you can clear out the content with the classic xargs
pipeline:
```
$ ./dist list -q | xargs ./dist delete
```
Note that this is mostly a POC. Things like failed downloads and
abandoned download cleanup aren't quite handled. We'll probably make
adjustments around how content store transactions are handled to address
this.
From here, we'll build out full image pull and create tooling to get
runtime bundles from the fetched content.
Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-01-26 22:08:56 +00:00
|
|
|
// clear out the time and meta cause we don't care for this test
|
|
|
|
for i := range ingestions {
|
2017-02-17 08:07:02 +00:00
|
|
|
ingestions[i].UpdatedAt = time.Time{}
|
|
|
|
ingestions[i].StartedAt = time.Time{}
|
dist: expand functionality of the dist tool
With this change, we add the following commands to the dist tool:
- `ingest`: verify and accept content into storage
- `active`: display active ingest processes
- `list`: list content in storage
- `path`: provide a path to a blob by digest
- `delete`: remove a piece of content from storage
We demonstrate the utility with the following shell pipeline:
```
$ ./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json | \
jq -r '.layers[] | "./dist fetch docker.io/library/redis "+.digest + "| ./dist ingest --expected-digest "+.digest+" --expected-size "+(.size | tostring) +" docker.io/library/redis@"+.digest' | xargs -I{} -P10 -n1 sh -c "{}"
```
The above fetches a manifest, pipes it to jq, which assembles a shell
pipeline to ingest each layer into the content store. Because the
transactions are keyed by their digest, concurrent downloads and
downloads of repeated content are ignored. Each process is then executed
parallel using xargs.
Put shortly, this is a parallel layer download.
In a separate shell session, could monitor the active downloads with the
following:
```
$ watch -n0.2 ./dist active
```
For now, the content is downloaded into `.content` in the current
working directory. To watch the contents of this directory, you can use
the following:
```
$ watch -n0.2 tree .content
```
This will help to understand what is going on internally.
To get access to the layers, you can use the path command:
```
$./dist path sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa
sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa /home/sjd/go/src/github.com/docker/containerd/.content/blobs/sha256/010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa
```
When you are done, you can clear out the content with the classic xargs
pipeline:
```
$ ./dist list -q | xargs ./dist delete
```
Note that this is mostly a POC. Things like failed downloads and
abandoned download cleanup aren't quite handled. We'll probably make
adjustments around how content store transactions are handled to address
this.
From here, we'll build out full image pull and create tooling to get
runtime bundles from the fetched content.
Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-01-26 22:08:56 +00:00
|
|
|
}
|
|
|
|
|
2016-11-16 04:29:43 +00:00
|
|
|
if !reflect.DeepEqual(ingestions, []Status{
|
|
|
|
{
|
2017-02-17 08:07:02 +00:00
|
|
|
Ref: "myref",
|
|
|
|
Offset: 0,
|
2016-11-16 04:29:43 +00:00
|
|
|
},
|
|
|
|
}) {
|
|
|
|
t.Fatalf("unexpected ingestion set: %v", ingestions)
|
|
|
|
}
|
|
|
|
|
2016-11-02 01:43:15 +00:00
|
|
|
p := make([]byte, 4<<20)
|
|
|
|
if _, err := rand.Read(p); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
expected := digest.FromBytes(p)
|
|
|
|
|
|
|
|
checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p))))
|
|
|
|
|
|
|
|
if err := cw.Commit(int64(len(p)), expected); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := cw.Close(); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2017-02-17 08:07:02 +00:00
|
|
|
cw, err = cs.Writer(ctx, "aref")
|
2016-11-02 01:43:15 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// now, attempt to write the same data again
|
|
|
|
checkCopy(t, int64(len(p)), cw, bufio.NewReader(ioutil.NopCloser(bytes.NewReader(p))))
|
|
|
|
if err := cw.Commit(int64(len(p)), expected); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2016-11-04 00:02:34 +00:00
|
|
|
path := checkBlobPath(t, cs, expected)
|
2016-11-02 01:43:15 +00:00
|
|
|
|
|
|
|
// read the data back, make sure its the same
|
|
|
|
pp, err := ioutil.ReadFile(path)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if !bytes.Equal(p, pp) {
|
|
|
|
t.Fatal("mismatched data written to disk")
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2016-12-02 05:37:58 +00:00
|
|
|
func TestWalkBlobs(t *testing.T) {
|
2017-02-17 08:07:02 +00:00
|
|
|
ctx, _, cs, cleanup := contentStoreEnv(t)
|
2016-12-02 05:37:58 +00:00
|
|
|
defer cleanup()
|
|
|
|
|
|
|
|
const (
|
|
|
|
nblobs = 4 << 10
|
|
|
|
maxsize = 4 << 10
|
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
2017-02-17 08:07:02 +00:00
|
|
|
blobs = populateBlobStore(t, ctx, cs, nblobs, maxsize)
|
2016-12-02 05:37:58 +00:00
|
|
|
expected = map[digest.Digest]struct{}{}
|
|
|
|
found = map[digest.Digest]struct{}{}
|
|
|
|
)
|
|
|
|
|
2017-01-24 01:39:44 +00:00
|
|
|
for dgst := range blobs {
|
2016-12-02 05:37:58 +00:00
|
|
|
expected[dgst] = struct{}{}
|
|
|
|
}
|
|
|
|
|
dist: expand functionality of the dist tool
With this change, we add the following commands to the dist tool:
- `ingest`: verify and accept content into storage
- `active`: display active ingest processes
- `list`: list content in storage
- `path`: provide a path to a blob by digest
- `delete`: remove a piece of content from storage
We demonstrate the utility with the following shell pipeline:
```
$ ./dist fetch docker.io/library/redis latest mediatype:application/vnd.docker.distribution.manifest.v2+json | \
jq -r '.layers[] | "./dist fetch docker.io/library/redis "+.digest + "| ./dist ingest --expected-digest "+.digest+" --expected-size "+(.size | tostring) +" docker.io/library/redis@"+.digest' | xargs -I{} -P10 -n1 sh -c "{}"
```
The above fetches a manifest, pipes it to jq, which assembles a shell
pipeline to ingest each layer into the content store. Because the
transactions are keyed by their digest, concurrent downloads and
downloads of repeated content are ignored. Each process is then executed
parallel using xargs.
Put shortly, this is a parallel layer download.
In a separate shell session, could monitor the active downloads with the
following:
```
$ watch -n0.2 ./dist active
```
For now, the content is downloaded into `.content` in the current
working directory. To watch the contents of this directory, you can use
the following:
```
$ watch -n0.2 tree .content
```
This will help to understand what is going on internally.
To get access to the layers, you can use the path command:
```
$./dist path sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa
sha256:010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa /home/sjd/go/src/github.com/docker/containerd/.content/blobs/sha256/010c454d55e53059beaba4044116ea4636f8dd8181e975d893931c7e7204fffa
```
When you are done, you can clear out the content with the classic xargs
pipeline:
```
$ ./dist list -q | xargs ./dist delete
```
Note that this is mostly a POC. Things like failed downloads and
abandoned download cleanup aren't quite handled. We'll probably make
adjustments around how content store transactions are handled to address
this.
From here, we'll build out full image pull and create tooling to get
runtime bundles from the fetched content.
Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-01-26 22:08:56 +00:00
|
|
|
if err := cs.Walk(func(path string, fi os.FileInfo, dgst digest.Digest) error {
|
2016-12-02 05:37:58 +00:00
|
|
|
found[dgst] = struct{}{}
|
|
|
|
if checked := checkBlobPath(t, cs, dgst); checked != path {
|
|
|
|
t.Fatalf("blob path did not match: %v != %v", path, checked)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if !reflect.DeepEqual(expected, found) {
|
|
|
|
t.Fatalf("expected did not match found: %v != %v", found, expected)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-01-24 01:39:44 +00:00
|
|
|
// BenchmarkIngests checks the insertion time over varying blob sizes.
|
|
|
|
//
|
|
|
|
// Note that at the time of writing there is roughly a 4ms insertion overhead
|
|
|
|
// for blobs. This seems to be due to the number of syscalls and file io we do
|
|
|
|
// coordinating the ingestion.
|
|
|
|
func BenchmarkIngests(b *testing.B) {
|
2017-02-17 08:07:02 +00:00
|
|
|
ctx, _, cs, cleanup := contentStoreEnv(b)
|
2017-01-24 01:39:44 +00:00
|
|
|
defer cleanup()
|
|
|
|
|
|
|
|
for _, size := range []int64{
|
|
|
|
1 << 10,
|
|
|
|
4 << 10,
|
|
|
|
512 << 10,
|
|
|
|
1 << 20,
|
|
|
|
} {
|
|
|
|
size := size
|
|
|
|
b.Run(fmt.Sprint(size), func(b *testing.B) {
|
|
|
|
b.StopTimer()
|
|
|
|
blobs := generateBlobs(b, int64(b.N), size)
|
|
|
|
|
|
|
|
var bytes int64
|
|
|
|
for _, blob := range blobs {
|
|
|
|
bytes += int64(len(blob))
|
|
|
|
}
|
|
|
|
b.SetBytes(bytes)
|
|
|
|
|
|
|
|
b.StartTimer()
|
|
|
|
|
|
|
|
for dgst, p := range blobs {
|
2017-02-17 08:07:02 +00:00
|
|
|
checkWrite(b, ctx, cs, dgst, p)
|
2017-01-24 01:39:44 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type checker interface {
|
2016-12-02 05:37:58 +00:00
|
|
|
Fatal(args ...interface{})
|
2017-01-24 01:39:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func generateBlobs(t checker, nblobs, maxsize int64) map[digest.Digest][]byte {
|
|
|
|
blobs := map[digest.Digest][]byte{}
|
|
|
|
|
|
|
|
for i := int64(0); i < nblobs; i++ {
|
|
|
|
p := make([]byte, mrand.Int63n(maxsize))
|
|
|
|
|
|
|
|
if _, err := rand.Read(p); err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
dgst := digest.FromBytes(p)
|
|
|
|
blobs[dgst] = p
|
|
|
|
}
|
|
|
|
|
|
|
|
return blobs
|
|
|
|
}
|
|
|
|
|
2017-02-17 08:07:02 +00:00
|
|
|
func populateBlobStore(t checker, ctx context.Context, cs *Store, nblobs, maxsize int64) map[digest.Digest][]byte {
|
2017-01-24 01:39:44 +00:00
|
|
|
blobs := generateBlobs(t, nblobs, maxsize)
|
|
|
|
|
|
|
|
for dgst, p := range blobs {
|
2017-02-17 08:07:02 +00:00
|
|
|
checkWrite(t, ctx, cs, dgst, p)
|
2017-01-24 01:39:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return blobs
|
|
|
|
}
|
|
|
|
|
2017-02-17 08:07:02 +00:00
|
|
|
func contentStoreEnv(t checker) (context.Context, string, *Store, func()) {
|
2016-12-02 05:37:58 +00:00
|
|
|
pc, _, _, ok := runtime.Caller(1)
|
|
|
|
if !ok {
|
|
|
|
t.Fatal("failed to resolve caller")
|
|
|
|
}
|
|
|
|
fn := runtime.FuncForPC(pc)
|
|
|
|
|
|
|
|
tmpdir, err := ioutil.TempDir("", filepath.Base(fn.Name())+"-")
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2017-02-17 08:07:02 +00:00
|
|
|
cs, err := NewStore(tmpdir)
|
2016-12-02 05:37:58 +00:00
|
|
|
if err != nil {
|
|
|
|
os.RemoveAll(tmpdir)
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2017-02-17 08:07:02 +00:00
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
return ctx, tmpdir, cs, func() {
|
|
|
|
cancel()
|
2016-12-02 05:37:58 +00:00
|
|
|
os.RemoveAll(tmpdir)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-01-24 01:39:44 +00:00
|
|
|
func checkCopy(t checker, size int64, dst io.Writer, src io.Reader) {
|
2016-11-02 01:43:15 +00:00
|
|
|
nn, err := io.Copy(dst, src)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if nn != size {
|
|
|
|
t.Fatal("incorrect number of bytes copied")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-01-24 01:39:44 +00:00
|
|
|
func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string {
|
2017-02-17 08:07:02 +00:00
|
|
|
path := cs.blobPath(dgst)
|
|
|
|
|
2016-11-04 00:02:34 +00:00
|
|
|
if path != filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) {
|
2017-01-19 17:18:18 +00:00
|
|
|
t.Fatalf("unexpected path: %q", path)
|
2016-11-04 00:02:34 +00:00
|
|
|
}
|
|
|
|
fi, err := os.Stat(path)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("error stating blob path: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ensure that only read bits are set.
|
|
|
|
if ((fi.Mode() & os.ModePerm) & 0333) != 0 {
|
|
|
|
t.Fatalf("incorrect permissions: %v", fi.Mode())
|
|
|
|
}
|
|
|
|
|
|
|
|
return path
|
|
|
|
}
|
|
|
|
|
2017-02-17 08:07:02 +00:00
|
|
|
func checkWrite(t checker, ctx context.Context, cs *Store, dgst digest.Digest, p []byte) digest.Digest {
|
|
|
|
if err := WriteBlob(ctx, cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil {
|
2016-12-02 05:37:58 +00:00
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return dgst
|
|
|
|
}
|