package content

import (
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/docker/containerd/log"
	"github.com/nightlyone/lockfile"
	"github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
)

var (
	// ErrBlobNotFound is returned when a blob is not present in the store.
	ErrBlobNotFound = errors.New("blob not found")

	// bufPool reuses 32KB copy buffers, avoiding an allocation per ingest.
	bufPool = sync.Pool{
		New: func() interface{} {
			return make([]byte, 32<<10)
		},
	}
)

// Store is a digest-keyed store for content. All data written into the store
// is stored under a verifiable digest.
//
// Store can generally support multi-reader, single-writer ingest of data,
// including resumable ingest.
type Store struct {
	root string
}

// Open returns a content store rooted at the provided directory, creating
// the ingest directory if it does not already exist.
func Open(root string) (*Store, error) {
	if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
		return nil, err
	}

	return &Store{
		root: root,
	}, nil
}
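
// A minimal usage sketch for the read side of the store (illustrative only;
// the digest value is hypothetical):
//
//	cs, err := Open(".content")
//	if err != nil {
//		// handle error
//	}
//
//	dgst := digest.Digest("sha256:2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824")
//	if ok, _ := cs.Exists(dgst); ok {
//		p, _ := cs.GetPath(dgst) // path to the verified blob on disk
//		_ = p
//	}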

// Status describes the state of an active ingest, keyed by ref.
type Status struct {
	Ref     string
	Size    int64
	ModTime time.Time
	Meta    interface{}
}

// Exists reports whether a blob with the provided digest is present in the
// store.
func (cs *Store) Exists(dgst digest.Digest) (bool, error) {
	if _, err := os.Stat(cs.blobPath(dgst)); err != nil {
		if !os.IsNotExist(err) {
			return false, err
		}

		return false, nil
	}

	return true, nil
}

// GetPath returns the path to the blob with the provided digest, or
// ErrBlobNotFound if it is not present.
func (cs *Store) GetPath(dgst digest.Digest) (string, error) {
	p := cs.blobPath(dgst)
	if _, err := os.Stat(p); err != nil {
		if os.IsNotExist(err) {
			return "", ErrBlobNotFound
		}

		return "", err
	}

	return p, nil
}

// Delete removes a blob by its digest.
//
// While this is safe to do concurrently, safe exists-then-remove logic must
// hold some global lock on the store.
func (cs *Store) Delete(dgst digest.Digest) error {
	if err := os.RemoveAll(cs.blobPath(dgst)); err != nil {
		if !os.IsNotExist(err) {
			return err
		}

		return nil
	}

	return nil
}

// blobPath returns the storage path for a blob, laid out as
// <root>/blobs/<algorithm>/<hex digest>.
func (cs *Store) blobPath(dgst digest.Digest) string {
	return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
}
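
// For example (hypothetical root), the canonical digest of the string
// "hello" maps to the following path:
//
//	cs := &Store{root: "/var/lib/content"}
//	p := cs.blobPath(digest.FromString("hello"))
//	// p == "/var/lib/content/blobs/sha256/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"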

// Stat returns the current status of an ingest by ref.
func (cs *Store) Stat(ref string) (Status, error) {
	// stat expects the ingest directory and joins "data" itself.
	return cs.stat(cs.ingestRoot(ref))
}

// stat works like Stat above, except it takes the path to the ingest
// directory directly rather than resolving it from a ref.
func (cs *Store) stat(ingestPath string) (Status, error) {
	dp := filepath.Join(ingestPath, "data")
	dfi, err := os.Stat(dp)
	if err != nil {
		return Status{}, err
	}

	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
	if err != nil {
		return Status{}, err
	}

	return Status{
		Ref:     ref,
		Size:    dfi.Size(),
		ModTime: dfi.ModTime(),
	}, nil
}

// Active returns the status of every ingest currently in progress.
func (cs *Store) Active() ([]Status, error) {
	ip := filepath.Join(cs.root, "ingest")

	fp, err := os.Open(ip)
	if err != nil {
		return nil, err
	}
	defer fp.Close() // release the directory handle after listing

	fis, err := fp.Readdir(-1)
	if err != nil {
		return nil, err
	}

	var active []Status
	for _, fi := range fis {
		p := filepath.Join(ip, fi.Name())
		stat, err := cs.stat(p)
		if err != nil {
			if !os.IsNotExist(err) {
				return nil, err
			}

			// TODO(stevvooe): This is a common error if uploads are being
			// completed while making this listing. Need to consider taking a
			// lock on the whole store to coordinate this aspect.
			//
			// Another option is to cleanup downloads asynchronously and
			// coordinate this method with the cleanup process.
			//
			// For now, we just skip them, as they really don't exist.
			continue
		}

		active = append(active, stat)
	}

	return active, nil
}
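
// A sketch of listing in-flight ingests (illustrative only):
//
//	active, err := cs.Active()
//	if err != nil {
//		// handle error
//	}
//	for _, status := range active {
//		fmt.Printf("%s\t%d\t%s\n", status.Ref, status.Size, status.ModTime)
//	}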

// TODO(stevvooe): Allow querying the set of blobs in the blob store.

// WalkFunc defines the callback for a blob walk.
//
// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps
// not a huge deal, considering we have a path, but let's not just let this
// one go without scrutiny.
type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error

// Walk calls fn for each blob in the store.
func (cs *Store) Walk(fn WalkFunc) error {
	root := filepath.Join(cs.root, "blobs")
	var alg digest.Algorithm
	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		if !fi.IsDir() && !alg.Available() {
			return nil
		}

		// TODO(stevvooe): There are a few more cases with subdirs that
		// should be handled in case the layout gets corrupted. This isn't
		// strict enough and may spew bad data.

		if path == root {
			return nil
		} else if filepath.Dir(path) == root {
			alg = digest.Algorithm(filepath.Base(path))

			if !alg.Available() {
				alg = ""
				return filepath.SkipDir
			}

			// descending into a hash directory
			return nil
		}

		dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
		if err := dgst.Validate(); err != nil {
			// log error but don't report
			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
			// if we see this, it could mean some sort of corruption of the
			// store or extra paths not expected previously.
		}

		return fn(path, fi, dgst)
	})
}
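
// A sketch of walking the store, here summing blob sizes (illustrative only):
//
//	var total int64
//	err := cs.Walk(func(path string, fi os.FileInfo, dgst digest.Digest) error {
//		total += fi.Size()
//		return nil
//	})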

// Begin starts a new write transaction against the blob store.
//
// The argument `ref` is used to identify the transaction. It must be a valid
// path component, meaning it has no `/` characters and no `:` (we'll ban
// other fs characters, as needed).
func (cs *Store) Begin(ref string) (*Writer, error) {
	path, refp, data, lock, err := cs.ingestPaths(ref)
	if err != nil {
		return nil, err
	}

	// use single path mkdir for this to ensure ref is only a base path, in
	// addition to validation above.
	if err := os.Mkdir(path, 0755); err != nil {
		return nil, err
	}

	if err := tryLock(lock); err != nil {
		return nil, err
	}

	// write the ref to a file for later use
	if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
		return nil, err
	}

	// create the data file exclusively, so that concurrent Begin calls on
	// the same ref fail fast.
	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
	if err != nil {
		return nil, errors.Wrap(err, "failed to open data file")
	}
	defer fp.Close() // closes the exclusive-create handle, not the one below

	// re-open the file in append mode
	fp, err = os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		return nil, errors.Wrap(err, "error opening for append")
	}

	return &Writer{
		cs:       cs,
		fp:       fp,
		lock:     lock,
		path:     path,
		digester: digest.Canonical.Digester(),
	}, nil
}
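
// A sketch of a full ingest transaction (hedged: the Writer returned here is
// defined elsewhere in this package; the Write and Commit calls below assume
// it satisfies io.Writer and finalizes with a size/digest check):
//
//	w, err := cs.Begin("fetch-layer-0") // hypothetical ref
//	if err != nil {
//		// handle error
//	}
//	if _, err := io.Copy(w, rd); err != nil { // rd is the content source
//		// handle error
//	}
//	if err := w.Commit(expectedSize, expectedDigest); err != nil {
//		// handle error
//	}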

// Resume reopens an existing ingest transaction by ref, rehashing any data
// already written so the digest can be continued.
func (cs *Store) Resume(ref string) (*Writer, error) {
	path, refp, data, lock, err := cs.ingestPaths(ref)
	if err != nil {
		return nil, err
	}

	if err := tryLock(lock); err != nil {
		return nil, err
	}

	refraw, err := readFileString(refp)
	if err != nil {
		return nil, errors.Wrap(err, "could not read ref")
	}

	if ref != refraw {
		// NOTE(stevvooe): This is fairly catastrophic. Either we have some
		// layout corruption or a hash collision for the ref key.
		return nil, errors.Errorf("ref key does not match: %v != %v", ref, refraw)
	}

	digester := digest.Canonical.Digester()

	// TODO(stevvooe): slow slow slow!! Send to a goroutine or use resumable
	// hashes.
	fp, err := os.Open(data)
	if err != nil {
		return nil, err
	}
	defer fp.Close()

	p := bufPool.Get().([]byte)
	defer bufPool.Put(p)

	offset, err := io.CopyBuffer(digester.Hash(), fp, p)
	if err != nil {
		return nil, err
	}

	fp1, err := os.OpenFile(data, os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, errors.Wrap(err, "ingest does not exist")
		}

		return nil, errors.Wrap(err, "error opening for append")
	}

	return &Writer{
		cs:       cs,
		fp:       fp1,
		lock:     lock,
		ref:      ref,
		path:     path,
		offset:   offset,
		digester: digester,
	}, nil
}
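
// A sketch of resuming after an interruption (hedged: same Writer assumptions
// as the Begin sketch above). Resume rehashes everything written so far, so
// its cost is proportional to the size of the partial ingest:
//
//	w, err := cs.Resume("fetch-layer-0")
//	if err != nil {
//		// handle error, possibly falling back to Begin
//	}
//	// continue copying from the source at the recovered offset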

// Remove removes an active ingest transaction keyed by ref.
func (cs *Store) Remove(ref string) error {
	root := cs.ingestRoot(ref)
	if err := os.RemoveAll(root); err != nil {
		if os.IsNotExist(err) {
			return nil
		}

		return err
	}

	return nil
}

// ingestRoot returns the ingest directory for a ref. The ref is hashed so
// that arbitrary ref strings map to safe, fixed-length path components.
func (cs *Store) ingestRoot(ref string) string {
	dgst := digest.FromString(ref)
	return filepath.Join(cs.root, "ingest", dgst.Hex())
}

// ingestPaths returns the paths used by an ingest, including the lockfile.
// The paths are the following:
//
// - root: entire ingest directory
// - ref: file storing the name of the starting ref, which must be unique
// - data: file where data is written
// - lock: lock file location
func (cs *Store) ingestPaths(ref string) (string, string, string, lockfile.Lockfile, error) {
	var (
		fp = cs.ingestRoot(ref)
		rp = filepath.Join(fp, "ref")
		lp = filepath.Join(fp, "lock")
		dp = filepath.Join(fp, "data")
	)

	lock, err := lockfile.New(lp)
	if err != nil {
		return "", "", "", "", errors.Wrapf(err, "error creating lockfile %v", lp)
	}

	return fp, rp, dp, lock, nil
}

func readFileString(path string) (string, error) {
	p, err := ioutil.ReadFile(path)
	return string(p), err
}