This commit is contained in:
Misty Stanley-Jones 2016-09-28 14:25:04 -07:00
parent e9de6f2a44
commit d4f01b812c
31 changed files with 2000 additions and 0 deletions

View file

@ -0,0 +1,38 @@
Migration
=========
Migrate all tag and manifest metadata into the new tag/metadata store using
rethinkdb defined within `manager/`.
## How?
Similar to mark and sweep:
1. Iterate through all repositories
2. For each repository, iterate through each tag
3. For each tag load the manifest and:
1. store the manifest plus config blob metadata
2. store the tag data
Once the migration completes update the `isRepoMetadataMigrated` flag (to be
renamed) to true.
## Notes
The tagstore middleware will ensure that any new pushes since migration starts
are properly inserted in the database. This means that we do not need to worry
about stale data from uploads started after the migration.
## Problems
**Resumes**
This needs to be interruptible; if the task fails we should resume from where
we left off (or close to it) rather than starting again from scratch.
In order to do this we store the name of the repository we're currently
migrating; we can iterate through all repositories until we reach the current
repository and then restart migration of all tags.
This is an easy and low-cost solution to resumes vs always saving the name of
the tags we're migrating.

View file

@ -0,0 +1,82 @@
package migration
import (
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/reference"
"github.com/palantir/stacktrace"
log "github.com/Sirupsen/logrus"
)
// Enumerator iterates over every tag of the named repository, invoking a
// caller-supplied callback for each tag found.
type Enumerator interface {
	EnumerateRepo(ctx context.Context, reg distribution.Namespace, repoName string) error
}
// NewEnumerator returns an enumerator which provides functions to iterate over
// a repository's tags, calling the given tagEnumerator function for each tag.
func NewEnumerator(onGetTag tagEnumerator) Enumerator {
	e := &enumerator{onGetTag: onGetTag}
	return e
}
// tagEnumerator is a function signature for handling a specific repository's tag
// on each iteration
type tagEnumerator func(ctx context.Context, repo distribution.Repository, tagName string, tag distribution.Descriptor) error
// enumerator handles iterating over a repository's tags, calling `onGetTag` on
// each tag
type enumerator struct {
	// onGetTag is invoked once per tag with the tag's name and resolved
	// descriptor; see EnumerateTags.
	onGetTag tagEnumerator
}
// EnumerateRepo iterates over a given repository's tags, calling
// `EnumerateTags` on each tag. The repository is specified as a string via
// the `repoName` argument.
// A context and registry (distribution.Namespace) must be supplied with valid,
// instantiated drivers.
//
// Failures are logged rather than returned so that one broken repository or
// tag does not abort enumeration of the rest.
func (e *enumerator) EnumerateRepo(ctx context.Context, reg distribution.Namespace, repoName string) error {
	named, parseErr := reference.ParseNamed(repoName)
	if parseErr != nil {
		log.WithField("error", parseErr).Errorf("failed to parse repo name %s", repoName)
		return nil
	}

	repo, repoErr := reg.Repository(ctx, named)
	if repoErr != nil {
		log.WithField("error", repoErr).Errorf("failed to construct repository %s", repoName)
		return nil
	}

	// enumerate all repository tags
	allTags, tagsErr := repo.Tags(ctx).All(ctx)
	if tagsErr != nil {
		log.WithField("error", tagsErr).Errorf("failed to return all tags for repository %s", repoName)
		return nil
	}

	for _, tagName := range allTags {
		if tagErr := e.EnumerateTags(ctx, repo, tagName); tagErr != nil {
			log.WithField("error", tagErr).Errorf("error processing tag during enumeration %s", tagName)
		}
	}

	return nil
}
// EnumerateTags is called with a tag name as a string, loads the tag's
// descriptor and delegates to `enumerator.onGetTag` with the tag name
// and descriptor for further processing.
//
// This allows us to pass custom functions for migration and consistency
// checking whilst leveraging the same enumeration code.
func (e *enumerator) EnumerateTags(ctx context.Context, repo distribution.Repository, tagName string) error {
	// TagService.All yields tag names only; resolve the name into a concrete
	// distribution.Descriptor before invoking the supplied callback.
	tagDesc, err := repo.Tags(ctx).Get(ctx, tagName)
	if err != nil {
		return stacktrace.NewError("failed retrieving tag descriptor for tag %s: %s", tagName, err)
	}

	return e.onGetTag(ctx, repo, tagName, tagDesc)
}

View file

@ -0,0 +1,156 @@
package migration
import (
"github.com/docker/dhe-deploy/manager/schema"
"github.com/docker/dhe-deploy/registry/middleware"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/manifest/schema2"
"github.com/palantir/stacktrace"
)
// NewMigration returns a migration which iterates every repository and tag in
// `reg`, persisting tag and manifest metadata into `store` via
// AddTagAndManifest.
func NewMigration(reg distribution.Namespace, store middleware.Store) *migration {
	m := new(migration)
	m.reg = reg
	m.store = store
	m.isFromResume = false
	// The migration itself supplies the per-tag callback.
	m.enumerator = NewEnumerator(m.AddTagAndManifest)
	return m
}
// NewMigrationWithEnumerator returns a migration that uses the supplied
// Enumerator instead of the default AddTagAndManifest-backed one. Note that
// no store is configured.
func NewMigrationWithEnumerator(reg distribution.Namespace, e Enumerator) *migration {
	m := &migration{
		reg:        reg,
		enumerator: e,
	}
	m.isFromResume = false
	return m
}
// migration handles the migration process for moving tag and manifest
// information for repositories (stored as files in distribution) into our
// tagstore.
type migration struct {
	// reg is a distribution.Namespace instance instantiated with storage
	// drivers
	reg distribution.Namespace
	// isFromResume indicates whether this migration has been started because
	// of a previously failed attempt
	isFromResume bool
	// currentRepo stores the repository we're currently migrating (or have
	// just resumed from)
	currentRepo string
	// enumerator handles iterating through each repository's tags
	enumerator Enumerator
	// store persists tags and manifests into the tagstore database
	store middleware.Store
}
// Resume marks this migration as the continuation of a previous attempt,
// starting from the repository named by `from`.
func (m *migration) Resume(from string) {
	m.currentRepo = from
	m.isFromResume = true
}
// Migrate begins migration from either the start of all repositories or
// `currentRepo` if `isFromResume` is true.
//
// If the migration fails the name of the current repository and the error is
// returned.
func (m *migration) Migrate(ctx context.Context) (repo string, err error) {
	// Repository iteration is only available via the optional
	// RepositoryEnumerator interface on the Namespace.
	repositoryEnumerator, ok := m.reg.(distribution.RepositoryEnumerator)
	if !ok {
		return "", stacktrace.NewError("unable to convert Namespace to RepositoryEnumerator")
	}
	hasResumed := false
	err = repositoryEnumerator.Enumerate(ctx, func(repoName string) error {
		// Track the repository currently being processed so it can be
		// returned to the caller on failure (and used as a resume point).
		repo = repoName
		if m.isFromResume && !hasResumed {
			// Repositories enumerated before `currentRepo` were already
			// migrated in a previous attempt, so skip them.
			if repoName != m.currentRepo {
				return nil
			}
			// this is the same repo as the last attempt, so we can continue
			// the migration.
			hasResumed = true
		}
		context.GetLoggerWithFields(ctx, map[interface{}]interface{}{
			"repo": repoName,
		}).Infof("enumerating repository")
		err := m.enumerator.EnumerateRepo(ctx, m.reg, repoName)
		if err != nil {
			context.GetLoggerWithFields(ctx, map[interface{}]interface{}{
				"repo":  repoName,
				"error": err,
			}).Errorf("error enumerating repository")
		}
		return err
	})
	return repo, err
}
// tag represents a single tag which is being migrated into the tagstore.
type tag struct {
	// dbTag is the database row for the tag itself.
	dbTag *schema.Tag
	// dbManifest is the database row for the manifest the tag points at.
	dbManifest *schema.Manifest
	// store is an implementation of the middleware store interface which
	// saves tags and manifests to the DB
	store middleware.Store
}
// AddTagAndManifest loads the manifest (and, for schema2 manifests, the
// config blob) referenced by `tag` from the registry and persists both the
// manifest and tag metadata via the migration's store.
//
// It is wired up as the enumerator's per-tag callback in NewMigration.
func (m *migration) AddTagAndManifest(ctx context.Context, repo distribution.Repository, tagName string, tag distribution.Descriptor) error {
	repoName := repo.Named().Name()

	// Load the manifest as referred to by the tag
	mfstService, err := repo.Manifests(ctx)
	if err != nil {
		return stacktrace.NewError("unable to construct manifest service for '%s:%s': %v", repoName, tagName, err)
	}
	manifest, err := mfstService.Get(ctx, tag.Digest)
	if err != nil {
		// Fixed message: this step retrieves the manifest itself, not the
		// manifest service (constructed above).
		return stacktrace.NewError("unable to retrieve manifest for '%s:%s': %v", repoName, tagName, err)
	}

	// Note that the store expects the context to have a key named "target"
	// with the config blob; this is due to how registry works when statting
	// and verifying uploads.
	//
	// In order to re-use code for loading manifest information from a blob
	// into the DB we should load the config blob if necessary and store it
	// in the context.

	// Tackle manifest metadata such as layers, arch and OS
	if v2m, ok := manifest.(*schema2.DeserializedManifest); ok {
		// The target refers to the manifest config. We need this in order to store
		// metadata such as the OS and architecture of this manifest, so instead of
		// calling Stat we'll retrieve this blob and store it in the context for the
		// Store to process
		target := v2m.Target()
		content, err := repo.Blobs(ctx).Get(ctx, target.Digest)
		if err != nil {
			return stacktrace.NewError("unable to retrieve manifest config for '%s:%s' (digest %s): %v", repoName, tagName, target.Digest, err)
		}
		// NOTE(review): string context key matches what the store reads;
		// go vet prefers a typed key, but changing it would break the
		// store's lookup — confirm before altering.
		ctx = context.WithValue(ctx, "target", content)
	}

	// Manifest's PKs are formatted as `namespace/repo@sha256:...`
	named := repo.Named().String()
	if err = m.store.PutManifest(ctx, named, tag.Digest.String(), manifest); err != nil {
		return stacktrace.NewError("unable to save manifest in store for '%s:%s': %v", repoName, tagName, err)
	}
	if err = m.store.PutTag(ctx, repo, tagName, tag); err != nil {
		return stacktrace.NewError("unable to save tag in store for '%s:%s': %v", repoName, tagName, err)
	}
	return nil
}

View file

@ -0,0 +1,275 @@
package migration
import (
"fmt"
"reflect"
"testing"
"github.com/docker/dhe-deploy/registry/middleware/mocks"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache/memory"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/stretchr/testify/mock"
)
// root is the storage-driver path prefix under which registry data is laid
// out in these fixtures.
const root = "/docker/registry/v2/"

// env bundles what a test needs: the registry namespace, its backing
// in-memory storage driver, and a context.
type env struct {
	registry distribution.Namespace
	driver   driver.StorageDriver
	ctx      context.Context
}
// setupRegistry builds an in-memory registry pre-populated with two
// repositories (admin/a-repo, admin/b-repo), each with one tag pointing at a
// shared schema2 manifest and config blob.
func setupRegistry(t *testing.T) *env {
	d := inmemory.New()
	ctx := context.Background()
	registry, err := storage.NewRegistry(
		ctx,
		d,
		storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()),
		storage.EnableRedirect,
	)
	if err != nil {
		// Fixed typo: "iunstantiating" -> "instantiating".
		t.Fatalf("error instantiating registry: %v", err)
	}

	// Add data to registry
	var prefix = root + "repositories/admin/"
	data := map[string]interface{}{
		"content": map[string]string{
			// REPOSITORIES
			//a
			prefix + "a-repo/_layers/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link":                          "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566",
			prefix + "a-repo/_layers/sha256/6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d/link":                          "sha256:6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d",
			prefix + "a-repo/_manifests/revisions/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link":             "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566",
			prefix + "a-repo/_manifests/tags/a-tag/current/link":                                                                            "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566",
			prefix + "a-repo/_manifests/tags/a-tag/index/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link":      "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566",
			//b
			prefix + "b-repo/_layers/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link":                          "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566",
			prefix + "b-repo/_layers/sha256/6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d/link":                          "sha256:6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d",
			prefix + "b-repo/_manifests/revisions/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link":             "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566",
			prefix + "b-repo/_manifests/tags/b-tag/current/link":                                                                            "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566",
			prefix + "b-repo/_manifests/tags/b-tag/index/sha256/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/link":      "sha256:1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566",
			// MANIFESTS
			root + "blobs/sha256/1f/1f8d6e1edee77de035d79ca992df4e5cc8d358ec38f527077a84945a79907566/data": V2_MANIFEST_1,
			root + "blobs/sha256/6b/6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d/data": V2_MANIFEST_CONFIG_1,
		},
	}
	for path, blob := range data["content"].(map[string]string) {
		// Previously the PutContent error was silently discarded; a failed
		// write would surface later as a confusing migration failure.
		if err := d.PutContent(ctx, path, []byte(blob)); err != nil {
			t.Fatalf("error writing fixture %s: %v", path, err)
		}
	}

	return &env{
		registry,
		d,
		ctx,
	}
}
// TestMigrateResumes asserts that a fresh migration visits every repository
// and that a resumed migration skips repositories that come before the
// resume point.
func TestMigrateResumes(t *testing.T) {
	env := setupRegistry(t)

	tests := []struct {
		migration     *migration
		expectedRepos []string
	}{
		{
			// Fresh run: both repositories should be enumerated.
			migration: &migration{
				reg:          env.registry,
				isFromResume: false,
			},
			expectedRepos: []string{"admin/a-repo", "admin/b-repo"},
		},
		{
			// Resume from b-repo: a-repo must be skipped.
			migration: &migration{
				reg:          env.registry,
				isFromResume: true,
				currentRepo:  "admin/b-repo",
			},
			expectedRepos: []string{"admin/b-repo"},
		},
	}

	for _, test := range tests {
		// Iterate through the repositories, storing each repository name within
		// iteratedRepos. We can then compare which repos were passed to onTagFunc
		// to check resumes
		iteratedRepos := []string{}
		onTagFunc := func(ctx context.Context, repo distribution.Repository, tagName string, tag distribution.Descriptor) error {
			iteratedRepos = append(iteratedRepos, repo.Named().Name())
			return nil
		}
		test.migration.enumerator = NewEnumerator(onTagFunc)
		if _, err := test.migration.Migrate(env.ctx); err != nil {
			t.Fatalf("error migrating: %s", err)
		}

		if !reflect.DeepEqual(iteratedRepos, test.expectedRepos) {
			t.Fatalf("resume failed, expected vs actual repo iteration: %s vs %s", test.expectedRepos, iteratedRepos)
		}
	}
}
// This is a basic test asserting that there are no obvious errors with
// the migration logic: both store methods accept the expected argument
// types and the migration completes without error.
func TestAddTagAndManifest(t *testing.T) {
	env := setupRegistry(t)
	store := mocks.NewStore()
	migration := NewMigration(env.registry, store)

	store.TagStore.On(
		"PutTag",
		mock.AnythingOfType("*context.valueCtx"),
		mock.AnythingOfTypeArgument("*storage.repository"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("distribution.Descriptor"),
	).Return(nil).Run(func(a mock.Arguments) {
		// NOTE(review): debug print left in — consider t.Logf or removing
		// (removal requires dropping the file-level fmt import).
		fmt.Printf("%#v", a)
	})

	store.ManifestStore.On(
		"PutManifest",
		mock.AnythingOfType("*context.valueCtx"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("*schema2.DeserializedManifest"),
	).Return(nil).Run(func(a mock.Arguments) {
		// NOTE(review): debug print left in — see above.
		fmt.Printf("%#v", a)
	})

	_, err := migration.Migrate(env.ctx)
	if err != nil {
		t.Fatalf("unexpected error during migration: %s", err)
	}
}
// Assert that failing during a migration returns no error
// and instead only logs the error.
//
// NOTE(review): both mocks below Return(nil), i.e. succeed — as written this
// test never actually triggers the failure path it describes. To exercise it,
// one of the store mocks should return a non-nil error; confirm intent.
func TestAddTagAndManifestReturnsNil(t *testing.T) {
	env := setupRegistry(t)
	store := mocks.NewStore()
	migration := NewMigration(env.registry, store)

	// When we get admin/a-repo we can fail fast.
	store.TagStore.On(
		"PutTag",
		mock.AnythingOfType("*context.valueCtx"),
		mock.AnythingOfTypeArgument("*storage.repository"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("distribution.Descriptor"),
	).Return(nil)

	store.ManifestStore.On(
		"PutManifest",
		mock.AnythingOfType("*context.valueCtx"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("string"),
		mock.AnythingOfType("*schema2.DeserializedManifest"),
	).Return(nil)

	_, err := migration.Migrate(env.ctx)
	if err != nil {
		t.Fatalf("unexpected error during migration: %v", err)
	}
}
// V2_MANIFEST_1 is a schema2 manifest fixture whose config digest refers to
// V2_MANIFEST_CONFIG_1 (seeded as a blob in setupRegistry).
const V2_MANIFEST_1 = `
{
   "schemaVersion": 2,
   "mediaType": "application/vnd.docker.distribution.manifest.v2+json",
   "config": {
      "mediaType": "application/vnd.docker.container.image.v1+json",
      "size": 1473,
      "digest": "sha256:6bf8e372a8396bbf22c0b2e0eebdad5ac3da97357621fe68de694bd4de23639d"
   },
   "layers": [
      {
         "mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
         "size": 146,
         "digest": "sha256:c170e8502f05562c30101cd65993e514cf63d242d6f14af6ca49896168c59ffd"
      }
   ]
}
`
// V2_MANIFEST_CONFIG_1 is the image config blob referenced by V2_MANIFEST_1's
// config digest; it carries the architecture/OS metadata the store extracts.
const V2_MANIFEST_CONFIG_1 = `
{
    "architecture": "amd64",
    "config": {
        "Hostname": "9aec87ce8e45",
        "Domainname": "",
        "User": "",
        "AttachStdin": false,
        "AttachStdout": false,
        "AttachStderr": false,
        "Tty": false,
        "OpenStdin": false,
        "StdinOnce": false,
        "Env": [
            "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
        ],
        "Cmd": [
            "/true"
        ],
        "Image": "sha256:bbadf13f1e9e0d1629c07ad1e7eedcc5a6383300b7701c131a6f0beac49866ad",
        "Volumes": null,
        "WorkingDir": "",
        "Entrypoint": null,
        "OnBuild": null,
        "Labels": {
        }
    },
    "container": "dab58e1226ef3b699c25b7befc7cec562707a959135d130f667a039e18e63f72",
    "container_config": {
        "Hostname": "9aec87ce8e45",
        "Domainname": "",
        "User": "",
        "AttachStdin": false,
        "AttachStdout": false,
        "AttachStderr": false,
        "Tty": false,
        "OpenStdin": false,
        "StdinOnce": false,
        "Env": [
            "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
        ],
        "Cmd": [
            "/bin/sh",
            "-c",
            "#(nop) CMD [\"/true\"]"
        ],
        "Image": "sha256:bbadf13f1e9e0d1629c07ad1e7eedcc5a6383300b7701c131a6f0beac49866ad",
        "Volumes": null,
        "WorkingDir": "",
        "Entrypoint": null,
        "OnBuild": null,
        "Labels": {
        }
    },
    "created": "2016-05-19T20:38:48.345518736Z",
    "docker_version": "1.11.1",
    "history": [
        {
            "created": "2016-05-19T20:38:48.277232795Z",
            "created_by": "/bin/sh -c #(nop) ADD file:513005a00bb6ce26c9eb571d6f16e0c12378ba40f8e3100bcb484db53008e3b2 in /true"
        },
        {
            "created": "2016-05-19T20:38:48.345518736Z",
            "created_by": "/bin/sh -c #(nop) CMD [\"/true\"]",
            "empty_layer": true
        }
    ],
    "os": "linux",
    "rootfs": {
        "type": "layers",
        "diff_ids": [
            "sha256:af593d271f82964b57d51cc5e647c6076fb160bf8620f605848130110f0ed647"
        ]
    }
}
`