From 4c7c63b5575f1418ee41a4fcf3438a17149f5f93 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 4 Mar 2020 17:51:02 -0800 Subject: [PATCH 1/6] Add cache unit test Test base functionality of the cache statter Signed-off-by: Derek McGowan --- registry/storage/cache/cache_test.go | 131 +++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 registry/storage/cache/cache_test.go diff --git a/registry/storage/cache/cache_test.go b/registry/storage/cache/cache_test.go new file mode 100644 index 00000000..2e7f0556 --- /dev/null +++ b/registry/storage/cache/cache_test.go @@ -0,0 +1,131 @@ +package cache + +import ( + "context" + "errors" + "testing" + + "github.com/docker/distribution" + digest "github.com/opencontainers/go-digest" +) + +func TestCacheSet(t *testing.T) { + cache := newTestStatter() + backend := newTestStatter() + st := NewCachedBlobStatter(cache, backend) + ctx := context.Background() + + dgst := digest.Digest("dontvalidate") + _, err := st.Stat(ctx, dgst) + if err != distribution.ErrBlobUnknown { + t.Fatalf("Unexpected error %v, expected %v", err, distribution.ErrBlobUnknown) + } + + desc := distribution.Descriptor{ + Digest: dgst, + } + if err := backend.SetDescriptor(ctx, dgst, desc); err != nil { + t.Fatal(err) + } + + actual, err := st.Stat(ctx, dgst) + if err != nil { + t.Fatal(err) + } + if actual.Digest != desc.Digest { + t.Fatalf("Unexpected descriptor %v, expected %v", actual, desc) + } + + if len(cache.sets) != 1 || len(cache.sets[dgst]) == 0 { + t.Fatalf("Expected cache set") + } + if cache.sets[dgst][0].Digest != desc.Digest { + t.Fatalf("Unexpected descriptor %v, expected %v", cache.sets[dgst][0], desc) + } + + desc2 := distribution.Descriptor{ + Digest: digest.Digest("dontvalidate 2"), + } + cache.sets[dgst] = append(cache.sets[dgst], desc2) + + actual, err = st.Stat(ctx, dgst) + if err != nil { + t.Fatal(err) + } + if actual.Digest != desc2.Digest { + t.Fatalf("Unexpected descriptor %v, expected %v", actual, 
desc) + } +} + +func TestCacheError(t *testing.T) { + cache := newErrTestStatter(errors.New("cache error")) + backend := newTestStatter() + st := NewCachedBlobStatter(cache, backend) + ctx := context.Background() + + dgst := digest.Digest("dontvalidate") + _, err := st.Stat(ctx, dgst) + if err != distribution.ErrBlobUnknown { + t.Fatalf("Unexpected error %v, expected %v", err, distribution.ErrBlobUnknown) + } + + desc := distribution.Descriptor{ + Digest: dgst, + } + if err := backend.SetDescriptor(ctx, dgst, desc); err != nil { + t.Fatal(err) + } + + actual, err := st.Stat(ctx, dgst) + if err != nil { + t.Fatal(err) + } + if actual.Digest != desc.Digest { + t.Fatalf("Unexpected descriptor %v, expected %v", actual, desc) + } + + if len(cache.sets) > 0 { + t.Fatalf("Set should not be called after stat error") + } +} + +func newTestStatter() *testStatter { + return &testStatter{ + stats: []digest.Digest{}, + sets: map[digest.Digest][]distribution.Descriptor{}, + } +} + +func newErrTestStatter(err error) *testStatter { + return &testStatter{ + sets: map[digest.Digest][]distribution.Descriptor{}, + err: err, + } +} + +type testStatter struct { + stats []digest.Digest + sets map[digest.Digest][]distribution.Descriptor + err error +} + +func (s *testStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + if s.err != nil { + return distribution.Descriptor{}, s.err + } + + if set := s.sets[dgst]; len(set) > 0 { + return set[len(set)-1], nil + } + + return distribution.Descriptor{}, distribution.ErrBlobUnknown +} + +func (s *testStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { + s.sets[dgst] = append(s.sets[dgst], desc) + return s.err +} + +func (s *testStatter) Clear(ctx context.Context, dgst digest.Digest) error { + return s.err +} From ce101280fe93f446650d8dddb7e9d992fdbf467e Mon Sep 17 00:00:00 2001 From: Manish Tomar Date: Tue, 19 Nov 2019 15:31:25 -0800 Subject: [PATCH 2/6] fix 
redis caching issue * fix redis caching issue earlier redis cache was updated when there was any error including any temporary connectivity issue. This would trigger set calls which would further increase load and possibly connectivity errors from redis leaving the system with continuous errors and high latency. Now the cache is updated only when it is genuine cache miss. Other errors do not trigger a cache update. * add back tracker Hit() and Miss() calls *squashed commits* (cherry picked from commit 6f3e1c10260ef59ba4e9c42e939329fad9fdd8c3) (cherry picked from commit 6738ff3320cf82cc2df919a95a1bde2f7789a501) Signed-off-by: Derek McGowan --- .../cache/cachedblobdescriptorstore.go | 49 +++++++++++-------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/registry/storage/cache/cachedblobdescriptorstore.go b/registry/storage/cache/cachedblobdescriptorstore.go index ac4c4521..8694b704 100644 --- a/registry/storage/cache/cachedblobdescriptorstore.go +++ b/registry/storage/cache/cachedblobdescriptorstore.go @@ -4,6 +4,7 @@ import ( "context" "github.com/docker/distribution" + dcontext "github.com/docker/distribution/context" prometheus "github.com/docker/distribution/metrics" "github.com/opencontainers/go-digest" ) @@ -65,35 +66,41 @@ func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, b func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { cacheCount.WithValues("Request").Inc(1) - desc, err := cbds.cache.Stat(ctx, dgst) - if err != nil { - if err != distribution.ErrBlobUnknown { - logErrorf(ctx, cbds.tracker, "error retrieving descriptor from cache: %v", err) - } - goto fallback + // try getting from cache + desc, cacheErr := cbds.cache.Stat(ctx, dgst) + if cacheErr == nil { + cacheCount.WithValues("Hit").Inc(1) + if cbds.tracker != nil { + cbds.tracker.Hit() + } + return desc, nil } - cacheCount.WithValues("Hit").Inc(1) - if cbds.tracker != nil { - cbds.tracker.Hit() - } 
- return desc, nil -fallback: - cacheCount.WithValues("Miss").Inc(1) - if cbds.tracker != nil { - cbds.tracker.Miss() - } - desc, err = cbds.backend.Stat(ctx, dgst) + + // couldn't get from cache; get from backend + desc, err := cbds.backend.Stat(ctx, dgst) if err != nil { return desc, err } - if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { - logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err) + if cacheErr == distribution.ErrBlobUnknown { + // cache doesn't have info. update it with info got from backend + cacheCount.WithValues("Miss").Inc(1) + if cbds.tracker != nil { + cbds.tracker.Miss() + } + if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { + dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(err).Error("error from cache setting desc") + } + // we don't need to return cache error upstream if any. continue returning value from backend + return desc, nil } - return desc, err + // unknown error from cache. just log and error. 
do not store cache as it may be trigger many set calls + dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(cacheErr).Error("error from cache stat(ing) blob") + cacheCount.WithValues("Error").Inc(1) + return desc, nil } func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) error { @@ -111,7 +118,7 @@ func (cbds *cachedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) er func (cbds *cachedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { - logErrorf(ctx, cbds.tracker, "error adding descriptor %v to cache: %v", desc.Digest, err) + dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(err).Error("error from cache setting desc") } return nil } From 795892662b69d79c730d1f9028314ac081cfe5af Mon Sep 17 00:00:00 2001 From: Manish Tomar Date: Tue, 19 Nov 2019 15:43:59 -0800 Subject: [PATCH 3/6] redis metrics * redis metrics it is working but metrics are not very useful since default buckets start from 5ms and almost all of them are in that range. 
* remove extra comment (cherry picked from commit ba1a1d74e7eb047dd1056548ccf0695e8846782c) Signed-off-by: Derek McGowan --- registry/storage/cache/metrics/prom.go | 68 ++++++++++++++++++++++++++ registry/storage/cache/redis/redis.go | 11 +++-- 2 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 registry/storage/cache/metrics/prom.go diff --git a/registry/storage/cache/metrics/prom.go b/registry/storage/cache/metrics/prom.go new file mode 100644 index 00000000..22e847b6 --- /dev/null +++ b/registry/storage/cache/metrics/prom.go @@ -0,0 +1,68 @@ +package metrics + +import ( + "context" + "github.com/docker/distribution" + "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/go-metrics" + "github.com/opencontainers/go-digest" + prometheus "github.com/docker/distribution/metrics" + "time" +) + +type prometheusCacheProvider struct { + cache.BlobDescriptorCacheProvider + latencyTimer metrics.LabeledTimer +} + +func NewPrometheusCacheProvider(wrap cache.BlobDescriptorCacheProvider, name, help string) cache.BlobDescriptorCacheProvider { + return &prometheusCacheProvider{ + wrap, + // TODO: May want to have fine grained buckets since redis calls are generally <1ms and the default minimum bucket is 5ms. 
+ prometheus.StorageNamespace.NewLabeledTimer(name, help, "operation"), + } +} + +func (p *prometheusCacheProvider) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + start := time.Now() + d, e := p.BlobDescriptorCacheProvider.Stat(ctx, dgst) + p.latencyTimer.WithValues("Stat").UpdateSince(start) + return d, e +} + +func (p *prometheusCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { + start := time.Now() + e := p.BlobDescriptorCacheProvider.SetDescriptor(ctx, dgst, desc) + p.latencyTimer.WithValues("SetDescriptor").UpdateSince(start) + return e +} + +type prometheusRepoCacheProvider struct { + distribution.BlobDescriptorService + latencyTimer metrics.LabeledTimer +} + +func (p *prometheusRepoCacheProvider) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + start := time.Now() + d, e := p.BlobDescriptorService.Stat(ctx, dgst) + p.latencyTimer.WithValues("RepoStat").UpdateSince(start) + return d, e +} + +func (p *prometheusRepoCacheProvider) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error { + start := time.Now() + e := p.BlobDescriptorService.SetDescriptor(ctx, dgst, desc) + p.latencyTimer.WithValues("RepoSetDescriptor").UpdateSince(start) + return e +} + +func (p *prometheusCacheProvider) RepositoryScoped(repo string) (distribution.BlobDescriptorService, error) { + s, err := p.BlobDescriptorCacheProvider.RepositoryScoped(repo) + if err != nil { + return nil, err + } + return &prometheusRepoCacheProvider{ + s, + p.latencyTimer, + }, nil +} \ No newline at end of file diff --git a/registry/storage/cache/redis/redis.go b/registry/storage/cache/redis/redis.go index 550d703b..531fae2f 100644 --- a/registry/storage/cache/redis/redis.go +++ b/registry/storage/cache/redis/redis.go @@ -3,6 +3,7 @@ package redis import ( "context" "fmt" + "github.com/docker/distribution/registry/storage/cache/metrics" 
"github.com/docker/distribution" "github.com/docker/distribution/reference" @@ -34,9 +35,13 @@ type redisBlobDescriptorService struct { // NewRedisBlobDescriptorCacheProvider returns a new redis-based // BlobDescriptorCacheProvider using the provided redis connection pool. func NewRedisBlobDescriptorCacheProvider(pool *redis.Pool) cache.BlobDescriptorCacheProvider { - return &redisBlobDescriptorService{ - pool: pool, - } + return metrics.NewPrometheusCacheProvider( + &redisBlobDescriptorService{ + pool: pool, + }, + "cache_redis", + "Number of seconds taken by redis", + ) } // RepositoryScoped returns the scoped cache. From 74d442a058a9c83cc01a74565e50b5c79b23815a Mon Sep 17 00:00:00 2001 From: Grant Watters Date: Mon, 10 Feb 2020 10:57:52 -0800 Subject: [PATCH 4/6] Consider redis.ErrNil as distribution.ErrBlobUnknown for Stat HGET * Update redis.go If the dgst key does not exist in the cache when calling HGET, `redis.String` will return an `ErrNil` which we need to translate into `distribution.ErrBlobUnknown` so that the error being returned can be properly handled. This will ensure that `SetDescriptor` is properly called from `cachedBlobStatter::Stat` for `repositoryScopedRedisBlobDescriptorService` which will update the redis cache and be considered as a Miss rather than an Error. cc @manishtomar * Update suite.go Add unit test to ensure missing blobs for scoped repo properly return ErrBlobUnknown when HGET returns redis.ErrNil. 
(cherry picked from commit dca6b9526a1d30dd218a9f321c4f84ecc4b5e62e) Signed-off-by: Derek McGowan --- registry/storage/cache/cachecheck/suite.go | 4 ++++ registry/storage/cache/redis/redis.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/registry/storage/cache/cachecheck/suite.go b/registry/storage/cache/cachecheck/suite.go index d241bd04..12d6e45d 100644 --- a/registry/storage/cache/cachecheck/suite.go +++ b/registry/storage/cache/cachecheck/suite.go @@ -54,6 +54,10 @@ func checkBlobDescriptorCacheEmptyRepository(ctx context.Context, t *testing.T, t.Fatalf("expected error checking for cache item with empty digest: %v", err) } + if _, err := cache.Stat(ctx, "sha384:cba111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"); err != distribution.ErrBlobUnknown { + t.Fatalf("expected unknown blob error with uncached repo: %v", err) + } + if _, err := cache.Stat(ctx, "sha384:abc111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"); err != distribution.ErrBlobUnknown { t.Fatalf("expected unknown blob error with empty repo: %v", err) } diff --git a/registry/storage/cache/redis/redis.go b/registry/storage/cache/redis/redis.go index 531fae2f..8d32aa48 100644 --- a/registry/storage/cache/redis/redis.go +++ b/registry/storage/cache/redis/redis.go @@ -186,6 +186,10 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Conte // We allow a per repository mediatype, let's look it up here. 
mediatype, err := redis.String(conn.Do("HGET", rsrbds.blobDescriptorHashKey(dgst), "mediatype")) if err != nil { + if err == redis.ErrNil { + return distribution.Descriptor{}, distribution.ErrBlobUnknown + } + return distribution.Descriptor{}, err } From 495a4af7cf597f564abe98ba46d2c7a841899c7e Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 4 Mar 2020 14:41:00 -0800 Subject: [PATCH 5/6] Fix goimports Separate fix for cherry-picked code Signed-off-by: Derek McGowan --- registry/storage/cache/metrics/prom.go | 7 ++++--- registry/storage/cache/redis/redis.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/registry/storage/cache/metrics/prom.go b/registry/storage/cache/metrics/prom.go index 22e847b6..7b50e93f 100644 --- a/registry/storage/cache/metrics/prom.go +++ b/registry/storage/cache/metrics/prom.go @@ -2,12 +2,13 @@ package metrics import ( "context" + "time" + "github.com/docker/distribution" + prometheus "github.com/docker/distribution/metrics" "github.com/docker/distribution/registry/storage/cache" "github.com/docker/go-metrics" "github.com/opencontainers/go-digest" - prometheus "github.com/docker/distribution/metrics" - "time" ) type prometheusCacheProvider struct { @@ -65,4 +66,4 @@ func (p *prometheusCacheProvider) RepositoryScoped(repo string) (distribution.Bl s, p.latencyTimer, }, nil -} \ No newline at end of file +} diff --git a/registry/storage/cache/redis/redis.go b/registry/storage/cache/redis/redis.go index 8d32aa48..ed03f9f0 100644 --- a/registry/storage/cache/redis/redis.go +++ b/registry/storage/cache/redis/redis.go @@ -3,11 +3,11 @@ package redis import ( "context" "fmt" - "github.com/docker/distribution/registry/storage/cache/metrics" "github.com/docker/distribution" "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/storage/cache" + "github.com/docker/distribution/registry/storage/cache/metrics" "github.com/garyburd/redigo/redis" "github.com/opencontainers/go-digest" ) From 
be29c05a1e82b084f2227596223df8254840c9a1 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 4 Mar 2020 14:37:31 -0800 Subject: [PATCH 6/6] Remove deprecated cache metrics code The metrics tracker in cached blob statter was replaced with prometheus metrics and is no longer needed. Remove unused log wrapping. Signed-off-by: Derek McGowan --- registry/storage/blobcachemetrics.go | 66 ------------------- .../cache/cachedblobdescriptorstore.go | 63 ++---------------- 2 files changed, 4 insertions(+), 125 deletions(-) delete mode 100644 registry/storage/blobcachemetrics.go diff --git a/registry/storage/blobcachemetrics.go b/registry/storage/blobcachemetrics.go deleted file mode 100644 index 238b5806..00000000 --- a/registry/storage/blobcachemetrics.go +++ /dev/null @@ -1,66 +0,0 @@ -package storage - -import ( - "context" - "expvar" - "sync/atomic" - - dcontext "github.com/docker/distribution/context" - "github.com/docker/distribution/registry/storage/cache" -) - -type blobStatCollector struct { - metrics cache.Metrics -} - -func (bsc *blobStatCollector) Hit() { - atomic.AddUint64(&bsc.metrics.Requests, 1) - atomic.AddUint64(&bsc.metrics.Hits, 1) -} - -func (bsc *blobStatCollector) Miss() { - atomic.AddUint64(&bsc.metrics.Requests, 1) - atomic.AddUint64(&bsc.metrics.Misses, 1) -} - -func (bsc *blobStatCollector) Metrics() cache.Metrics { - return bsc.metrics -} - -func (bsc *blobStatCollector) Logger(ctx context.Context) cache.Logger { - return dcontext.GetLogger(ctx) -} - -// blobStatterCacheMetrics keeps track of cache metrics for blob descriptor -// cache requests. Note this is kept globally and made available via expvar. -// For more detailed metrics, its recommend to instrument a particular cache -// implementation. 
-var blobStatterCacheMetrics cache.MetricsTracker = &blobStatCollector{} - -func init() { - registry := expvar.Get("registry") - if registry == nil { - registry = expvar.NewMap("registry") - } - - cache := registry.(*expvar.Map).Get("cache") - if cache == nil { - cache = &expvar.Map{} - cache.(*expvar.Map).Init() - registry.(*expvar.Map).Set("cache", cache) - } - - storage := cache.(*expvar.Map).Get("storage") - if storage == nil { - storage = &expvar.Map{} - storage.(*expvar.Map).Init() - cache.(*expvar.Map).Set("storage", storage) - } - - storage.(*expvar.Map).Set("blobdescriptor", expvar.Func(func() interface{} { - // no need for synchronous access: the increments are atomic and - // during reading, we don't care if the data is up to date. The - // numbers will always *eventually* be reported correctly. - return blobStatterCacheMetrics - })) -} diff --git a/registry/storage/cache/cachedblobdescriptorstore.go b/registry/storage/cache/cachedblobdescriptorstore.go index 8694b704..f25d68d9 100644 --- a/registry/storage/cache/cachedblobdescriptorstore.go +++ b/registry/storage/cache/cachedblobdescriptorstore.go @@ -9,35 +9,9 @@ import ( "github.com/opencontainers/go-digest" ) -// Metrics is used to hold metric counters -// related to the number of times a cache was -// hit or missed. -type Metrics struct { - Requests uint64 - Hits uint64 - Misses uint64 -} - -// Logger can be provided on the MetricsTracker to log errors. -// -// Usually, this is just a proxy to dcontext.GetLogger. -type Logger interface { - Errorf(format string, args ...interface{}) -} - -// MetricsTracker represents a metric tracker -// which simply counts the number of hits and misses. 
-type MetricsTracker interface { - Hit() - Miss() - Metrics() Metrics - Logger(context.Context) Logger -} - type cachedBlobStatter struct { cache distribution.BlobDescriptorService backend distribution.BlobDescriptorService - tracker MetricsTracker } var ( @@ -54,16 +28,6 @@ func NewCachedBlobStatter(cache distribution.BlobDescriptorService, backend dist } } -// NewCachedBlobStatterWithMetrics creates a new statter which prefers a cache and -// falls back to a backend. Hits and misses will send to the tracker. -func NewCachedBlobStatterWithMetrics(cache distribution.BlobDescriptorService, backend distribution.BlobDescriptorService, tracker MetricsTracker) distribution.BlobStatter { - return &cachedBlobStatter{ - cache: cache, - backend: backend, - tracker: tracker, - } -} - func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { cacheCount.WithValues("Request").Inc(1) @@ -71,9 +35,6 @@ func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (di desc, cacheErr := cbds.cache.Stat(ctx, dgst) if cacheErr == nil { cacheCount.WithValues("Hit").Inc(1) - if cbds.tracker != nil { - cbds.tracker.Hit() - } return desc, nil } @@ -86,20 +47,16 @@ func (cbds *cachedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (di if cacheErr == distribution.ErrBlobUnknown { // cache doesn't have info. update it with info got from backend cacheCount.WithValues("Miss").Inc(1) - if cbds.tracker != nil { - cbds.tracker.Miss() - } if err := cbds.cache.SetDescriptor(ctx, dgst, desc); err != nil { dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(err).Error("error from cache setting desc") } // we don't need to return cache error upstream if any. continue returning value from backend - return desc, nil + } else { + // unknown error from cache. just log and error. 
do not store cache as it may be trigger many set calls + dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(cacheErr).Error("error from cache stat(ing) blob") + cacheCount.WithValues("Error").Inc(1) } - // unknown error from cache. just log and error. do not store cache as it may be trigger many set calls - dcontext.GetLoggerWithField(ctx, "blob", dgst).WithError(cacheErr).Error("error from cache stat(ing) blob") - cacheCount.WithValues("Error").Inc(1) - return desc, nil } @@ -122,15 +79,3 @@ func (cbds *cachedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Di } return nil } - -func logErrorf(ctx context.Context, tracker MetricsTracker, format string, args ...interface{}) { - if tracker == nil { - return - } - - logger := tracker.Logger(ctx) - if logger == nil { - return - } - logger.Errorf(format, args...) -}