Decouple redis dependency from blob descriptor cache
Ensure that clients can use the blob descriptor cache provider without needing the redis package. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
fd982a8bd5
commit
812c8099a6
12 changed files with 53 additions and 50 deletions
222
docs/storage/cache/redis/redis.go
vendored
Normal file
222
docs/storage/cache/redis/redis.go
vendored
Normal file
|
@ -0,0 +1,222 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/digest"
|
||||
"github.com/docker/distribution/registry/api/v2"
|
||||
"github.com/docker/distribution/registry/storage/cache"
|
||||
"github.com/garyburd/redigo/redis"
|
||||
)
|
||||
|
||||
// redisBlobStatService provides an implementation of
|
||||
// BlobDescriptorCacheProvider based on redis. Blob descritors are stored in
|
||||
// two parts. The first provide fast access to repository membership through a
|
||||
// redis set for each repo. The second is a redis hash keyed by the digest of
|
||||
// the layer, providing path, length and mediatype information. There is also
|
||||
// a per-repository redis hash of the blob descriptor, allowing override of
|
||||
// data. This is currently used to override the mediatype on a per-repository
|
||||
// basis.
|
||||
//
|
||||
// Note that there is no implied relationship between these two caches. The
|
||||
// layer may exist in one, both or none and the code must be written this way.
|
||||
type redisBlobDescriptorService struct {
|
||||
pool *redis.Pool
|
||||
|
||||
// TODO(stevvooe): We use a pool because we don't have great control over
|
||||
// the cache lifecycle to manage connections. A new connection if fetched
|
||||
// for each operation. Once we have better lifecycle management of the
|
||||
// request objects, we can change this to a connection.
|
||||
}
|
||||
|
||||
// NewRedisBlobDescriptorCacheProvider returns a new redis-based
|
||||
// BlobDescriptorCacheProvider using the provided redis connection pool.
|
||||
func NewRedisBlobDescriptorCacheProvider(pool *redis.Pool) cache.BlobDescriptorCacheProvider {
|
||||
return &redisBlobDescriptorService{
|
||||
pool: pool,
|
||||
}
|
||||
}
|
||||
|
||||
// RepositoryScoped returns the scoped cache.
|
||||
func (rbds *redisBlobDescriptorService) RepositoryScoped(repo string) (distribution.BlobDescriptorService, error) {
|
||||
if err := v2.ValidateRepositoryName(repo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &repositoryScopedRedisBlobDescriptorService{
|
||||
repo: repo,
|
||||
upstream: rbds,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Stat retrieves the descriptor data from the redis hash entry.
|
||||
func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
if err := dgst.Validate(); err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
conn := rbds.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return rbds.stat(ctx, conn, dgst)
|
||||
}
|
||||
|
||||
// stat provides an internal stat call that takes a connection parameter. This
|
||||
// allows some internal management of the connection scope.
|
||||
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
reply, err := redis.Values(conn.Do("HMGET", rbds.blobDescriptorHashKey(dgst), "digest", "length", "mediatype"))
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
if len(reply) < 2 || reply[0] == nil || reply[1] == nil { // don't care if mediatype is nil
|
||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
var desc distribution.Descriptor
|
||||
if _, err := redis.Scan(reply, &desc.Digest, &desc.Length, &desc.MediaType); err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
return desc, nil
|
||||
}
|
||||
|
||||
// SetDescriptor sets the descriptor data for the given digest using a redis
|
||||
// hash. A hash is used here since we may store unrelated fields about a layer
|
||||
// in the future.
|
||||
func (rbds *redisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
if err := dgst.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cache.ValidateDescriptor(desc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn := rbds.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return rbds.setDescriptor(ctx, conn, dgst, desc)
|
||||
}
|
||||
|
||||
func (rbds *redisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
if _, err := conn.Do("HMSET", rbds.blobDescriptorHashKey(dgst),
|
||||
"digest", desc.Digest,
|
||||
"length", desc.Length); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Only set mediatype if not already set.
|
||||
if _, err := conn.Do("HSETNX", rbds.blobDescriptorHashKey(dgst),
|
||||
"mediatype", desc.MediaType); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rbds *redisBlobDescriptorService) blobDescriptorHashKey(dgst digest.Digest) string {
|
||||
return "blobs::" + dgst.String()
|
||||
}
|
||||
|
||||
type repositoryScopedRedisBlobDescriptorService struct {
|
||||
repo string
|
||||
upstream *redisBlobDescriptorService
|
||||
}
|
||||
|
||||
var _ distribution.BlobDescriptorService = &repositoryScopedRedisBlobDescriptorService{}
|
||||
|
||||
// Stat ensures that the digest is a member of the specified repository and
|
||||
// forwards the descriptor request to the global blob store. If the media type
|
||||
// differs for the repository, we override it.
|
||||
func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
if err := dgst.Validate(); err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
conn := rsrbds.upstream.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
// Check membership to repository first
|
||||
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
if !member {
|
||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
upstream, err := rsrbds.upstream.stat(ctx, conn, dgst)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
// We allow a per repository mediatype, let's look it up here.
|
||||
mediatype, err := redis.String(conn.Do("HGET", rsrbds.blobDescriptorHashKey(dgst), "mediatype"))
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
if mediatype != "" {
|
||||
upstream.MediaType = mediatype
|
||||
}
|
||||
|
||||
return upstream, nil
|
||||
}
|
||||
|
||||
func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
if err := dgst.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cache.ValidateDescriptor(desc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if dgst != desc.Digest {
|
||||
if dgst.Algorithm() == desc.Digest.Algorithm() {
|
||||
return fmt.Errorf("redis cache: digest for descriptors differ but algorthim does not: %q != %q", dgst, desc.Digest)
|
||||
}
|
||||
}
|
||||
|
||||
conn := rsrbds.upstream.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return rsrbds.setDescriptor(ctx, conn, dgst, desc)
|
||||
}
|
||||
|
||||
func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
if _, err := conn.Do("SADD", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := rsrbds.upstream.setDescriptor(ctx, conn, dgst, desc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Override repository mediatype.
|
||||
if _, err := conn.Do("HSET", rsrbds.blobDescriptorHashKey(dgst), "mediatype", desc.MediaType); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Also set the values for the primary descriptor, if they differ by
|
||||
// algorithm (ie sha256 vs tarsum).
|
||||
if desc.Digest != "" && dgst != desc.Digest && dgst.Algorithm() != desc.Digest.Algorithm() {
|
||||
if err := rsrbds.setDescriptor(ctx, conn, desc.Digest, desc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rsrbds *repositoryScopedRedisBlobDescriptorService) blobDescriptorHashKey(dgst digest.Digest) string {
|
||||
return "repository::" + rsrbds.repo + "::blobs::" + dgst.String()
|
||||
}
|
||||
|
||||
func (rsrbds *repositoryScopedRedisBlobDescriptorService) repositoryBlobSetKey(repo string) string {
|
||||
return "repository::" + rsrbds.repo + "::blobs"
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue