Merge pull request #2944 from coreos-inc/joseph.schorr/QS-91/v2-caching
V2 registry blob caching
commit 024c183f67
14 changed files with 256 additions and 32 deletions
app.py (12 changes)

@@ -23,6 +23,7 @@ from data import model
 from data.archivedlogs import LogArchive
 from data.billing import Billing
 from data.buildlogs import BuildLogs
+from data.cache import InMemoryDataModelCache
 from data.model.user import LoginWrappedDBUser
 from data.queue import WorkQueue, BuildMetricQueueReporter
 from data.userevent import UserEventsBuilderModule

@@ -160,16 +161,7 @@ def _request_end(resp):
   if request.user_agent is not None:
     extra["user-agent"] = request.user_agent.string

-  user = get_authenticated_user()
-
-  if user:
-    extra['user'] = {'email': user.email,
-                     'uuid': user.uuid,
-                     'org': user.organization,
-                     'robot': user.robot}
-
-  logger.info("request-end", extra=extra)

   logger.debug('Ending request: %s', request.path)
   return resp

@@ -188,6 +180,8 @@ Principal(app, use_sessions=False)

 tf = app.config['DB_TRANSACTION_FACTORY']

+# TODO(jschorr): make this configurable
+model_cache = InMemoryDataModelCache()

 avatar = Avatar(app)
 login_manager = LoginManager(app)
data/cache/__init__.py (new file, vendored, 62 lines)

@@ -0,0 +1,62 @@
import logging

from datetime import datetime

from abc import ABCMeta, abstractmethod
from six import add_metaclass

from util.expiresdict import ExpiresDict
from util.timedeltastring import convert_to_timedelta

logger = logging.getLogger(__name__)


def is_not_none(value):
  return value is not None


@add_metaclass(ABCMeta)
class DataModelCache(object):
  """ Defines an interface for cache storing and returning tuple data model objects. """

  @abstractmethod
  def retrieve(self, cache_key, loader, should_cache=is_not_none):
    """ Checks the cache for the specified cache key and returns the value found (if any). If none
        is found, the loader is called to get a result and populate the cache.
    """
    pass


class NoopDataModelCache(DataModelCache):
  """ Implementation of the data model cache which does nothing. """

  def retrieve(self, cache_key, loader, should_cache=is_not_none):
    return loader()


class InMemoryDataModelCache(DataModelCache):
  """ Implementation of the data model cache backed by an in-memory dictionary. """
  def __init__(self):
    self.cache = ExpiresDict(rebuilder=lambda: {})

  def retrieve(self, cache_key, loader, should_cache=is_not_none):
    not_found = [None]
    logger.debug('Checking cache for key %s', cache_key.key)
    result = self.cache.get(cache_key.key, default_value=not_found)
    if result != not_found:
      logger.debug('Found result in cache for key %s: %s', cache_key.key, result)
      return result

    logger.debug('Found no result in cache for key %s; calling loader', cache_key.key)
    result = loader()
    logger.debug('Got loaded result for key %s: %s', cache_key.key, result)
    if should_cache(result):
      logger.debug('Caching loaded result for key %s with expiration %s: %s', cache_key.key,
                   cache_key.expiration, result)
      expires = convert_to_timedelta(cache_key.expiration) + datetime.now()
      self.cache.set(cache_key.key, result, expires=expires)
      logger.debug('Cached loaded result for key %s with expiration %s: %s', cache_key.key,
                   cache_key.expiration, result)
    else:
      logger.debug('Not caching loaded result for key %s: %s', cache_key.key, result)

    return result
data/cache/cache_key.py (new file, vendored, 8 lines)

@@ -0,0 +1,8 @@
from collections import namedtuple

class CacheKey(namedtuple('CacheKey', ['key', 'expiration'])):
  """ Defines a key into the data model cache. """
  pass

def for_repository_blob(namespace_name, repo_name, digest):
  return CacheKey('repository_blob:%s:%s:%s' % (namespace_name, repo_name, digest), '60s')
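Together, these two files implement a read-through cache: callers hand retrieve() a key and a loader, and the loader only runs on a miss. A minimal usage sketch, assuming the data.cache modules above are importable (the repository names, digest, and loader result are hypothetical):

from data.cache import InMemoryDataModelCache
from data.cache.cache_key import for_repository_blob

cache = InMemoryDataModelCache()
# Key for a hypothetical blob; cached entries expire 60 seconds after being set.
key = for_repository_blob('devtable', 'simple', 'sha256:abc123')

# The loader stands in for the database lookup the registry endpoints perform.
def load_blob():
  return {'digest': 'sha256:abc123', 'size': 1234}

first = cache.retrieve(key, load_blob)   # miss: runs load_blob and caches the result
second = cache.retrieve(key, load_blob)  # hit: served from the ExpiresDict
assert first == second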
data/cache/test/test_cache.py (new file, vendored, 16 lines)

@@ -0,0 +1,16 @@
import pytest

from data.cache import InMemoryDataModelCache, NoopDataModelCache
from data.cache.cache_key import CacheKey

@pytest.mark.parametrize('cache_type', [
  (NoopDataModelCache),
  (InMemoryDataModelCache),
])
def test_caching(cache_type):
  key = CacheKey('foo', '60m')
  cache = cache_type()

  # Perform two retrievals, and make sure both return.
  assert cache.retrieve(key, lambda: 1234) == 1234
  assert cache.retrieve(key, lambda: 1234) == 1234
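One behavior the test above does not cover: retrieve()'s default should_cache=is_not_none predicate means a None result (for example, a blob whose upload has not yet been committed) is never cached, so the loader runs again on the next call. A sketch of that contract, under the same import assumptions (the key string is hypothetical):

from data.cache import InMemoryDataModelCache
from data.cache.cache_key import CacheKey

cache = InMemoryDataModelCache()
key = CacheKey('repository_blob:devtable:simple:sha256:pending', '60s')

calls = []
def loader():
  calls.append(1)
  return None  # e.g., the blob row does not exist yet

# None fails the is_not_none predicate, so nothing is stored...
assert cache.retrieve(key, loader) is None
# ...and the second retrieval invokes the loader again instead of hitting the cache.
assert cache.retrieve(key, loader) is None
assert len(calls) == 2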
@@ -248,12 +248,19 @@ def get_storage_by_uuid(storage_uuid):

 def get_layer_path(storage_record):
   """ Returns the path in the storage engine to the layer data referenced by the storage row. """
-  store = config.store
-  if not storage_record.cas_path:
-    logger.debug('Serving layer from legacy v1 path')
-    return store.v1_image_layer_path(storage_record.uuid)
-
-  return store.blob_path(storage_record.content_checksum)
+  return get_layer_path_for_storage(storage_record.uuid, storage_record.cas_path,
+                                    storage_record.content_checksum)
+
+
+def get_layer_path_for_storage(storage_uuid, cas_path, content_checksum):
+  """ Returns the path in the storage engine to the layer data referenced by the storage
+      information. """
+  store = config.store
+  if not cas_path:
+    logger.debug('Serving layer from legacy v1 path for storage %s', storage_uuid)
+    return store.v1_image_layer_path(storage_uuid)
+
+  return store.blob_path(content_checksum)


 def lookup_repo_storages_by_content_checksum(repo, checksums):
@@ -210,4 +210,4 @@ def test_change_tag_expiration(expiration_offset, expected_offset, initialized_db
   start_date = datetime.utcfromtimestamp(footag_updated.lifetime_start_ts)
   end_date = datetime.utcfromtimestamp(footag_updated.lifetime_end_ts)
   expected_end_date = start_date + convert_to_timedelta(expected_offset)
-  assert end_date == expected_end_date
+  assert (expected_end_date - end_date).total_seconds() < 5  # variance in test
@@ -95,21 +95,26 @@ def _require_repo_permission(permission_class, scopes=None, allow_public=False):
     def wrapped(namespace_name, repo_name, *args, **kwargs):
       logger.debug('Checking permission %s for repo: %s/%s', permission_class, namespace_name,
                    repo_name)
-      repository = namespace_name + '/' + repo_name
-      repo = model.get_repository(namespace_name, repo_name)
-      if repo is None:
-        raise Unauthorized(repository=repository, scopes=scopes)
-
       permission = permission_class(namespace_name, repo_name)
-      if (permission.can() or (allow_public and repo.is_public)):
+      if permission.can():
         return func(namespace_name, repo_name, *args, **kwargs)

+      repository = namespace_name + '/' + repo_name
+      if allow_public:
+        repo = model.get_repository(namespace_name, repo_name)
+        if repo is None or not repo.is_public:
+          raise Unauthorized(repository=repository, scopes=scopes)
+
+        if repo.kind != 'image':
+          msg = 'This repository is for managing %s resources and not container images.' % repo.kind
+          raise Unsupported(detail=msg)
+
+        return func(namespace_name, repo_name, *args, **kwargs)
-
-      if repo.is_public:
-        return func(namespace_name, repo_name, *args, **kwargs)

       raise Unauthorized(repository=repository, scopes=scopes)

     return wrapped

   return wrapper
@@ -7,9 +7,10 @@ from flask import url_for, request, redirect, Response, abort as flask_abort
 import bitmath
 import resumablehashlib

-from app import storage, app, get_app_url, metric_queue
+from app import storage, app, get_app_url, metric_queue, model_cache
 from auth.registry_jwt_auth import process_registry_jwt_auth
 from data import database
+from data.cache import cache_key
 from digest import digest_tools
 from endpoints.decorators import anon_protect, parse_repository_name
 from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream

@@ -33,6 +34,18 @@ class _InvalidRangeHeader(Exception):
   pass


+def _get_repository_blob(namespace_name, repo_name, digest):
+  """ Returns the blob with the given digest under the repository with the given
+      name. If one does not exist (or it is still uploading), returns None.
+      Automatically handles caching.
+  """
+  def load_blob():
+    return model.get_blob_by_digest(namespace_name, repo_name, digest)
+
+  blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, digest)
+  return model_cache.retrieve(blob_cache_key, load_blob)
+
+
 @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
 @parse_repository_name()
 @process_registry_jwt_auth(scopes=['pull'])

@@ -41,7 +54,7 @@ class _InvalidRangeHeader(Exception):
 @cache_control(max_age=31436000)
 def check_blob_exists(namespace_name, repo_name, digest):
   # Find the blob.
-  blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
+  blob = _get_repository_blob(namespace_name, repo_name, digest)
   if blob is None:
     raise BlobUnknown()

@@ -49,7 +62,8 @@ def check_blob_exists(namespace_name, repo_name, digest):
   headers = {
     'Docker-Content-Digest': digest,
     'Content-Length': blob.size,
-    'Content-Type': BLOB_CONTENT_TYPE,}
+    'Content-Type': BLOB_CONTENT_TYPE,
+  }

   # If our storage supports range requests, let the client know.
   if storage.get_supports_resumable_downloads(blob.locations):

@@ -67,7 +81,7 @@ def check_blob_exists(namespace_name, repo_name, digest):
 @cache_control(max_age=31536000)
 def download_blob(namespace_name, repo_name, digest):
   # Find the blob.
-  blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
+  blob = _get_repository_blob(namespace_name, repo_name, digest)
   if blob is None:
     raise BlobUnknown()

@@ -41,7 +41,7 @@ class BlobUpload(
   """


-class Blob(namedtuple('Blob', ['uuid', 'digest', 'size', 'locations'])):
+class Blob(namedtuple('Blob', ['uuid', 'digest', 'size', 'locations', 'cas_path'])):
   """
   Blob represents an opaque binary blob saved to the storage system.
   """
@@ -211,7 +211,8 @@ class PreOCIModel(DockerRegistryV2DataInterface):
       uuid=blob_record.uuid,
       digest=blob_digest,
       size=blob_upload.byte_count,
-      locations=[blob_upload.location_name],)
+      locations=[blob_upload.location_name],
+      cas_path=blob_record.cas_path)

   def lookup_blobs_by_digest(self, namespace_name, repo_name, digests):
     def _blob_view(blob_record):

@@ -219,6 +220,7 @@ class PreOCIModel(DockerRegistryV2DataInterface):
       uuid=blob_record.uuid,
       digest=blob_record.content_checksum,
       size=blob_record.image_size,
+      cas_path=blob_record.cas_path,
       locations=None,  # Note: Locations is None in this case.
     )

@@ -235,7 +237,8 @@ class PreOCIModel(DockerRegistryV2DataInterface):
       uuid=blob_record.uuid,
       digest=digest,
       size=blob_record.image_size,
-      locations=blob_record.locations,)
+      locations=blob_record.locations,
+      cas_path=blob_record.cas_path)
     except model.BlobDoesNotExist:
       return None

@@ -254,8 +257,7 @@ class PreOCIModel(DockerRegistryV2DataInterface):
                           label.media_type)

   def get_blob_path(self, blob):
-    blob_record = model.storage.get_storage_by_uuid(blob.uuid)
-    return model.storage.get_layer_path(blob_record)
+    return model.storage.get_layer_path_for_storage(blob.uuid, blob.cas_path, blob.digest)

   def set_manifest_expires_after(self, namespace_name, repo_name, digest, expires_after_sec):
     try:
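The reason for threading cas_path through the Blob tuple shows up in get_blob_path above: the storage path is now computed from fields already on the tuple instead of re-fetching the storage row by UUID. A sketch of the two path shapes per get_layer_path_for_storage, assuming cas_path acts as the truthy flag its conditional implies (the UUID and digest values are hypothetical):

from data import model

# CAS-backed storage (cas_path is truthy): the path derives from the content checksum.
model.storage.get_layer_path_for_storage('abcd-1234', True, 'sha256:deadbeef')
# -> config.store.blob_path('sha256:deadbeef'), with no storage-row lookup

# Legacy storage (cas_path is falsy): falls back to the v1, UUID-based layout.
model.storage.get_layer_path_for_storage('abcd-1234', False, None)
# -> config.store.v1_image_layer_path('abcd-1234')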
endpoints/v2/test/test_blob.py (new file, 59 lines)

@@ -0,0 +1,59 @@
import hashlib
import pytest

from mock import patch
from flask import url_for
from playhouse.test_utils import assert_query_count

from app import instance_keys, app as realapp
from data import model
from data.cache import InMemoryDataModelCache
from data.database import ImageStorageLocation
from endpoints.test.shared import conduct_call
from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
from test.fixtures import *

@pytest.mark.parametrize('method, endpoint', [
  ('GET', 'download_blob'),
  ('HEAD', 'check_blob_exists'),
])
def test_blob_caching(method, endpoint, client, app):
  digest = 'sha256:' + hashlib.sha256("a").hexdigest()
  location = ImageStorageLocation.get(name='local_us')
  model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, 10000000)

  params = {
    'repository': 'devtable/simple',
    'digest': digest,
  }

  user = model.user.get_user('devtable')
  access = [{
    'type': 'repository',
    'name': 'devtable/simple',
    'actions': ['pull'],
  }]

  context, subject = build_context_and_subject(user=user)
  token = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context, access, 600,
                                instance_keys)

  headers = {
    'Authorization': 'Bearer %s' % token,
  }

  # Run without caching to make sure the request works. This also preloads some of
  # our global model caches.
  conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
               headers=headers)

  with patch('endpoints.v2.blob.model_cache', InMemoryDataModelCache()):
    # First request should make a DB query to retrieve the blob.
    with assert_query_count(1):
      conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
                   headers=headers)

    # Subsequent requests should use the cached blob.
    with assert_query_count(0):
      conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
                   headers=headers)
endpoints/v2/test/test_models_pre_oci.py (new file, 25 lines)

@@ -0,0 +1,25 @@
import hashlib

from playhouse.test_utils import assert_query_count

from data import model
from data.database import ImageStorageLocation
from endpoints.v2.models_pre_oci import data_model
from test.fixtures import *

def test_get_blob_path(initialized_db):
  # Add a blob.
  digest = 'sha256:' + hashlib.sha256("a").hexdigest()
  location = ImageStorageLocation.get(name='local_us')
  db_blob = model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1,
                                                       10000000)

  with assert_query_count(1):
    blob = data_model.get_blob_by_digest('devtable', 'simple', digest)
    assert blob.uuid == db_blob.uuid

  # The blob tuple should have everything get_blob_path needs, so there should be no queries.
  with assert_query_count(0):
    assert data_model.get_blob_path(blob)

  assert data_model.get_blob_path(blob) == model.storage.get_layer_path(db_blob)
@@ -15,6 +15,7 @@ from data.model.user import LoginWrappedDBUser
 from endpoints.api import api_bp
 from endpoints.appr import appr_bp
 from endpoints.web import web
+from endpoints.v2 import v2_bp
 from endpoints.verbs import verbs as verbs_bp

 from initdb import initialize_database, populate_database

@@ -173,6 +174,7 @@ def app(appconfig, initialized_db):
   app.register_blueprint(appr_bp, url_prefix='/cnr')
   app.register_blueprint(web, url_prefix='/')
   app.register_blueprint(verbs_bp, url_prefix='/c1')
+  app.register_blueprint(v2_bp, url_prefix='/v2')

   app.config.update(appconfig)
   return app
@@ -1,4 +1,5 @@
 import binascii
+import copy
 import hashlib
 import json
 import logging

@@ -118,6 +119,19 @@ def addtoken():
   return 'OK'


+@testbp.route('/breakdatabase', methods=['POST'])
+def break_database():
+  # Close any existing connection.
+  close_db_filter(None)
+
+  # Reload the database config with an invalid connection.
+  config = copy.copy(app.config)
+  config['DB_URI'] = 'sqlite:///not/a/valid/database'
+  configure(config)
+
+  return 'OK'
+
+
 @testbp.route('/reloadapp', methods=['POST'])
 def reload_app():
   # Close any existing connection.

@@ -1674,6 +1688,22 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin,
   def test_cancel_push(self):
     self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True)

+  def test_with_blob_caching(self):
+    # Add a repository and do a pull, to prime the cache.
+    _, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password')
+    self.do_pull('devtable', 'newrepo', 'devtable', 'password')
+
+    # Purposefully break the database so that we can check if caching works.
+    self.conduct('POST', '/__test/breakdatabase')
+
+    # Attempt to pull the blobs and ensure we get back a result. Since the database is broken,
+    # this will only work if caching is working and no additional queries/connections are made.
+    repo_name = 'devtable/newrepo'
+    for tag_name in manifests:
+      for layer in manifests[tag_name].layers:
+        blob_id = str(layer.digest)
+        self.conduct('GET', '/v2/%s/blobs/%s' % (repo_name, blob_id), expected_code=200, auth='jwt')
+
   def test_pull_by_checksum(self):
     # Add a new repository under the user, so we have a real repository to pull.
     _, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password')