diff --git a/docs/client/blob_writer.go b/docs/client/blob_writer.go index 5f6f01f7..c7eee4e8 100644 --- a/docs/client/blob_writer.go +++ b/docs/client/blob_writer.go @@ -25,6 +25,10 @@ type httpBlobUpload struct { closed bool } +func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) { + panic("Not implemented") +} + func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error { if resp.StatusCode == http.StatusNotFound { return distribution.ErrBlobUploadUnknown diff --git a/docs/client/repository.go b/docs/client/repository.go index d0079f09..c1e8e07f 100644 --- a/docs/client/repository.go +++ b/docs/client/repository.go @@ -280,14 +280,13 @@ func (ms *manifests) GetByTag(tag string, options ...distribution.ManifestServic } if _, ok := ms.etags[tag]; ok { - req.Header.Set("eTag", ms.etags[tag]) + req.Header.Set("If-None-Match", ms.etags[tag]) } resp, err := ms.client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() - if resp.StatusCode == http.StatusNotModified { return nil, nil } else if SuccessStatus(resp.StatusCode) { diff --git a/docs/client/repository_test.go b/docs/client/repository_test.go index 7219fff1..26201763 100644 --- a/docs/client/repository_test.go +++ b/docs/client/repository_test.go @@ -463,7 +463,7 @@ func addTestManifestWithEtag(repo, reference string, content []byte, m *testutil Method: "GET", Route: "/v2/" + repo + "/manifests/" + reference, Headers: http.Header(map[string][]string{ - "Etag": {fmt.Sprintf(`"%s"`, dgst)}, + "If-None-Match": {fmt.Sprintf(`"%s"`, dgst)}, }), } diff --git a/docs/handlers/app.go b/docs/handlers/app.go index 1fcf13fc..f60290d0 100644 --- a/docs/handlers/app.go +++ b/docs/handlers/app.go @@ -20,6 +20,7 @@ import ( "github.com/docker/distribution/registry/auth" registrymiddleware "github.com/docker/distribution/registry/middleware/registry" repositorymiddleware "github.com/docker/distribution/registry/middleware/repository" + "github.com/docker/distribution/registry/proxy" "github.com/docker/distribution/registry/storage" memorycache "github.com/docker/distribution/registry/storage/cache/memory" rediscache "github.com/docker/distribution/registry/storage/cache/redis" @@ -55,6 +56,9 @@ type App struct { } redis *redis.Pool + + // true if this registry is configured as a pull through cache + isCache bool } // NewApp takes a configuration and returns a configured app, ready to serve @@ -65,6 +69,7 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App Config: configuration, Context: ctx, router: v2.RouterWithPrefix(configuration.HTTP.Prefix), + isCache: configuration.Proxy.RemoteURL != "", } app.Context = ctxu.WithLogger(app.Context, ctxu.GetLogger(app, "instance.id")) @@ -152,10 +157,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App if app.redis == nil { panic("redis configuration required to use for layerinfo cache") } - app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled, !redirectDisabled) + app.registry = storage.NewRegistryWithDriver(app, app.driver, rediscache.NewRedisBlobDescriptorCacheProvider(app.redis), deleteEnabled, !redirectDisabled, app.isCache) ctxu.GetLogger(app).Infof("using redis blob descriptor cache") case "inmemory": - app.registry = storage.NewRegistryWithDriver(app, app.driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled, !redirectDisabled) + app.registry = storage.NewRegistryWithDriver(app, app.driver, 
memorycache.NewInMemoryBlobDescriptorCacheProvider(), deleteEnabled, !redirectDisabled, app.isCache) ctxu.GetLogger(app).Infof("using inmemory blob descriptor cache") default: if v != "" { @@ -166,10 +171,10 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App if app.registry == nil { // configure the registry if no cache section is available. - app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled, !redirectDisabled) + app.registry = storage.NewRegistryWithDriver(app.Context, app.driver, nil, deleteEnabled, !redirectDisabled, app.isCache) } - app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) + app.registry, err = applyRegistryMiddleware(app.Context, app.registry, configuration.Middleware["registry"]) if err != nil { panic(err) } @@ -185,6 +190,16 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App ctxu.GetLogger(app).Debugf("configured %q access controller", authType) } + // configure as a pull through cache + if configuration.Proxy.RemoteURL != "" { + app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, configuration.Proxy) + if err != nil { + panic(err.Error()) + } + app.isCache = true + ctxu.GetLogger(app).Info("Registry configured as a proxy cache to ", configuration.Proxy.RemoteURL) + } + return app } @@ -447,7 +462,7 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler { repository, app.eventBridge(context, r)) - context.Repository, err = applyRepoMiddleware(context.Repository, app.Config.Middleware["repository"]) + context.Repository, err = applyRepoMiddleware(context.Context, context.Repository, app.Config.Middleware["repository"]) if err != nil { ctxu.GetLogger(context).Errorf("error initializing repository middleware: %v", err) context.Errors = append(context.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) @@ -668,9 +683,9 @@ func appendCatalogAccessRecord(accessRecords []auth.Access, r *http.Request) []a } // applyRegistryMiddleware wraps a registry instance with the configured middlewares -func applyRegistryMiddleware(registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) { +func applyRegistryMiddleware(ctx context.Context, registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) { for _, mw := range middlewares { - rmw, err := registrymiddleware.Get(mw.Name, mw.Options, registry) + rmw, err := registrymiddleware.Get(ctx, mw.Name, mw.Options, registry) if err != nil { return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err) } @@ -681,9 +696,9 @@ func applyRegistryMiddleware(registry distribution.Namespace, middlewares []conf } // applyRepoMiddleware wraps a repository with the configured middlewares -func applyRepoMiddleware(repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) { +func applyRepoMiddleware(ctx context.Context, repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) { for _, mw := range middlewares { - rmw, err := repositorymiddleware.Get(mw.Name, mw.Options, repository) + rmw, err := repositorymiddleware.Get(ctx, mw.Name, mw.Options, repository) if err != nil { return nil, err } diff --git a/docs/handlers/app_test.go b/docs/handlers/app_test.go index 84d842e3..6f597527 100644 --- a/docs/handlers/app_test.go +++ b/docs/handlers/app_test.go 
@@ -31,7 +31,7 @@ func TestAppDispatcher(t *testing.T) { Context: ctx, router: v2.Router(), driver: driver, - registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true, true), + registry: storage.NewRegistryWithDriver(ctx, driver, memorycache.NewInMemoryBlobDescriptorCacheProvider(), true, true, false), } server := httptest.NewServer(app) router := v2.Router() diff --git a/docs/middleware/registry/middleware.go b/docs/middleware/registry/middleware.go index 048603b8..7535c6db 100644 --- a/docs/middleware/registry/middleware.go +++ b/docs/middleware/registry/middleware.go @@ -4,11 +4,12 @@ import ( "fmt" "github.com/docker/distribution" + "github.com/docker/distribution/context" ) // InitFunc is the type of a RegistryMiddleware factory function and is // used to register the constructor for different RegistryMiddleware backends. -type InitFunc func(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) +type InitFunc func(ctx context.Context, registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) var middlewares map[string]InitFunc @@ -28,10 +29,10 @@ func Register(name string, initFunc InitFunc) error { } // Get constructs a RegistryMiddleware with the given options using the named backend. -func Get(name string, options map[string]interface{}, registry distribution.Namespace) (distribution.Namespace, error) { +func Get(ctx context.Context, name string, options map[string]interface{}, registry distribution.Namespace) (distribution.Namespace, error) { if middlewares != nil { if initFunc, exists := middlewares[name]; exists { - return initFunc(registry, options) + return initFunc(ctx, registry, options) } } diff --git a/docs/middleware/repository/middleware.go b/docs/middleware/repository/middleware.go index d6330fc4..27b42aec 100644 --- a/docs/middleware/repository/middleware.go +++ b/docs/middleware/repository/middleware.go @@ -4,11 +4,12 @@ import ( "fmt" "github.com/docker/distribution" + "github.com/docker/distribution/context" ) // InitFunc is the type of a RepositoryMiddleware factory function and is // used to register the constructor for different RepositoryMiddleware backends. -type InitFunc func(repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error) +type InitFunc func(ctx context.Context, repository distribution.Repository, options map[string]interface{}) (distribution.Repository, error) var middlewares map[string]InitFunc @@ -28,10 +29,10 @@ func Register(name string, initFunc InitFunc) error { } // Get constructs a RepositoryMiddleware with the given options using the named backend. 
-func Get(name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) { +func Get(ctx context.Context, name string, options map[string]interface{}, repository distribution.Repository) (distribution.Repository, error) { if middlewares != nil { if initFunc, exists := middlewares[name]; exists { - return initFunc(repository, options) + return initFunc(ctx, repository, options) } } diff --git a/docs/proxy/proxyauth.go b/docs/proxy/proxyauth.go new file mode 100644 index 00000000..e4bec75a --- /dev/null +++ b/docs/proxy/proxyauth.go @@ -0,0 +1,54 @@ +package proxy + +import ( + "net/http" + "net/url" + + "github.com/docker/distribution/registry/client/auth" +) + +const tokenURL = "https://auth.docker.io/token" + +type userpass struct { + username string + password string +} + +type credentials struct { + creds map[string]userpass +} + +func (c credentials) Basic(u *url.URL) (string, string) { + up := c.creds[u.String()] + + return up.username, up.password +} + +// ConfigureAuth authorizes with the upstream registry +func ConfigureAuth(remoteURL, username, password string, cm auth.ChallengeManager) (auth.CredentialStore, error) { + if err := ping(cm, remoteURL+"/v2/", "Docker-Distribution-Api-Version"); err != nil { + return nil, err + } + + creds := map[string]userpass{ + tokenURL: { + username: username, + password: password, + }, + } + return credentials{creds: creds}, nil +} + +func ping(manager auth.ChallengeManager, endpoint, versionHeader string) error { + resp, err := http.Get(endpoint) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := manager.AddResponse(resp); err != nil { + return err + } + + return nil +} diff --git a/docs/proxy/proxyblobstore.go b/docs/proxy/proxyblobstore.go new file mode 100644 index 00000000..b480a111 --- /dev/null +++ b/docs/proxy/proxyblobstore.go @@ -0,0 +1,214 @@ +package proxy + +import ( + "io" + "net/http" + "strconv" + "sync" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/proxy/scheduler" +) + +// todo(richardscothern): from cache control header or config file +const blobTTL = time.Duration(24 * 7 * time.Hour) + +type proxyBlobStore struct { + localStore distribution.BlobStore + remoteStore distribution.BlobService + scheduler *scheduler.TTLExpirationScheduler +} + +var _ distribution.BlobStore = proxyBlobStore{} + +type inflightBlob struct { + refCount int + bw distribution.BlobWriter +} + +// inflight tracks currently downloading blobs +var inflight = make(map[digest.Digest]*inflightBlob) + +// mu protects inflight +var mu sync.Mutex + +func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) { + w.Header().Set("Content-Length", strconv.FormatInt(length, 10)) + w.Header().Set("Content-Type", mediaType) + w.Header().Set("Docker-Content-Digest", digest.String()) + w.Header().Set("Etag", digest.String()) +} + +func (pbs proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { + desc, err := pbs.localStore.Stat(ctx, dgst) + if err != nil && err != distribution.ErrBlobUnknown { + return err + } + + if err == nil { + proxyMetrics.BlobPush(uint64(desc.Size)) + return pbs.localStore.ServeBlob(ctx, w, r, dgst) + } + + desc, err = pbs.remoteStore.Stat(ctx, dgst) + if err != nil { + return err + } + + remoteReader, err := pbs.remoteStore.Open(ctx, dgst) + if err 
!= nil {
+		return err
+	}
+
+	bw, isNew, cleanup, err := getOrCreateBlobWriter(ctx, pbs.localStore, desc)
+	if err != nil {
+		return err
+	}
+	defer cleanup()
+
+	if isNew {
+		go func() {
+			err := streamToStorage(ctx, remoteReader, desc, bw)
+			if err != nil {
+				context.GetLogger(ctx).Error(err)
+			}
+
+			proxyMetrics.BlobPull(uint64(desc.Size))
+		}()
+		err := streamToClient(ctx, w, desc, bw)
+		if err != nil {
+			return err
+		}
+
+		proxyMetrics.BlobPush(uint64(desc.Size))
+		pbs.scheduler.AddBlob(dgst.String(), blobTTL)
+		return nil
+	}
+
+	err = streamToClient(ctx, w, desc, bw)
+	if err != nil {
+		return err
+	}
+	proxyMetrics.BlobPush(uint64(desc.Size))
+	return nil
+}
+
+type cleanupFunc func()
+
+// getOrCreateBlobWriter will track which blobs are currently being downloaded and enable clients requesting
+// the same blob concurrently to read from the existing stream.
+func getOrCreateBlobWriter(ctx context.Context, blobs distribution.BlobService, desc distribution.Descriptor) (distribution.BlobWriter, bool, cleanupFunc, error) {
+	mu.Lock()
+	defer mu.Unlock()
+	dgst := desc.Digest
+
+	cleanup := func() {
+		mu.Lock()
+		defer mu.Unlock()
+		inflight[dgst].refCount--
+
+		if inflight[dgst].refCount == 0 {
+			defer delete(inflight, dgst)
+			_, err := inflight[dgst].bw.Commit(ctx, desc)
+			if err != nil {
+				// There is a narrow race here where Commit can be called while this blob's TTL is expiring
+				// and it's being removed from storage. In that case, the client stream will continue
+				// uninterrupted and the blob will be pulled through on the next request, so just log it
+				context.GetLogger(ctx).Errorf("Error committing blob: %q", err)
+			}
+
+		}
+	}
+
+	var bw distribution.BlobWriter
+	_, ok := inflight[dgst]
+	if ok {
+		bw = inflight[dgst].bw
+		inflight[dgst].refCount++
+		return bw, false, cleanup, nil
+	}
+
+	var err error
+	bw, err = blobs.Create(ctx)
+	if err != nil {
+		return nil, false, nil, err
+	}
+
+	inflight[dgst] = &inflightBlob{refCount: 1, bw: bw}
+	return bw, true, cleanup, nil
+}
+
+func streamToStorage(ctx context.Context, remoteReader distribution.ReadSeekCloser, desc distribution.Descriptor, bw distribution.BlobWriter) error {
+	_, err := io.CopyN(bw, remoteReader, desc.Size)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func streamToClient(ctx context.Context, w http.ResponseWriter, desc distribution.Descriptor, bw distribution.BlobWriter) error {
+	setResponseHeaders(w, desc.Size, desc.MediaType, desc.Digest)
+
+	reader, err := bw.Reader()
+	if err != nil {
+		return err
+	}
+	defer reader.Close()
+	teeReader := io.TeeReader(reader, w)
+	buf := make([]byte, 32768)
+	var soFar int64
+	for {
+		rd, err := teeReader.Read(buf)
+		if err == nil || err == io.EOF {
+			soFar += int64(rd)
+			if soFar < desc.Size {
+				// short read: the blob writer is still being filled, keep reading
+				continue
+			}
+			return nil
+		}
+		return err
+	}
+}
+
+func (pbs proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
+	desc, err := pbs.localStore.Stat(ctx, dgst)
+	if err == nil {
+		return desc, err
+	}
+
+	if err != distribution.ErrBlobUnknown {
+		return distribution.Descriptor{}, err
+	}
+
+	return pbs.remoteStore.Stat(ctx, dgst)
+}
+
+// Unsupported functions
+func (pbs proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
+	return distribution.Descriptor{}, distribution.ErrUnsupported
+}
+
+func (pbs proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
+	return nil, distribution.ErrUnsupported
+}
+
+func (pbs
proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { + return nil, distribution.ErrUnsupported +} + +func (pbs proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + return nil, distribution.ErrUnsupported +} + +func (pbs proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + return nil, distribution.ErrUnsupported +} + +func (pbs proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { + return distribution.ErrUnsupported +} diff --git a/docs/proxy/proxyblobstore_test.go b/docs/proxy/proxyblobstore_test.go new file mode 100644 index 00000000..65d5f922 --- /dev/null +++ b/docs/proxy/proxyblobstore_test.go @@ -0,0 +1,231 @@ +package proxy + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/proxy/scheduler" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache/memory" + "github.com/docker/distribution/registry/storage/driver/inmemory" +) + +type statsBlobStore struct { + stats map[string]int + blobs distribution.BlobStore +} + +func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { + sbs.stats["put"]++ + return sbs.blobs.Put(ctx, mediaType, p) +} + +func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + sbs.stats["get"]++ + return sbs.blobs.Get(ctx, dgst) +} + +func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) { + sbs.stats["create"]++ + return sbs.blobs.Create(ctx) +} + +func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { + sbs.stats["resume"]++ + return sbs.blobs.Resume(ctx, id) +} + +func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + sbs.stats["open"]++ + return sbs.blobs.Open(ctx, dgst) +} + +func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { + sbs.stats["serveblob"]++ + return sbs.blobs.ServeBlob(ctx, w, r, dgst) +} + +func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + sbs.stats["stat"]++ + return sbs.blobs.Stat(ctx, dgst) +} + +func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { + sbs.stats["delete"]++ + return sbs.blobs.Delete(ctx, dgst) +} + +type testEnv struct { + inRemote []distribution.Descriptor + store proxyBlobStore + ctx context.Context +} + +func (te testEnv) LocalStats() *map[string]int { + ls := te.store.localStore.(statsBlobStore).stats + return &ls +} + +func (te testEnv) RemoteStats() *map[string]int { + rs := te.store.remoteStore.(statsBlobStore).stats + return &rs +} + +// Populate remote store and record the digests +func makeTestEnv(t *testing.T, name string) testEnv { + ctx := context.Background() + + localRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true) + localRepo, err := localRegistry.Repository(ctx, name) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + + truthRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, false, false) + truthRepo, 
err := truthRegistry.Repository(ctx, name) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + + truthBlobs := statsBlobStore{ + stats: make(map[string]int), + blobs: truthRepo.Blobs(ctx), + } + + localBlobs := statsBlobStore{ + stats: make(map[string]int), + blobs: localRepo.Blobs(ctx), + } + + s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json") + + proxyBlobStore := proxyBlobStore{ + remoteStore: truthBlobs, + localStore: localBlobs, + scheduler: s, + } + + te := testEnv{ + store: proxyBlobStore, + ctx: ctx, + } + return te +} + +func populate(t *testing.T, te *testEnv, blobCount int) { + var inRemote []distribution.Descriptor + for i := 0; i < blobCount; i++ { + bytes := []byte(fmt.Sprintf("blob%d", i)) + + desc, err := te.store.remoteStore.Put(te.ctx, "", bytes) + if err != nil { + t.Errorf("Put in store") + } + inRemote = append(inRemote, desc) + } + + te.inRemote = inRemote + +} + +func TestProxyStoreStat(t *testing.T) { + te := makeTestEnv(t, "foo/bar") + remoteBlobCount := 1 + populate(t, &te, remoteBlobCount) + + localStats := te.LocalStats() + remoteStats := te.RemoteStats() + + // Stat - touches both stores + for _, d := range te.inRemote { + _, err := te.store.Stat(te.ctx, d.Digest) + if err != nil { + t.Fatalf("Error stating proxy store") + } + } + + if (*localStats)["stat"] != remoteBlobCount { + t.Errorf("Unexpected local stat count") + } + + if (*remoteStats)["stat"] != remoteBlobCount { + t.Errorf("Unexpected remote stat count") + } +} + +func TestProxyStoreServe(t *testing.T) { + te := makeTestEnv(t, "foo/bar") + remoteBlobCount := 1 + populate(t, &te, remoteBlobCount) + + localStats := te.LocalStats() + remoteStats := te.RemoteStats() + + // Serveblob - pulls through blobs + for _, dr := range te.inRemote { + w := httptest.NewRecorder() + r, err := http.NewRequest("GET", "", nil) + if err != nil { + t.Fatal(err) + } + + err = te.store.ServeBlob(te.ctx, w, r, dr.Digest) + if err != nil { + t.Fatalf(err.Error()) + } + + dl, err := digest.FromBytes(w.Body.Bytes()) + if err != nil { + t.Fatalf("Error making digest from blob") + } + if dl != dr.Digest { + t.Errorf("Mismatching blob fetch from proxy") + } + } + + if (*localStats)["stat"] != remoteBlobCount && (*localStats)["create"] != remoteBlobCount { + t.Fatalf("unexpected local stats") + } + if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount { + t.Fatalf("unexpected local stats") + } + + // Serveblob - blobs come from local + for _, dr := range te.inRemote { + w := httptest.NewRecorder() + r, err := http.NewRequest("GET", "", nil) + if err != nil { + t.Fatal(err) + } + + err = te.store.ServeBlob(te.ctx, w, r, dr.Digest) + if err != nil { + t.Fatalf(err.Error()) + } + + dl, err := digest.FromBytes(w.Body.Bytes()) + if err != nil { + t.Fatalf("Error making digest from blob") + } + if dl != dr.Digest { + t.Errorf("Mismatching blob fetch from proxy") + } + } + + // Stat to find local, but no new blobs were created + if (*localStats)["stat"] != remoteBlobCount*2 && (*localStats)["create"] != remoteBlobCount*2 { + t.Fatalf("unexpected local stats") + } + + // Remote unchanged + if (*remoteStats)["stat"] != remoteBlobCount && (*remoteStats)["open"] != remoteBlobCount { + fmt.Printf("\tlocal=%#v, \n\tremote=%#v\n", localStats, remoteStats) + t.Fatalf("unexpected local stats") + } + +} diff --git a/docs/proxy/proxymanifeststore.go b/docs/proxy/proxymanifeststore.go new file mode 100644 index 00000000..5b79c8ce --- /dev/null +++ 
b/docs/proxy/proxymanifeststore.go @@ -0,0 +1,155 @@ +package proxy + +import ( + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/proxy/scheduler" +) + +// todo(richardscothern): from cache control header or config +const repositoryTTL = time.Duration(24 * 7 * time.Hour) + +type proxyManifestStore struct { + ctx context.Context + localManifests distribution.ManifestService + remoteManifests distribution.ManifestService + repositoryName string + scheduler *scheduler.TTLExpirationScheduler +} + +var _ distribution.ManifestService = &proxyManifestStore{} + +func (pms proxyManifestStore) Exists(dgst digest.Digest) (bool, error) { + exists, err := pms.localManifests.Exists(dgst) + if err != nil { + return false, err + } + if exists { + return true, nil + } + + return pms.remoteManifests.Exists(dgst) +} + +func (pms proxyManifestStore) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { + sm, err := pms.localManifests.Get(dgst) + if err == nil { + proxyMetrics.ManifestPush(uint64(len(sm.Raw))) + return sm, err + } + + sm, err = pms.remoteManifests.Get(dgst) + if err != nil { + return nil, err + } + + proxyMetrics.ManifestPull(uint64(len(sm.Raw))) + err = pms.localManifests.Put(sm) + if err != nil { + return nil, err + } + + // Schedule the repo for removal + pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL) + + // Ensure the manifest blob is cleaned up + pms.scheduler.AddBlob(dgst.String(), repositoryTTL) + + proxyMetrics.ManifestPush(uint64(len(sm.Raw))) + + return sm, err +} + +func (pms proxyManifestStore) Tags() ([]string, error) { + return pms.localManifests.Tags() +} + +func (pms proxyManifestStore) ExistsByTag(tag string) (bool, error) { + exists, err := pms.localManifests.ExistsByTag(tag) + if err != nil { + return false, err + } + if exists { + return true, nil + } + + return pms.remoteManifests.ExistsByTag(tag) +} + +func (pms proxyManifestStore) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*manifest.SignedManifest, error) { + var localDigest digest.Digest + + localManifest, err := pms.localManifests.GetByTag(tag, options...) 
+ switch err.(type) { + case distribution.ErrManifestUnknown, distribution.ErrManifestUnknownRevision: + goto fromremote + case nil: + break + default: + return nil, err + } + + localDigest, err = manifestDigest(localManifest) + if err != nil { + return nil, err + } + +fromremote: + var sm *manifest.SignedManifest + sm, err = pms.remoteManifests.GetByTag(tag, client.AddEtagToTag(tag, localDigest.String())) + if err != nil { + return nil, err + } + + if sm == nil { + context.GetLogger(pms.ctx).Debugf("Local manifest for %q is latest, dgst=%s", tag, localDigest.String()) + return localManifest, nil + } + context.GetLogger(pms.ctx).Debugf("Updated manifest for %q, dgst=%s", tag, localDigest.String()) + + err = pms.localManifests.Put(sm) + if err != nil { + return nil, err + } + + dgst, err := manifestDigest(sm) + if err != nil { + return nil, err + } + pms.scheduler.AddBlob(dgst.String(), repositoryTTL) + pms.scheduler.AddManifest(pms.repositoryName, repositoryTTL) + + proxyMetrics.ManifestPull(uint64(len(sm.Raw))) + proxyMetrics.ManifestPush(uint64(len(sm.Raw))) + + return sm, err +} + +func manifestDigest(sm *manifest.SignedManifest) (digest.Digest, error) { + payload, err := sm.Payload() + if err != nil { + return "", err + + } + + dgst, err := digest.FromBytes(payload) + if err != nil { + return "", err + } + + return dgst, nil +} + +func (pms proxyManifestStore) Put(manifest *manifest.SignedManifest) error { + return v2.ErrorCodeUnsupported +} + +func (pms proxyManifestStore) Delete(dgst digest.Digest) error { + return v2.ErrorCodeUnsupported +} diff --git a/docs/proxy/proxymanifeststore_test.go b/docs/proxy/proxymanifeststore_test.go new file mode 100644 index 00000000..7b9b8091 --- /dev/null +++ b/docs/proxy/proxymanifeststore_test.go @@ -0,0 +1,235 @@ +package proxy + +import ( + "io" + "testing" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/registry/proxy/scheduler" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache/memory" + "github.com/docker/distribution/registry/storage/driver/inmemory" + "github.com/docker/distribution/testutil" + "github.com/docker/libtrust" +) + +type statsManifest struct { + manifests distribution.ManifestService + stats map[string]int +} + +type manifestStoreTestEnv struct { + manifestDigest digest.Digest // digest of the signed manifest in the local storage + manifests proxyManifestStore +} + +func (te manifestStoreTestEnv) LocalStats() *map[string]int { + ls := te.manifests.localManifests.(statsManifest).stats + return &ls +} + +func (te manifestStoreTestEnv) RemoteStats() *map[string]int { + rs := te.manifests.remoteManifests.(statsManifest).stats + return &rs +} + +func (sm statsManifest) Delete(dgst digest.Digest) error { + sm.stats["delete"]++ + return sm.manifests.Delete(dgst) +} + +func (sm statsManifest) Exists(dgst digest.Digest) (bool, error) { + sm.stats["exists"]++ + return sm.manifests.Exists(dgst) +} + +func (sm statsManifest) ExistsByTag(tag string) (bool, error) { + sm.stats["existbytag"]++ + return sm.manifests.ExistsByTag(tag) +} + +func (sm statsManifest) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { + sm.stats["get"]++ + return sm.manifests.Get(dgst) +} + +func (sm statsManifest) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*manifest.SignedManifest, error) { + sm.stats["getbytag"]++ + return 
sm.manifests.GetByTag(tag, options...)
+}
+
+func (sm statsManifest) Put(manifest *manifest.SignedManifest) error {
+	sm.stats["put"]++
+	return sm.manifests.Put(manifest)
+}
+
+func (sm statsManifest) Tags() ([]string, error) {
+	sm.stats["tags"]++
+	return sm.manifests.Tags()
+}
+
+func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
+	ctx := context.Background()
+	truthRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, false, false)
+	truthRepo, err := truthRegistry.Repository(ctx, name)
+	if err != nil {
+		t.Fatalf("unexpected error getting repo: %v", err)
+	}
+	tr, err := truthRepo.Manifests(ctx)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	truthManifests := statsManifest{
+		manifests: tr,
+		stats:     make(map[string]int),
+	}
+
+	manifestDigest, err := populateRepo(t, ctx, truthRepo, name, tag)
+	if err != nil {
+		t.Fatalf(err.Error())
+	}
+
+	localRegistry := storage.NewRegistryWithDriver(ctx, inmemory.New(), memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true)
+	localRepo, err := localRegistry.Repository(ctx, name)
+	if err != nil {
+		t.Fatalf("unexpected error getting repo: %v", err)
+	}
+	lr, err := localRepo.Manifests(ctx)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+
+	localManifests := statsManifest{
+		manifests: lr,
+		stats:     make(map[string]int),
+	}
+
+	s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
+	return &manifestStoreTestEnv{
+		manifestDigest: manifestDigest,
+		manifests: proxyManifestStore{
+			ctx:             ctx,
+			localManifests:  localManifests,
+			remoteManifests: truthManifests,
+			scheduler:       s,
+		},
+	}
+}
+
+func populateRepo(t *testing.T, ctx context.Context, repository distribution.Repository, name, tag string) (digest.Digest, error) {
+	m := manifest.Manifest{
+		Versioned: manifest.Versioned{
+			SchemaVersion: 1,
+		},
+		Name: name,
+		Tag:  tag,
+	}
+
+	for i := 0; i < 2; i++ {
+		wr, err := repository.Blobs(ctx).Create(ctx)
+		if err != nil {
+			t.Fatalf("unexpected error creating test upload: %v", err)
+		}
+
+		rs, ts, err := testutil.CreateRandomTarFile()
+		if err != nil {
+			t.Fatalf("unexpected error generating test layer file")
+		}
+		dgst := digest.Digest(ts)
+		if _, err := io.Copy(wr, rs); err != nil {
+			t.Fatalf("unexpected error copying to upload: %v", err)
+		}
+
+		if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}); err != nil {
+			t.Fatalf("unexpected error finishing upload: %v", err)
+		}
+	}
+
+	pk, err := libtrust.GenerateECP256PrivateKey()
+	if err != nil {
+		t.Fatalf("unexpected error generating private key: %v", err)
+	}
+
+	sm, err := manifest.Sign(&m, pk)
+	if err != nil {
+		t.Fatalf("error signing manifest: %v", err)
+	}
+
+	ms, err := repository.Manifests(ctx)
+	if err != nil {
+		t.Fatalf(err.Error())
+	}
+	err = ms.Put(sm)
+	if err != nil {
+		t.Fatalf("unexpected errors putting manifest: %v", err)
+	}
+	pl, err := sm.Payload()
+	if err != nil {
+		t.Fatal(err)
+	}
+	return digest.FromBytes(pl)
+}
+
+// TestProxyManifests contains basic acceptance tests
+// for the pull-through behavior
+func TestProxyManifests(t *testing.T) {
+	name := "foo/bar"
+	env := newManifestStoreTestEnv(t, name, "latest")
+
+	localStats := env.LocalStats()
+	remoteStats := env.RemoteStats()
+
+	// Stat - must check local and remote
+	exists, err := env.manifests.ExistsByTag("latest")
+	if err != nil {
+		t.Fatalf("Error checking existence")
+	}
+	if !exists {
+		t.Errorf("Unexpected non-existent manifest")
+	}
+
+	if (*localStats)["existbytag"] != 1
&& (*remoteStats)["existbytag"] != 1 { + t.Errorf("Unexpected exists count") + } + + // Get - should succeed and pull manifest into local + _, err = env.manifests.Get(env.manifestDigest) + if err != nil { + t.Fatal(err) + } + if (*localStats)["get"] != 1 && (*remoteStats)["get"] != 1 { + t.Errorf("Unexpected get count") + } + + if (*localStats)["put"] != 1 { + t.Errorf("Expected local put") + } + + // Stat - should only go to local + exists, err = env.manifests.ExistsByTag("latest") + if err != nil { + t.Fatal(err) + } + if !exists { + t.Errorf("Unexpected non-existant manifest") + } + + if (*localStats)["existbytag"] != 2 && (*remoteStats)["existbytag"] != 1 { + t.Errorf("Unexpected exists count") + + } + + // Get - should get from remote, to test freshness + _, err = env.manifests.Get(env.manifestDigest) + if err != nil { + t.Fatal(err) + } + + if (*remoteStats)["get"] != 2 && (*remoteStats)["existsbytag"] != 1 && (*localStats)["put"] != 1 { + t.Errorf("Unexpected get count") + } + +} diff --git a/docs/proxy/proxymetrics.go b/docs/proxy/proxymetrics.go new file mode 100644 index 00000000..d3d84d78 --- /dev/null +++ b/docs/proxy/proxymetrics.go @@ -0,0 +1,74 @@ +package proxy + +import ( + "expvar" + "sync/atomic" +) + +// Metrics is used to hold metric counters +// related to the proxy +type Metrics struct { + Requests uint64 + Hits uint64 + Misses uint64 + BytesPulled uint64 + BytesPushed uint64 +} + +type proxyMetricsCollector struct { + blobMetrics Metrics + manifestMetrics Metrics +} + +// BlobPull tracks metrics about blobs pulled into the cache +func (pmc *proxyMetricsCollector) BlobPull(bytesPulled uint64) { + atomic.AddUint64(&pmc.blobMetrics.Misses, 1) + atomic.AddUint64(&pmc.blobMetrics.BytesPulled, bytesPulled) +} + +// BlobPush tracks metrics about blobs pushed to clients +func (pmc *proxyMetricsCollector) BlobPush(bytesPushed uint64) { + atomic.AddUint64(&pmc.blobMetrics.Requests, 1) + atomic.AddUint64(&pmc.blobMetrics.Hits, 1) + atomic.AddUint64(&pmc.blobMetrics.BytesPushed, bytesPushed) +} + +// ManifestPull tracks metrics related to Manifests pulled into the cache +func (pmc *proxyMetricsCollector) ManifestPull(bytesPulled uint64) { + atomic.AddUint64(&pmc.manifestMetrics.Misses, 1) + atomic.AddUint64(&pmc.manifestMetrics.BytesPulled, bytesPulled) +} + +// ManifestPush tracks metrics about manifests pushed to clients +func (pmc *proxyMetricsCollector) ManifestPush(bytesPushed uint64) { + atomic.AddUint64(&pmc.manifestMetrics.Requests, 1) + atomic.AddUint64(&pmc.manifestMetrics.Hits, 1) + atomic.AddUint64(&pmc.manifestMetrics.BytesPushed, bytesPushed) +} + +// proxyMetrics tracks metrics about the proxy cache. This is +// kept globally and made available via expvar. 
+var proxyMetrics = &proxyMetricsCollector{} + +func init() { + registry := expvar.Get("registry") + if registry == nil { + registry = expvar.NewMap("registry") + } + + pm := registry.(*expvar.Map).Get("proxy") + if pm == nil { + pm = &expvar.Map{} + pm.(*expvar.Map).Init() + registry.(*expvar.Map).Set("proxy", pm) + } + + pm.(*expvar.Map).Set("blobs", expvar.Func(func() interface{} { + return proxyMetrics.blobMetrics + })) + + pm.(*expvar.Map).Set("manifests", expvar.Func(func() interface{} { + return proxyMetrics.manifestMetrics + })) + +} diff --git a/docs/proxy/proxyregistry.go b/docs/proxy/proxyregistry.go new file mode 100644 index 00000000..e9dec2f7 --- /dev/null +++ b/docs/proxy/proxyregistry.go @@ -0,0 +1,139 @@ +package proxy + +import ( + "net/http" + "net/url" + + "github.com/docker/distribution" + "github.com/docker/distribution/configuration" + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/client/auth" + "github.com/docker/distribution/registry/client/transport" + "github.com/docker/distribution/registry/proxy/scheduler" + "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/driver" +) + +// proxyingRegistry fetches content from a remote registry and caches it locally +type proxyingRegistry struct { + embedded distribution.Namespace // provides local registry functionality + + scheduler *scheduler.TTLExpirationScheduler + + remoteURL string + credentialStore auth.CredentialStore + challengeManager auth.ChallengeManager +} + +// NewRegistryPullThroughCache creates a registry acting as a pull through cache +func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) { + _, err := url.Parse(config.RemoteURL) + if err != nil { + return nil, err + } + + v := storage.NewVacuum(ctx, driver) + + s := scheduler.New(ctx, driver, "/scheduler-state.json") + s.OnBlobExpire(func(digest string) error { + return v.RemoveBlob(digest) + }) + s.OnManifestExpire(func(repoName string) error { + return v.RemoveRepository(repoName) + }) + err = s.Start() + if err != nil { + return nil, err + } + + challengeManager := auth.NewSimpleChallengeManager() + cs, err := ConfigureAuth(config.RemoteURL, config.Username, config.Password, challengeManager) + if err != nil { + return nil, err + } + + return &proxyingRegistry{ + embedded: registry, + scheduler: s, + challengeManager: challengeManager, + credentialStore: cs, + remoteURL: config.RemoteURL, + }, nil +} + +func (pr *proxyingRegistry) Scope() distribution.Scope { + return distribution.GlobalScope +} + +func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) { + return pr.embedded.Repositories(ctx, repos, last) +} + +func (pr *proxyingRegistry) Repository(ctx context.Context, name string) (distribution.Repository, error) { + tr := transport.NewTransport(http.DefaultTransport, + auth.NewAuthorizer(pr.challengeManager, auth.NewTokenHandler(http.DefaultTransport, pr.credentialStore, name, "pull"))) + + localRepo, err := pr.embedded.Repository(ctx, name) + if err != nil { + return nil, err + } + localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification) + if err != nil { + return nil, err + } + + remoteRepo, err := client.NewRepository(ctx, name, pr.remoteURL, tr) + if err != nil { + return nil, err + } + + remoteManifests, err := 
remoteRepo.Manifests(ctx) + if err != nil { + return nil, err + } + + return &proxiedRepository{ + blobStore: proxyBlobStore{ + localStore: localRepo.Blobs(ctx), + remoteStore: remoteRepo.Blobs(ctx), + scheduler: pr.scheduler, + }, + manifests: proxyManifestStore{ + repositoryName: name, + localManifests: localManifests, // Options? + remoteManifests: remoteManifests, + ctx: ctx, + scheduler: pr.scheduler, + }, + name: name, + signatures: localRepo.Signatures(), + }, nil +} + +// proxiedRepository uses proxying blob and manifest services to serve content +// locally, or pulling it through from a remote and caching it locally if it doesn't +// already exist +type proxiedRepository struct { + blobStore distribution.BlobStore + manifests distribution.ManifestService + name string + signatures distribution.SignatureService +} + +func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { + // options + return pr.manifests, nil +} + +func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore { + return pr.blobStore +} + +func (pr *proxiedRepository) Name() string { + return pr.name +} + +func (pr *proxiedRepository) Signatures() distribution.SignatureService { + return pr.signatures +} diff --git a/docs/proxy/scheduler/scheduler.go b/docs/proxy/scheduler/scheduler.go new file mode 100644 index 00000000..056b148a --- /dev/null +++ b/docs/proxy/scheduler/scheduler.go @@ -0,0 +1,250 @@ +package scheduler + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/docker/distribution/context" + "github.com/docker/distribution/registry/storage/driver" +) + +// onTTLExpiryFunc is called when a repositories' TTL expires +type expiryFunc func(string) error + +const ( + entryTypeBlob = iota + entryTypeManifest +) + +// schedulerEntry represents an entry in the scheduler +// fields are exported for serialization +type schedulerEntry struct { + Key string `json:"Key"` + Expiry time.Time `json:"ExpiryData"` + EntryType int `json:"EntryType"` +} + +// New returns a new instance of the scheduler +func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler { + return &TTLExpirationScheduler{ + entries: make(map[string]schedulerEntry), + addChan: make(chan schedulerEntry), + stopChan: make(chan bool), + driver: driver, + pathToStateFile: path, + ctx: ctx, + stopped: true, + } +} + +// TTLExpirationScheduler is a scheduler used to perform actions +// when TTLs expire +type TTLExpirationScheduler struct { + entries map[string]schedulerEntry + addChan chan schedulerEntry + stopChan chan bool + + driver driver.StorageDriver + ctx context.Context + pathToStateFile string + + stopped bool + + onBlobExpire expiryFunc + onManifestExpire expiryFunc +} + +// addChan allows more TTLs to be pushed to the scheduler +type addChan chan schedulerEntry + +// stopChan allows the scheduler to be stopped - used for testing. 
+type stopChan chan bool + +// OnBlobExpire is called when a scheduled blob's TTL expires +func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) { + ttles.onBlobExpire = f +} + +// OnManifestExpire is called when a scheduled manifest's TTL expires +func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) { + ttles.onManifestExpire = f +} + +// AddBlob schedules a blob cleanup after ttl expires +func (ttles *TTLExpirationScheduler) AddBlob(dgst string, ttl time.Duration) error { + if ttles.stopped { + return fmt.Errorf("scheduler not started") + } + ttles.add(dgst, ttl, entryTypeBlob) + return nil +} + +// AddManifest schedules a manifest cleanup after ttl expires +func (ttles *TTLExpirationScheduler) AddManifest(repoName string, ttl time.Duration) error { + if ttles.stopped { + return fmt.Errorf("scheduler not started") + } + + ttles.add(repoName, ttl, entryTypeManifest) + return nil +} + +// Start starts the scheduler +func (ttles *TTLExpirationScheduler) Start() error { + return ttles.start() +} + +func (ttles *TTLExpirationScheduler) add(key string, ttl time.Duration, eType int) { + entry := schedulerEntry{ + Key: key, + Expiry: time.Now().Add(ttl), + EntryType: eType, + } + ttles.addChan <- entry +} + +func (ttles *TTLExpirationScheduler) stop() { + ttles.stopChan <- true +} + +func (ttles *TTLExpirationScheduler) start() error { + err := ttles.readState() + if err != nil { + return err + } + + if !ttles.stopped { + return fmt.Errorf("Scheduler already started") + } + + context.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...") + ttles.stopped = false + go ttles.mainloop() + + return nil +} + +// mainloop uses a select statement to listen for events. Most of its time +// is spent in waiting on a TTL to expire but can be interrupted when TTLs +// are added. 
+func (ttles *TTLExpirationScheduler) mainloop() { + for { + if ttles.stopped { + return + } + + nextEntry, ttl := nextExpiringEntry(ttles.entries) + if len(ttles.entries) == 0 { + context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Nothing to do, sleeping...") + } else { + context.GetLogger(ttles.ctx).Infof("scheduler mainloop(): Sleeping for %s until cleanup of %s", ttl, nextEntry.Key) + } + + select { + case <-time.After(ttl): + var f expiryFunc + + switch nextEntry.EntryType { + case entryTypeBlob: + f = ttles.onBlobExpire + case entryTypeManifest: + f = ttles.onManifestExpire + default: + f = func(repoName string) error { + return fmt.Errorf("Unexpected scheduler entry type") + } + } + + if err := f(nextEntry.Key); err != nil { + context.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", nextEntry.Key, err) + } + + delete(ttles.entries, nextEntry.Key) + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) + } + case entry := <-ttles.addChan: + context.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now())) + ttles.entries[entry.Key] = entry + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) + } + break + + case <-ttles.stopChan: + if err := ttles.writeState(); err != nil { + context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err) + } + ttles.stopped = true + } + } +} + +func nextExpiringEntry(entries map[string]schedulerEntry) (*schedulerEntry, time.Duration) { + if len(entries) == 0 { + return nil, 24 * time.Hour + } + + // todo:(richardscothern) this is a primitive o(n) algorithm + // but n will never be *that* big and it's all in memory. 
Investigate
+	// time.AfterFunc for heap-based expiries
+
+	first := true
+	var nextEntry schedulerEntry
+	for _, entry := range entries {
+		if first {
+			nextEntry = entry
+			first = false
+			continue
+		}
+		if entry.Expiry.Before(nextEntry.Expiry) {
+			nextEntry = entry
+		}
+	}
+
+	// Dates may be from the past if the scheduler has
+	// been restarted, set their ttl to 0
+	if nextEntry.Expiry.Before(time.Now()) {
+		nextEntry.Expiry = time.Now()
+		return &nextEntry, 0
+	}
+
+	return &nextEntry, nextEntry.Expiry.Sub(time.Now())
+}
+
+func (ttles *TTLExpirationScheduler) writeState() error {
+	jsonBytes, err := json.Marshal(ttles.entries)
+	if err != nil {
+		return err
+	}
+
+	err = ttles.driver.PutContent(ttles.ctx, ttles.pathToStateFile, jsonBytes)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (ttles *TTLExpirationScheduler) readState() error {
+	if _, err := ttles.driver.Stat(ttles.ctx, ttles.pathToStateFile); err != nil {
+		switch err := err.(type) {
+		case driver.PathNotFoundError:
+			return nil
+		default:
+			return err
+		}
+	}
+
+	bytes, err := ttles.driver.GetContent(ttles.ctx, ttles.pathToStateFile)
+	if err != nil {
+		return err
+	}
+
+	err = json.Unmarshal(bytes, &ttles.entries)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/docs/proxy/scheduler/scheduler_test.go b/docs/proxy/scheduler/scheduler_test.go
new file mode 100644
index 00000000..fb5479f0
--- /dev/null
+++ b/docs/proxy/scheduler/scheduler_test.go
@@ -0,0 +1,165 @@
+package scheduler
+
+import (
+	"encoding/json"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/docker/distribution/context"
+	"github.com/docker/distribution/registry/storage/driver/inmemory"
+)
+
+func TestSchedule(t *testing.T) {
+	timeUnit := time.Millisecond
+	remainingRepos := map[string]bool{
+		"testBlob1": true,
+		"testBlob2": true,
+		"ch00":      true,
+	}
+
+	s := New(context.Background(), inmemory.New(), "/ttl")
+	deleteFunc := func(repoName string) error {
+		if len(remainingRepos) == 0 {
+			t.Fatalf("Incorrect expiry count")
+		}
+		_, ok := remainingRepos[repoName]
+		if !ok {
+			t.Fatalf("Trying to remove nonexistent repo: %s", repoName)
+		}
+		fmt.Println("removing", repoName)
+		delete(remainingRepos, repoName)
+
+		return nil
+	}
+	s.onBlobExpire = deleteFunc
+	err := s.start()
+	if err != nil {
+		t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
+	}
+
+	s.add("testBlob1", 3*timeUnit, entryTypeBlob)
+	s.add("testBlob2", 1*timeUnit, entryTypeBlob)
+
+	func() {
+		s.add("ch00", 1*timeUnit, entryTypeBlob)
+
+	}()
+
+	// Ensure all repos are deleted
+	<-time.After(50 * timeUnit)
+	if len(remainingRepos) != 0 {
+		t.Fatalf("Repositories remaining: %#v", remainingRepos)
+	}
+}
+
+func TestRestoreOld(t *testing.T) {
+	remainingRepos := map[string]bool{
+		"testBlob1": true,
+		"oldRepo":   true,
+	}
+
+	deleteFunc := func(repoName string) error {
+		if repoName == "oldRepo" && len(remainingRepos) == 3 {
+			t.Errorf("oldRepo should be removed first")
+		}
+		_, ok := remainingRepos[repoName]
+		if !ok {
+			t.Fatalf("Trying to remove nonexistent repo: %s", repoName)
+		}
+		delete(remainingRepos, repoName)
+		return nil
+	}
+
+	timeUnit := time.Millisecond
+	serialized, err := json.Marshal(&map[string]schedulerEntry{
+		"testBlob1": {
+			Expiry:    time.Now().Add(1 * timeUnit),
+			Key:       "testBlob1",
+			EntryType: 0,
+		},
+		"oldRepo": {
+			Expiry:    time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
+			Key:       "oldRepo",
+			EntryType: 0,
+		},
+	})
+	if err != nil {
+		t.Fatalf("Error serializing test data: %s", err.Error())
+	}
+
+	ctx :=
context.Background() + pathToStatFile := "/ttl" + fs := inmemory.New() + err = fs.PutContent(ctx, pathToStatFile, serialized) + if err != nil { + t.Fatal("Unable to write serialized data to fs") + } + s := New(context.Background(), fs, "/ttl") + s.onBlobExpire = deleteFunc + err = s.start() + if err != nil { + t.Fatalf("Error starting ttlExpirationScheduler: %s", err) + } + + <-time.After(50 * timeUnit) + if len(remainingRepos) != 0 { + t.Fatalf("Repositories remaining: %#v", remainingRepos) + } +} + +func TestStopRestore(t *testing.T) { + timeUnit := time.Millisecond + remainingRepos := map[string]bool{ + "testBlob1": true, + "testBlob2": true, + } + deleteFunc := func(repoName string) error { + delete(remainingRepos, repoName) + return nil + } + + fs := inmemory.New() + pathToStateFile := "/ttl" + s := New(context.Background(), fs, pathToStateFile) + s.onBlobExpire = deleteFunc + + err := s.start() + if err != nil { + t.Fatalf(err.Error()) + } + s.add("testBlob1", 300*timeUnit, entryTypeBlob) + s.add("testBlob2", 100*timeUnit, entryTypeBlob) + + // Start and stop before all operations complete + // state will be written to fs + s.stop() + time.Sleep(10 * time.Millisecond) + + // v2 will restore state from fs + s2 := New(context.Background(), fs, pathToStateFile) + s2.onBlobExpire = deleteFunc + err = s2.start() + if err != nil { + t.Fatalf("Error starting v2: %s", err.Error()) + } + + <-time.After(500 * timeUnit) + if len(remainingRepos) != 0 { + t.Fatalf("Repositories remaining: %#v", remainingRepos) + } + +} + +func TestDoubleStart(t *testing.T) { + s := New(context.Background(), inmemory.New(), "/ttl") + err := s.start() + if err != nil { + t.Fatalf("Unable to start scheduler") + } + fmt.Printf("%#v", s) + err = s.start() + if err == nil { + t.Fatalf("Scheduler started twice without error") + } +} diff --git a/docs/storage/blob_test.go b/docs/storage/blob_test.go index 8f6fb6f2..e5cfa83e 100644 --- a/docs/storage/blob_test.go +++ b/docs/storage/blob_test.go @@ -33,7 +33,7 @@ func TestSimpleBlobUpload(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -193,7 +193,7 @@ func TestSimpleBlobUpload(t *testing.T) { } // Reuse state to test delete with a delete-disabled registry - registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) + registry = NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false) repository, err = registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -212,7 +212,7 @@ func TestSimpleBlobRead(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -316,7 +316,7 @@ func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() imageName := 
"foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) diff --git a/docs/storage/blobwriter.go b/docs/storage/blobwriter.go index 50da7699..2142c37f 100644 --- a/docs/storage/blobwriter.go +++ b/docs/storage/blobwriter.go @@ -31,6 +31,8 @@ type blobWriter struct { // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy // LayerUpload Interface bufferedFileWriter + + resumableDigestEnabled bool } var _ distribution.BlobWriter = &blobWriter{} @@ -349,3 +351,29 @@ func (bw *blobWriter) removeResources(ctx context.Context) error { return nil } + +func (bw *blobWriter) Reader() (io.ReadCloser, error) { + // todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4 + try := 1 + for try <= 5 { + _, err := bw.bufferedFileWriter.driver.Stat(bw.ctx, bw.path) + if err == nil { + break + } + switch err.(type) { + case storagedriver.PathNotFoundError: + context.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", try) + time.Sleep(1 * time.Second) + try++ + default: + return nil, err + } + } + + readCloser, err := bw.bufferedFileWriter.driver.ReadStream(bw.ctx, bw.path, 0) + if err != nil { + return nil, err + } + + return readCloser, nil +} diff --git a/docs/storage/blobwriter_resumable.go b/docs/storage/blobwriter_resumable.go index c2ab2123..a26ac2cc 100644 --- a/docs/storage/blobwriter_resumable.go +++ b/docs/storage/blobwriter_resumable.go @@ -24,6 +24,10 @@ import ( // offset. Any unhashed bytes remaining less than the given offset are hashed // from the content uploaded so far. func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error { + if !bw.resumableDigestEnabled { + return errResumableDigestNotAvailable + } + if offset < 0 { return fmt.Errorf("cannot resume hash at negative offset: %d", offset) } @@ -143,6 +147,10 @@ func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry } func (bw *blobWriter) storeHashState(ctx context.Context) error { + if !bw.resumableDigestEnabled { + return errResumableDigestNotAvailable + } + h, ok := bw.digester.Hash().(resumable.Hash) if !ok { return errResumableDigestNotAvailable diff --git a/docs/storage/catalog_test.go b/docs/storage/catalog_test.go index 862777aa..1a1dbac5 100644 --- a/docs/storage/catalog_test.go +++ b/docs/storage/catalog_test.go @@ -22,7 +22,7 @@ func setupFS(t *testing.T) *setupEnv { d := inmemory.New() c := []byte("") ctx := context.Background() - registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) + registry := NewRegistryWithDriver(ctx, d, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false) rootpath, _ := defaultPathMapper.path(repositoriesRootPathSpec{}) repos := []string{ diff --git a/docs/storage/linkedblobstore.go b/docs/storage/linkedblobstore.go index e7a98bbb..2ba62a95 100644 --- a/docs/storage/linkedblobstore.go +++ b/docs/storage/linkedblobstore.go @@ -16,11 +16,12 @@ import ( // that grant access to the global blob store. 
type linkedBlobStore struct { *blobStore - blobServer distribution.BlobServer - blobAccessController distribution.BlobDescriptorService - repository distribution.Repository - ctx context.Context // only to be used where context can't come through method args - deleteEnabled bool + blobServer distribution.BlobServer + blobAccessController distribution.BlobDescriptorService + repository distribution.Repository + ctx context.Context // only to be used where context can't come through method args + deleteEnabled bool + resumableDigestEnabled bool // linkPath allows one to control the repository blob link set to which // the blob store dispatches. This is required because manifest and layer @@ -189,11 +190,12 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string } bw := &blobWriter{ - blobStore: lbs, - id: uuid, - startedAt: startedAt, - digester: digest.Canonical.New(), - bufferedFileWriter: *fw, + blobStore: lbs, + id: uuid, + startedAt: startedAt, + digester: digest.Canonical.New(), + bufferedFileWriter: *fw, + resumableDigestEnabled: lbs.resumableDigestEnabled, } return bw, nil diff --git a/docs/storage/manifeststore_test.go b/docs/storage/manifeststore_test.go index 5bbbd4a2..a4ce9149 100644 --- a/docs/storage/manifeststore_test.go +++ b/docs/storage/manifeststore_test.go @@ -29,7 +29,7 @@ type manifestStoreTestEnv struct { func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { ctx := context.Background() driver := inmemory.New() - registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true) + registry := NewRegistryWithDriver(ctx, driver, memory.NewInMemoryBlobDescriptorCacheProvider(), true, true, false) repo, err := registry.Repository(ctx, name) if err != nil { @@ -348,7 +348,7 @@ func TestManifestStorage(t *testing.T) { t.Errorf("Deleted manifest get returned non-nil") } - r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true) + r := NewRegistryWithDriver(ctx, env.driver, memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, false) repo, err := r.Repository(ctx, env.name) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) diff --git a/docs/storage/registry.go b/docs/storage/registry.go index 8149be11..c5058b80 100644 --- a/docs/storage/registry.go +++ b/docs/storage/registry.go @@ -16,6 +16,7 @@ type registry struct { statter distribution.BlobStatter // global statter service. blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider deleteEnabled bool + resumableDigestEnabled bool } // NewRegistryWithDriver creates a new registry instance from the provided @@ -23,9 +24,9 @@ type registry struct { // cheap to allocate. If redirect is true, the backend blob server will // attempt to use (StorageDriver).URLFor to serve all blobs. // -// TODO(stevvooe): This function signature is getting out of hand. Move to +// TODO(stevvooe): This function signature is getting very out of hand. Move to // functional options for instance configuration. -func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool, redirect bool) distribution.Namespace { +func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriver, blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider, deleteEnabled bool, redirect bool, isCache bool) distribution.Namespace { // create global statter, with cache. 
var statter distribution.BlobDescriptorService = &blobStatter{ driver: driver, @@ -52,6 +53,7 @@ func NewRegistryWithDriver(ctx context.Context, driver storagedriver.StorageDriv }, blobDescriptorCacheProvider: blobDescriptorCacheProvider, deleteEnabled: deleteEnabled, + resumableDigestEnabled: !isCache, } } diff --git a/docs/storage/vacuum.go b/docs/storage/vacuum.go new file mode 100644 index 00000000..46b8096b --- /dev/null +++ b/docs/storage/vacuum.go @@ -0,0 +1,67 @@ +package storage + +import ( + "path" + + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/storage/driver" +) + +// vacuum contains functions for cleaning up repositories and blobs +// These functions will only reliably work on strongly consistent +// storage systems. +// https://en.wikipedia.org/wiki/Consistency_model + +// NewVacuum creates a new Vacuum +func NewVacuum(ctx context.Context, driver driver.StorageDriver) Vacuum { + return Vacuum{ + ctx: ctx, + driver: driver, + pm: defaultPathMapper, + } +} + +// Vacuum removes content from the filesystem +type Vacuum struct { + pm *pathMapper + driver driver.StorageDriver + ctx context.Context +} + +// RemoveBlob removes a blob from the filesystem +func (v Vacuum) RemoveBlob(dgst string) error { + d, err := digest.ParseDigest(dgst) + if err != nil { + return err + } + + blobPath, err := v.pm.path(blobDataPathSpec{digest: d}) + if err != nil { + return err + } + context.GetLogger(v.ctx).Infof("Deleting blob: %s", blobPath) + err = v.driver.Delete(v.ctx, blobPath) + if err != nil { + return err + } + + return nil +} + +// RemoveRepository removes a repository directory from the +// filesystem +func (v Vacuum) RemoveRepository(repoName string) error { + rootForRepository, err := v.pm.path(repositoriesRootPathSpec{}) + if err != nil { + return err + } + repoDir := path.Join(rootForRepository, repoName) + context.GetLogger(v.ctx).Infof("Deleting repo: %s", repoDir) + err = v.driver.Delete(v.ctx, repoDir) + if err != nil { + return err + } + + return nil +}
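Usage note (not part of the patch): the wiring that app.go performs above can be reproduced in isolation. The following is a minimal, illustrative Go sketch of how a storage-backed registry gets wrapped by proxy.NewRegistryPullThroughCache; the in-memory driver, empty credentials, and the Docker Hub remote URL are assumptions for the example, and constructing the cache contacts the remote (it pings /v2/ to configure auth), so it needs network access to run.

package main

import (
	"github.com/docker/distribution/configuration"
	"github.com/docker/distribution/context"
	"github.com/docker/distribution/registry/proxy"
	"github.com/docker/distribution/registry/storage"
	"github.com/docker/distribution/registry/storage/cache/memory"
	"github.com/docker/distribution/registry/storage/driver/inmemory"
)

func main() {
	ctx := context.Background()
	driver := inmemory.New()

	// The final argument (isCache=true) mirrors app.isCache in app.go and
	// disables resumable digests for the local backing store.
	registry := storage.NewRegistryWithDriver(ctx, driver,
		memory.NewInMemoryBlobDescriptorCacheProvider(), false, true, true)

	// In a real deployment these values come from the proxy section of the
	// registry configuration file.
	proxyConfig := configuration.Proxy{
		RemoteURL: "https://registry-1.docker.io", // assumed upstream to mirror
		Username:  "",                             // anonymous pulls
		Password:  "",
	}

	cache, err := proxy.NewRegistryPullThroughCache(ctx, registry, driver, proxyConfig)
	if err != nil {
		panic(err)
	}

	// cache satisfies distribution.Namespace; repositories obtained from it
	// serve blobs and manifests locally and pull misses through from RemoteURL,
	// scheduling cached content for expiry via the TTL scheduler.
	_ = cache
}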