diff --git a/docs/doc.go b/docs/doc.go index 5049dae3..1c01e42e 100644 --- a/docs/doc.go +++ b/docs/doc.go @@ -1,3 +1,3 @@ // Package registry is a placeholder package for registry interface -// destinations and utilities. +// definitions and utilities. package registry diff --git a/docs/handlers/app.go b/docs/handlers/app.go index 1b5effbc..fac93382 100644 --- a/docs/handlers/app.go +++ b/docs/handlers/app.go @@ -1,10 +1,12 @@ package handlers import ( + "expvar" "fmt" "net" "net/http" "os" + "time" "code.google.com/p/go-uuid/uuid" "github.com/docker/distribution" @@ -16,9 +18,11 @@ import ( registrymiddleware "github.com/docker/distribution/registry/middleware/registry" repositorymiddleware "github.com/docker/distribution/registry/middleware/repository" "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/factory" storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware" + "github.com/garyburd/redigo/redis" "github.com/gorilla/mux" "golang.org/x/net/context" ) @@ -44,6 +48,8 @@ type App struct { sink notifications.Sink source notifications.SourceRecord } + + redis *redis.Pool } // Value intercepts calls context.Context.Value, returning the current app id, @@ -95,8 +101,32 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App } app.configureEvents(&configuration) + app.configureRedis(&configuration) + + // configure storage caches + if cc, ok := configuration.Storage["cache"]; ok { + switch cc["layerinfo"] { + case "redis": + if app.redis == nil { + panic("redis configuration required to use for layerinfo cache") + } + app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewRedisLayerInfoCache(app.redis)) + ctxu.GetLogger(app).Infof("using redis layerinfo cache") + case "inmemory": + app.registry = storage.NewRegistryWithDriver(app.driver, cache.NewInMemoryLayerInfoCache()) + ctxu.GetLogger(app).Infof("using inmemory layerinfo cache") + default: + if cc["layerinfo"] != "" { + ctxu.GetLogger(app).Warnf("unkown cache type %q, caching disabled", configuration.Storage["cache"]) + } + } + } + + if app.registry == nil { + // configure the registry if no cache section is available. + app.registry = storage.NewRegistryWithDriver(app.driver, nil) + } - app.registry = storage.NewRegistryWithDriver(app.driver) app.registry, err = applyRegistryMiddleware(app.registry, configuration.Middleware["registry"]) if err != nil { panic(err) @@ -174,6 +204,88 @@ func (app *App) configureEvents(configuration *configuration.Configuration) { } } +func (app *App) configureRedis(configuration *configuration.Configuration) { + if configuration.Redis.Addr == "" { + ctxu.GetLogger(app).Infof("redis not configured") + return + } + + pool := &redis.Pool{ + Dial: func() (redis.Conn, error) { + // TODO(stevvooe): Yet another use case for contextual timing. + ctx := context.WithValue(app, "redis.connect.startedat", time.Now()) + + done := func(err error) { + logger := ctxu.GetLoggerWithField(ctx, "redis.connect.duration", + ctxu.Since(ctx, "redis.connect.startedat")) + if err != nil { + logger.Errorf("redis: error connecting: %v", err) + } else { + logger.Infof("redis: connect %v", configuration.Redis.Addr) + } + } + + conn, err := redis.DialTimeout("tcp", + configuration.Redis.Addr, + configuration.Redis.DialTimeout, + configuration.Redis.ReadTimeout, + configuration.Redis.WriteTimeout) + if err != nil { + ctxu.GetLogger(app).Errorf("error connecting to redis instance %s: %v", + configuration.Redis.Addr, err) + done(err) + return nil, err + } + + // authorize the connection + if configuration.Redis.Password != "" { + if _, err = conn.Do("AUTH", configuration.Redis.Password); err != nil { + defer conn.Close() + done(err) + return nil, err + } + } + + // select the database to use + if configuration.Redis.DB != 0 { + if _, err = conn.Do("SELECT", configuration.Redis.DB); err != nil { + defer conn.Close() + done(err) + return nil, err + } + } + + done(nil) + return conn, nil + }, + MaxIdle: configuration.Redis.Pool.MaxIdle, + MaxActive: configuration.Redis.Pool.MaxActive, + IdleTimeout: configuration.Redis.Pool.IdleTimeout, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + // TODO(stevvooe): We can probably do something more interesting + // here with the health package. + _, err := c.Do("PING") + return err + }, + Wait: false, // if a connection is not avialable, proceed without cache. + } + + app.redis = pool + + // setup expvar + registry := expvar.Get("registry") + if registry == nil { + registry = expvar.NewMap("registry") + } + + registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} { + return map[string]interface{}{ + "Config": configuration.Redis, + "Active": app.redis.ActiveCount(), + } + })) +} + func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() // ensure that request body is always closed. diff --git a/docs/handlers/app_test.go b/docs/handlers/app_test.go index cd515dd0..d0b9174d 100644 --- a/docs/handlers/app_test.go +++ b/docs/handlers/app_test.go @@ -13,6 +13,7 @@ import ( "github.com/docker/distribution/registry/auth" _ "github.com/docker/distribution/registry/auth/silly" "github.com/docker/distribution/registry/storage" + "github.com/docker/distribution/registry/storage/cache" "github.com/docker/distribution/registry/storage/driver/inmemory" "golang.org/x/net/context" ) @@ -28,7 +29,7 @@ func TestAppDispatcher(t *testing.T) { Context: context.Background(), router: v2.Router(), driver: driver, - registry: storage.NewRegistryWithDriver(driver), + registry: storage.NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()), } server := httptest.NewServer(app) router := v2.Router() diff --git a/docs/storage/blobstore.go b/docs/storage/blobstore.go index 975df19f..8bab2f5e 100644 --- a/docs/storage/blobstore.go +++ b/docs/storage/blobstore.go @@ -18,8 +18,9 @@ import ( // abstraction, providing utility methods that support creating and traversing // backend links. type blobStore struct { - *registry - ctx context.Context + driver storagedriver.StorageDriver + pm *pathMapper + ctx context.Context } // exists reports whether or not the path exists. If the driver returns error diff --git a/docs/storage/cache/cache.go b/docs/storage/cache/cache.go new file mode 100644 index 00000000..a21cefd5 --- /dev/null +++ b/docs/storage/cache/cache.go @@ -0,0 +1,98 @@ +// Package cache provides facilities to speed up access to the storage +// backend. Typically cache implementations deal with internal implementation +// details at the backend level, rather than generalized caches for +// distribution related interfaces. In other words, unless the cache is +// specific to the storage package, it belongs in another package. +package cache + +import ( + "fmt" + + "github.com/docker/distribution/digest" + "golang.org/x/net/context" +) + +// ErrNotFound is returned when a meta item is not found. +var ErrNotFound = fmt.Errorf("not found") + +// LayerMeta describes the backend location and length of layer data. +type LayerMeta struct { + Path string + Length int64 +} + +// LayerInfoCache is a driver-aware cache of layer metadata. Basically, it +// provides a fast cache for checks against repository metadata, avoiding +// round trips to backend storage. Note that this is different from a pure +// layer cache, which would also provide access to backing data, as well. Such +// a cache should be implemented as a middleware, rather than integrated with +// the storage backend. +// +// Note that most implementations rely on the caller to do strict checks on on +// repo and dgst arguments, since these are mostly used behind existing +// implementations. +type LayerInfoCache interface { + // Contains returns true if the repository with name contains the layer. + Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) + + // Add includes the layer in the given repository cache. + Add(ctx context.Context, repo string, dgst digest.Digest) error + + // Meta provides the location of the layer on the backend and its size. Membership of a + // repository should be tested before using the result, if required. + Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) + + // SetMeta sets the meta data for the given layer. + SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error +} + +// base implements common checks between cache implementations. Note that +// these are not full checks of input, since that should be done by the +// caller. +type base struct { + LayerInfoCache +} + +func (b *base) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) { + if repo == "" { + return false, fmt.Errorf("cache: cannot check for empty repository name") + } + + if dgst == "" { + return false, fmt.Errorf("cache: cannot check for empty digests") + } + + return b.LayerInfoCache.Contains(ctx, repo, dgst) +} + +func (b *base) Add(ctx context.Context, repo string, dgst digest.Digest) error { + if repo == "" { + return fmt.Errorf("cache: cannot add empty repository name") + } + + if dgst == "" { + return fmt.Errorf("cache: cannot add empty digest") + } + + return b.LayerInfoCache.Add(ctx, repo, dgst) +} + +func (b *base) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) { + if dgst == "" { + return LayerMeta{}, fmt.Errorf("cache: cannot get meta for empty digest") + } + + return b.LayerInfoCache.Meta(ctx, dgst) +} + +func (b *base) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error { + if dgst == "" { + return fmt.Errorf("cache: cannot set meta for empty digest") + } + + if meta.Path == "" { + return fmt.Errorf("cache: cannot set empty path for meta") + } + + return b.LayerInfoCache.SetMeta(ctx, dgst, meta) +} diff --git a/docs/storage/cache/cache_test.go b/docs/storage/cache/cache_test.go new file mode 100644 index 00000000..48cef955 --- /dev/null +++ b/docs/storage/cache/cache_test.go @@ -0,0 +1,86 @@ +package cache + +import ( + "testing" + + "golang.org/x/net/context" +) + +// checkLayerInfoCache takes a cache implementation through a common set of +// operations. If adding new tests, please add them here so new +// implementations get the benefit. +func checkLayerInfoCache(t *testing.T, lic LayerInfoCache) { + ctx := context.Background() + + exists, err := lic.Contains(ctx, "", "fake:abc") + if err == nil { + t.Fatalf("expected error checking for cache item with empty repo") + } + + exists, err = lic.Contains(ctx, "foo/bar", "") + if err == nil { + t.Fatalf("expected error checking for cache item with empty digest") + } + + exists, err = lic.Contains(ctx, "foo/bar", "fake:abc") + if err != nil { + t.Fatalf("unexpected error checking for cache item: %v", err) + } + + if exists { + t.Fatalf("item should not exist") + } + + if err := lic.Add(ctx, "", "fake:abc"); err == nil { + t.Fatalf("expected error adding cache item with empty name") + } + + if err := lic.Add(ctx, "foo/bar", ""); err == nil { + t.Fatalf("expected error adding cache item with empty digest") + } + + if err := lic.Add(ctx, "foo/bar", "fake:abc"); err != nil { + t.Fatalf("unexpected error adding item: %v", err) + } + + exists, err = lic.Contains(ctx, "foo/bar", "fake:abc") + if err != nil { + t.Fatalf("unexpected error checking for cache item: %v", err) + } + + if !exists { + t.Fatalf("item should exist") + } + + _, err = lic.Meta(ctx, "") + if err == nil || err == ErrNotFound { + t.Fatalf("expected error getting meta for cache item with empty digest") + } + + _, err = lic.Meta(ctx, "fake:abc") + if err != ErrNotFound { + t.Fatalf("expected unknown layer error getting meta for cache item with empty digest") + } + + if err = lic.SetMeta(ctx, "", LayerMeta{}); err == nil { + t.Fatalf("expected error setting meta for cache item with empty digest") + } + + if err = lic.SetMeta(ctx, "foo/bar", LayerMeta{}); err == nil { + t.Fatalf("expected error setting meta for cache item with empty meta") + } + + expected := LayerMeta{Path: "/foo/bar", Length: 20} + if err := lic.SetMeta(ctx, "foo/bar", expected); err != nil { + t.Fatalf("unexpected error setting meta: %v", err) + } + + meta, err := lic.Meta(ctx, "foo/bar") + if err != nil { + t.Fatalf("unexpected error getting meta: %v", err) + } + + if meta != expected { + t.Fatalf("retrieved meta data did not match: %v", err) + } +} diff --git a/docs/storage/cache/memory.go b/docs/storage/cache/memory.go new file mode 100644 index 00000000..6d949792 --- /dev/null +++ b/docs/storage/cache/memory.go @@ -0,0 +1,63 @@ +package cache + +import ( + "github.com/docker/distribution/digest" + "golang.org/x/net/context" +) + +// inmemoryLayerInfoCache is a map-based implementation of LayerInfoCache. +type inmemoryLayerInfoCache struct { + membership map[string]map[digest.Digest]struct{} + meta map[digest.Digest]LayerMeta +} + +// NewInMemoryLayerInfoCache provides an implementation of LayerInfoCache that +// stores results in memory. +func NewInMemoryLayerInfoCache() LayerInfoCache { + return &base{&inmemoryLayerInfoCache{ + membership: make(map[string]map[digest.Digest]struct{}), + meta: make(map[digest.Digest]LayerMeta), + }} +} + +func (ilic *inmemoryLayerInfoCache) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) { + members, ok := ilic.membership[repo] + if !ok { + return false, nil + } + + _, ok = members[dgst] + return ok, nil +} + +// Add adds the layer to the redis repository blob set. +func (ilic *inmemoryLayerInfoCache) Add(ctx context.Context, repo string, dgst digest.Digest) error { + members, ok := ilic.membership[repo] + if !ok { + members = make(map[digest.Digest]struct{}) + ilic.membership[repo] = members + } + + members[dgst] = struct{}{} + + return nil +} + +// Meta retrieves the layer meta data from the redis hash, returning +// ErrUnknownLayer if not found. +func (ilic *inmemoryLayerInfoCache) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) { + meta, ok := ilic.meta[dgst] + if !ok { + return LayerMeta{}, ErrNotFound + } + + return meta, nil +} + +// SetMeta sets the meta data for the given digest using a redis hash. A hash +// is used here since we may store unrelated fields about a layer in the +// future. +func (ilic *inmemoryLayerInfoCache) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error { + ilic.meta[dgst] = meta + return nil +} diff --git a/docs/storage/cache/memory_test.go b/docs/storage/cache/memory_test.go new file mode 100644 index 00000000..417e982e --- /dev/null +++ b/docs/storage/cache/memory_test.go @@ -0,0 +1,9 @@ +package cache + +import "testing" + +// TestInMemoryLayerInfoCache checks the in memory implementation is working +// correctly. +func TestInMemoryLayerInfoCache(t *testing.T) { + checkLayerInfoCache(t, NewInMemoryLayerInfoCache()) +} diff --git a/docs/storage/cache/redis.go b/docs/storage/cache/redis.go new file mode 100644 index 00000000..6b8f7679 --- /dev/null +++ b/docs/storage/cache/redis.go @@ -0,0 +1,98 @@ +package cache + +import ( + ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/garyburd/redigo/redis" + "golang.org/x/net/context" +) + +// redisLayerInfoCache provides an implementation of storage.LayerInfoCache +// based on redis. Layer info is stored in two parts. The first provide fast +// access to repository membership through a redis set for each repo. The +// second is a redis hash keyed by the digest of the layer, providing path and +// length information. Note that there is no implied relationship between +// these two caches. The layer may exist in one, both or none and the code +// must be written this way. +type redisLayerInfoCache struct { + pool *redis.Pool + + // TODO(stevvooe): We use a pool because we don't have great control over + // the cache lifecycle to manage connections. A new connection if fetched + // for each operation. Once we have better lifecycle management of the + // request objects, we can change this to a connection. +} + +// NewRedisLayerInfoCache returns a new redis-based LayerInfoCache using the +// provided redis connection pool. +func NewRedisLayerInfoCache(pool *redis.Pool) LayerInfoCache { + return &base{&redisLayerInfoCache{ + pool: pool, + }} +} + +// Contains does a membership check on the repository blob set in redis. This +// is used as an access check before looking up global path information. If +// false is returned, the caller should still check the backend to if it +// exists elsewhere. +func (rlic *redisLayerInfoCache) Contains(ctx context.Context, repo string, dgst digest.Digest) (bool, error) { + conn := rlic.pool.Get() + defer conn.Close() + + ctxu.GetLogger(ctx).Debugf("(*redisLayerInfoCache).Contains(%q, %q)", repo, dgst) + return redis.Bool(conn.Do("SISMEMBER", rlic.repositoryBlobSetKey(repo), dgst)) +} + +// Add adds the layer to the redis repository blob set. +func (rlic *redisLayerInfoCache) Add(ctx context.Context, repo string, dgst digest.Digest) error { + conn := rlic.pool.Get() + defer conn.Close() + + ctxu.GetLogger(ctx).Debugf("(*redisLayerInfoCache).Add(%q, %q)", repo, dgst) + _, err := conn.Do("SADD", rlic.repositoryBlobSetKey(repo), dgst) + return err +} + +// Meta retrieves the layer meta data from the redis hash, returning +// ErrUnknownLayer if not found. +func (rlic *redisLayerInfoCache) Meta(ctx context.Context, dgst digest.Digest) (LayerMeta, error) { + conn := rlic.pool.Get() + defer conn.Close() + + reply, err := redis.Values(conn.Do("HMGET", rlic.blobMetaHashKey(dgst), "path", "length")) + if err != nil { + return LayerMeta{}, err + } + + if len(reply) < 2 || reply[0] == nil || reply[1] == nil { + return LayerMeta{}, ErrNotFound + } + + var meta LayerMeta + if _, err := redis.Scan(reply, &meta.Path, &meta.Length); err != nil { + return LayerMeta{}, err + } + + return meta, nil +} + +// SetMeta sets the meta data for the given digest using a redis hash. A hash +// is used here since we may store unrelated fields about a layer in the +// future. +func (rlic *redisLayerInfoCache) SetMeta(ctx context.Context, dgst digest.Digest, meta LayerMeta) error { + conn := rlic.pool.Get() + defer conn.Close() + + _, err := conn.Do("HMSET", rlic.blobMetaHashKey(dgst), "path", meta.Path, "length", meta.Length) + return err +} + +// repositoryBlobSetKey returns the key for the blob set in the cache. +func (rlic *redisLayerInfoCache) repositoryBlobSetKey(repo string) string { + return "repository::" + repo + "::blobs" +} + +// blobMetaHashKey returns the cache key for immutable blob meta data. +func (rlic *redisLayerInfoCache) blobMetaHashKey(dgst digest.Digest) string { + return "blobs::" + dgst.String() +} diff --git a/docs/storage/cache/redis_test.go b/docs/storage/cache/redis_test.go new file mode 100644 index 00000000..7422a7eb --- /dev/null +++ b/docs/storage/cache/redis_test.go @@ -0,0 +1,50 @@ +package cache + +import ( + "flag" + "os" + "testing" + "time" + + "github.com/garyburd/redigo/redis" +) + +var redisAddr string + +func init() { + flag.StringVar(&redisAddr, "test.registry.storage.cache.redis.addr", "", "configure the address of a test instance of redis") +} + +// TestRedisLayerInfoCache exercises a live redis instance using the cache +// implementation. +func TestRedisLayerInfoCache(t *testing.T) { + if redisAddr == "" { + // fallback to an environement variable + redisAddr = os.Getenv("TEST_REGISTRY_STORAGE_CACHE_REDIS_ADDR") + } + + if redisAddr == "" { + // skip if still not set + t.Skip("please set -registry.storage.cache.redis to test layer info cache against redis") + } + + pool := &redis.Pool{ + Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", redisAddr) + }, + MaxIdle: 1, + MaxActive: 2, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + Wait: false, // if a connection is not avialable, proceed without cache. + } + + // Clear the database + if _, err := pool.Get().Do("FLUSHDB"); err != nil { + t.Fatalf("unexpected error flushing redis db: %v", err) + } + + checkLayerInfoCache(t, NewRedisLayerInfoCache(pool)) +} diff --git a/docs/storage/filereader.go b/docs/storage/filereader.go index b70b1fb2..65d4347f 100644 --- a/docs/storage/filereader.go +++ b/docs/storage/filereader.go @@ -27,8 +27,8 @@ type fileReader struct { // identifying fields path string - size int64 // size is the total layer size, must be set. - modtime time.Time + size int64 // size is the total size, must be set. + modtime time.Time // TODO(stevvooe): This is not needed anymore. // mutable fields rc io.ReadCloser // remote read closer diff --git a/docs/storage/layer_test.go b/docs/storage/layer_test.go index 43e028d5..e225d068 100644 --- a/docs/storage/layer_test.go +++ b/docs/storage/layer_test.go @@ -11,6 +11,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/storage/cache" storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/testutil" @@ -35,7 +36,7 @@ func TestSimpleLayerUpload(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver) + registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -143,7 +144,7 @@ func TestSimpleLayerRead(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver) + registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) @@ -180,7 +181,7 @@ func TestSimpleLayerRead(t *testing.T) { t.Fatalf("unexpected error fetching non-existent layer: %v", err) } - randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader) + randomLayerDigest, err := writeTestLayer(driver, defaultPathMapper, imageName, dgst, randomLayerReader) if err != nil { t.Fatalf("unexpected error writing test layer: %v", err) } @@ -252,7 +253,7 @@ func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() imageName := "foo/bar" driver := inmemory.New() - registry := NewRegistryWithDriver(driver) + registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) repository, err := registry.Repository(ctx, imageName) if err != nil { t.Fatalf("unexpected error getting repo: %v", err) diff --git a/docs/storage/layercache.go b/docs/storage/layercache.go new file mode 100644 index 00000000..b9732f20 --- /dev/null +++ b/docs/storage/layercache.go @@ -0,0 +1,202 @@ +package storage + +import ( + "expvar" + "sync/atomic" + "time" + + "github.com/docker/distribution" + ctxu "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/distribution/registry/storage/driver" + "golang.org/x/net/context" +) + +// cachedLayerService implements the layer service with path-aware caching, +// using a LayerInfoCache interface. +type cachedLayerService struct { + distribution.LayerService // upstream layer service + repository distribution.Repository + ctx context.Context + driver driver.StorageDriver + *blobStore // global blob store + cache cache.LayerInfoCache +} + +// Exists checks for existence of the digest in the cache, immediately +// returning if it exists for the repository. If not, the upstream is checked. +// When a positive result is found, it is written into the cache. +func (lc *cachedLayerService) Exists(dgst digest.Digest) (bool, error) { + ctxu.GetLogger(lc.ctx).Debugf("(*cachedLayerService).Exists(%q)", dgst) + now := time.Now() + defer func() { + // TODO(stevvooe): Replace this with a decent context-based metrics solution + ctxu.GetLoggerWithField(lc.ctx, "blob.exists.duration", time.Since(now)). + Infof("(*cachedLayerService).Exists(%q)", dgst) + }() + + atomic.AddUint64(&layerInfoCacheMetrics.Exists.Requests, 1) + available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst) + if err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err) + goto fallback + } + + if available { + atomic.AddUint64(&layerInfoCacheMetrics.Exists.Hits, 1) + return true, nil + } + +fallback: + atomic.AddUint64(&layerInfoCacheMetrics.Exists.Misses, 1) + exists, err := lc.LayerService.Exists(dgst) + if err != nil { + return exists, err + } + + if exists { + // we can only cache this if the existence is positive. + if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error adding %v@%v to cache: %v", lc.repository.Name(), dgst, err) + } + } + + return exists, err +} + +// Fetch checks for the availability of the layer in the repository via the +// cache. If present, the metadata is resolved and the layer is returned. If +// any operation fails, the layer is read directly from the upstream. The +// results are cached, if possible. +func (lc *cachedLayerService) Fetch(dgst digest.Digest) (distribution.Layer, error) { + ctxu.GetLogger(lc.ctx).Debugf("(*layerInfoCache).Fetch(%q)", dgst) + now := time.Now() + defer func() { + ctxu.GetLoggerWithField(lc.ctx, "blob.fetch.duration", time.Since(now)). + Infof("(*layerInfoCache).Fetch(%q)", dgst) + }() + + atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Requests, 1) + available, err := lc.cache.Contains(lc.ctx, lc.repository.Name(), dgst) + if err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error checking availability of %v@%v: %v", lc.repository.Name(), dgst, err) + goto fallback + } + + if available { + // fast path: get the layer info and return + meta, err := lc.cache.Meta(lc.ctx, dgst) + if err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error fetching %v@%v from cache: %v", lc.repository.Name(), dgst, err) + goto fallback + } + + atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Hits, 1) + return newLayerReader(lc.driver, dgst, meta.Path, meta.Length) + } + + // NOTE(stevvooe): Unfortunately, the cache here only makes checks for + // existing layers faster. We'd have to provide more careful + // synchronization with the backend to make the missing case as fast. + +fallback: + atomic.AddUint64(&layerInfoCacheMetrics.Fetch.Misses, 1) + layer, err := lc.LayerService.Fetch(dgst) + if err != nil { + return nil, err + } + + // add the layer to the repository + if err := lc.cache.Add(lc.ctx, lc.repository.Name(), dgst); err != nil { + ctxu.GetLogger(lc.ctx). + Errorf("error caching repository relationship for %v@%v: %v", lc.repository.Name(), dgst, err) + } + + // lookup layer path and add it to the cache, if it succeds. Note that we + // still return the layer even if we have trouble caching it. + if path, err := lc.resolveLayerPath(layer); err != nil { + ctxu.GetLogger(lc.ctx). + Errorf("error resolving path while caching %v@%v: %v", lc.repository.Name(), dgst, err) + } else { + // add the layer to the cache once we've resolved the path. + if err := lc.cache.SetMeta(lc.ctx, dgst, cache.LayerMeta{Path: path, Length: layer.Length()}); err != nil { + ctxu.GetLogger(lc.ctx).Errorf("error adding meta for %v@%v to cache: %v", lc.repository.Name(), dgst, err) + } + } + + return layer, err +} + +// extractLayerInfo pulls the layerInfo from the layer, attempting to get the +// path information from either the concrete object or by resolving the +// primary blob store path. +func (lc *cachedLayerService) resolveLayerPath(layer distribution.Layer) (path string, err error) { + // try and resolve the type and driver, so we don't have to traverse links + switch v := layer.(type) { + case *layerReader: + // only set path if we have same driver instance. + if v.driver == lc.driver { + return v.path, nil + } + } + + ctxu.GetLogger(lc.ctx).Warnf("resolving layer path during cache lookup (%v@%v)", lc.repository.Name(), layer.Digest()) + // we have to do an expensive stat to resolve the layer location but no + // need to check the link, since we already have layer instance for this + // repository. + bp, err := lc.blobStore.path(layer.Digest()) + if err != nil { + return "", err + } + + return bp, nil +} + +// layerInfoCacheMetrics keeps track of cache metrics for layer info cache +// requests. Note this is kept globally and made available via expvar. For +// more detailed metrics, its recommend to instrument a particular cache +// implementation. +var layerInfoCacheMetrics struct { + // Exists tracks calls to the Exists caches. + Exists struct { + Requests uint64 + Hits uint64 + Misses uint64 + } + + // Fetch tracks calls to the fetch caches. + Fetch struct { + Requests uint64 + Hits uint64 + Misses uint64 + } +} + +func init() { + registry := expvar.Get("registry") + if registry == nil { + registry = expvar.NewMap("registry") + } + + cache := registry.(*expvar.Map).Get("cache") + if cache == nil { + cache = &expvar.Map{} + cache.(*expvar.Map).Init() + registry.(*expvar.Map).Set("cache", cache) + } + + storage := cache.(*expvar.Map).Get("storage") + if storage == nil { + storage = &expvar.Map{} + storage.(*expvar.Map).Init() + cache.(*expvar.Map).Set("storage", storage) + } + + storage.(*expvar.Map).Set("layerinfo", expvar.Func(func() interface{} { + // no need for synchronous access: the increments are atomic and + // during reading, we don't care if the data is up to date. The + // numbers will always *eventually* be reported correctly. + return layerInfoCacheMetrics + })) +} diff --git a/docs/storage/layerreader.go b/docs/storage/layerreader.go index 414951d9..40deba6a 100644 --- a/docs/storage/layerreader.go +++ b/docs/storage/layerreader.go @@ -17,6 +17,21 @@ type layerReader struct { digest digest.Digest } +// newLayerReader returns a new layerReader with the digest, path and length, +// eliding round trips to the storage backend. +func newLayerReader(driver driver.StorageDriver, dgst digest.Digest, path string, length int64) (*layerReader, error) { + fr := &fileReader{ + driver: driver, + path: path, + size: length, + } + + return &layerReader{ + fileReader: *fr, + digest: dgst, + }, nil +} + var _ distribution.Layer = &layerReader{} func (lr *layerReader) Digest() digest.Digest { diff --git a/docs/storage/manifeststore_test.go b/docs/storage/manifeststore_test.go index dc03dced..fe75868b 100644 --- a/docs/storage/manifeststore_test.go +++ b/docs/storage/manifeststore_test.go @@ -6,6 +6,8 @@ import ( "reflect" "testing" + "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/distribution" "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" @@ -28,7 +30,7 @@ type manifestStoreTestEnv struct { func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv { ctx := context.Background() driver := inmemory.New() - registry := NewRegistryWithDriver(driver) + registry := NewRegistryWithDriver(driver, cache.NewInMemoryLayerInfoCache()) repo, err := registry.Repository(ctx, name) if err != nil { diff --git a/docs/storage/registry.go b/docs/storage/registry.go index 8d7ea16e..9ad43acb 100644 --- a/docs/storage/registry.go +++ b/docs/storage/registry.go @@ -3,6 +3,7 @@ package storage import ( "github.com/docker/distribution" "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/registry/storage/cache" storagedriver "github.com/docker/distribution/registry/storage/driver" "golang.org/x/net/context" ) @@ -10,28 +11,29 @@ import ( // registry is the top-level implementation of Registry for use in the storage // package. All instances should descend from this object. type registry struct { - driver storagedriver.StorageDriver - pm *pathMapper - blobStore *blobStore + driver storagedriver.StorageDriver + pm *pathMapper + blobStore *blobStore + layerInfoCache cache.LayerInfoCache } // NewRegistryWithDriver creates a new registry instance from the provided // driver. The resulting registry may be shared by multiple goroutines but is // cheap to allocate. -func NewRegistryWithDriver(driver storagedriver.StorageDriver) distribution.Registry { - bs := &blobStore{} +func NewRegistryWithDriver(driver storagedriver.StorageDriver, layerInfoCache cache.LayerInfoCache) distribution.Registry { + bs := &blobStore{ + driver: driver, + pm: defaultPathMapper, + } - reg := ®istry{ + return ®istry{ driver: driver, blobStore: bs, // TODO(sday): This should be configurable. - pm: defaultPathMapper, + pm: defaultPathMapper, + layerInfoCache: layerInfoCache, } - - reg.blobStore.registry = reg - - return reg } // Repository returns an instance of the repository tied to the registry. @@ -83,9 +85,29 @@ func (repo *repository) Manifests() distribution.ManifestService { // may be context sensitive in the future. The instance should be used similar // to a request local. func (repo *repository) Layers() distribution.LayerService { - return &layerStore{ + ls := &layerStore{ repository: repo, } + + if repo.registry.layerInfoCache != nil { + // TODO(stevvooe): This is not the best place to setup a cache. We would + // really like to decouple the cache from the backend but also have the + // manifeset service use the layer service cache. For now, we can simply + // integrate the cache directly. The main issue is that we have layer + // access and layer data coupled in a single object. Work is already under + // way to decouple this. + + return &cachedLayerService{ + LayerService: ls, + repository: repo, + ctx: repo.ctx, + driver: repo.driver, + blobStore: repo.blobStore, + cache: repo.registry.layerInfoCache, + } + } + + return ls } func (repo *repository) Signatures() distribution.SignatureService {