diff --git a/app.py b/app.py
index 4e908ce41..853bf2532 100644
--- a/app.py
+++ b/app.py
@@ -23,6 +23,7 @@ from data import model
 from data.archivedlogs import LogArchive
 from data.billing import Billing
 from data.buildlogs import BuildLogs
+from data.cache import InMemoryDataModelCache
 from data.model.user import LoginWrappedDBUser
 from data.queue import WorkQueue, BuildMetricQueueReporter
 from data.userevent import UserEventsBuilderModule
@@ -160,16 +161,7 @@ def _request_end(resp):
   if request.user_agent is not None:
     extra["user-agent"] = request.user_agent.string
 
-  user = get_authenticated_user()
-
-  if user:
-    extra['user'] = {'email': user.email,
-                     'uuid': user.uuid,
-                     'org': user.organization,
-                     'robot': user.robot}
-
   logger.info("request-end", extra=extra)
-
   logger.debug('Ending request: %s', request.path)
   return resp
 
@@ -188,6 +180,8 @@ Principal(app, use_sessions=False)
 
 tf = app.config['DB_TRANSACTION_FACTORY']
 
+# TODO(jschorr): make this configurable
+model_cache = InMemoryDataModelCache()
 
 avatar = Avatar(app)
 login_manager = LoginManager(app)
diff --git a/data/cache/__init__.py b/data/cache/__init__.py
new file mode 100644
index 000000000..96dc0223d
--- /dev/null
+++ b/data/cache/__init__.py
@@ -0,0 +1,73 @@
+import logging
+
+from datetime import datetime
+
+from abc import ABCMeta, abstractmethod
+from six import add_metaclass
+
+from util.expiresdict import ExpiresDict
+from util.timedeltastring import convert_to_timedelta
+
+logger = logging.getLogger(__name__)
+
+def is_not_none(value):
+  return value is not None
+
+
+@add_metaclass(ABCMeta)
+class DataModelCache(object):
+  """ Defines an interface for caches storing and returning tuple data model objects. """
+
+  @abstractmethod
+  def retrieve(self, cache_key, loader, should_cache=is_not_none):
+    """ Checks the cache for the specified cache key and returns the value found (if any). If none
+        found, the loader is called to get a result and populate the cache.
+    """
+    pass
+
+
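+# Illustrative usage (a sketch only; `load_blob` stands in for any loader
+# callable, and the key value shown is hypothetical):
+#
+#   model_cache = InMemoryDataModelCache()
+#   blob_key = CacheKey('repository_blob:devtable:simple:sha256:...', '60s')
+#   blob = model_cache.retrieve(blob_key, load_blob)
+#
+# The loader runs only on a cache miss, and its result is stored only when
+# should_cache(result) returns True (by default, when the result is not None).
+
+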
""" + def __init__(self): + self.cache = ExpiresDict(rebuilder=lambda: {}) + + def retrieve(self, cache_key, loader, should_cache=is_not_none): + not_found = [None] + logger.debug('Checking cache for key %s', cache_key.key) + result = self.cache.get(cache_key.key, default_value=not_found) + if result != not_found: + logger.debug('Found result in cache for key %s: %s', cache_key.key, result) + return result + + logger.debug('Found no result in cache for key %s; calling loader', cache_key.key) + result = loader() + logger.debug('Got loaded result for key %s: %s', cache_key.key, result) + if should_cache(result): + logger.debug('Caching loaded result for key %s with expiration %s: %s', cache_key.key, + result, cache_key.expiration) + expires = convert_to_timedelta(cache_key.expiration) + datetime.now() + self.cache.set(cache_key.key, result, expires=expires) + logger.debug('Cached loaded result for key %s with expiration %s: %s', cache_key.key, + result, cache_key.expiration) + else: + logger.debug('Not caching loaded result for key %s: %s', cache_key.key, result) + + return result diff --git a/data/cache/cache_key.py b/data/cache/cache_key.py new file mode 100644 index 000000000..b0d4d7011 --- /dev/null +++ b/data/cache/cache_key.py @@ -0,0 +1,8 @@ +from collections import namedtuple + +class CacheKey(namedtuple('CacheKey', ['key', 'expiration'])): + """ Defines a key into the data model cache. """ + pass + +def for_repository_blob(namespace_name, repo_name, digest): + return CacheKey('repository_blob:%s:%s:%s' % (namespace_name, repo_name, digest), '60s') diff --git a/data/cache/test/test_cache.py b/data/cache/test/test_cache.py new file mode 100644 index 000000000..d9e3708d0 --- /dev/null +++ b/data/cache/test/test_cache.py @@ -0,0 +1,16 @@ +import pytest + +from data.cache import InMemoryDataModelCache, NoopDataModelCache +from data.cache.cache_key import CacheKey + +@pytest.mark.parametrize('cache_type', [ + (NoopDataModelCache), + (InMemoryDataModelCache), +]) +def test_caching(cache_type): + key = CacheKey('foo', '60m') + cache = cache_type() + + # Perform two retrievals, and make sure both return. + assert cache.retrieve(key, lambda: 1234) == 1234 + assert cache.retrieve(key, lambda: 1234) == 1234 diff --git a/data/model/storage.py b/data/model/storage.py index eef439d39..6a4ff291e 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -248,12 +248,19 @@ def get_storage_by_uuid(storage_uuid): def get_layer_path(storage_record): """ Returns the path in the storage engine to the layer data referenced by the storage row. """ - store = config.store - if not storage_record.cas_path: - logger.debug('Serving layer from legacy v1 path') - return store.v1_image_layer_path(storage_record.uuid) + return get_layer_path_for_storage(storage_record.uuid, storage_record.cas_path, + storage_record.content_checksum) - return store.blob_path(storage_record.content_checksum) + +def get_layer_path_for_storage(storage_uuid, cas_path, content_checksum): + """ Returns the path in the storage engine to the layer data referenced by the storage + information. 
""" + store = config.store + if not cas_path: + logger.debug('Serving layer from legacy v1 path for storage %s', storage_uuid) + return store.v1_image_layer_path(storage_uuid) + + return store.blob_path(content_checksum) def lookup_repo_storages_by_content_checksum(repo, checksums): diff --git a/data/model/test/test_tag.py b/data/model/test/test_tag.py index 6cfa7366d..b9ab2e3b2 100644 --- a/data/model/test/test_tag.py +++ b/data/model/test/test_tag.py @@ -210,4 +210,4 @@ def test_change_tag_expiration(expiration_offset, expected_offset, initialized_d start_date = datetime.utcfromtimestamp(footag_updated.lifetime_start_ts) end_date = datetime.utcfromtimestamp(footag_updated.lifetime_end_ts) expected_end_date = start_date + convert_to_timedelta(expected_offset) - assert end_date == expected_end_date + assert (expected_end_date - end_date).total_seconds() < 5 # variance in test diff --git a/endpoints/v2/__init__.py b/endpoints/v2/__init__.py index 198b56cbe..28a837ddd 100644 --- a/endpoints/v2/__init__.py +++ b/endpoints/v2/__init__.py @@ -95,21 +95,26 @@ def _require_repo_permission(permission_class, scopes=None, allow_public=False): def wrapped(namespace_name, repo_name, *args, **kwargs): logger.debug('Checking permission %s for repo: %s/%s', permission_class, namespace_name, repo_name) - repository = namespace_name + '/' + repo_name - repo = model.get_repository(namespace_name, repo_name) - if repo is None: - raise Unauthorized(repository=repository, scopes=scopes) permission = permission_class(namespace_name, repo_name) - if (permission.can() or (allow_public and repo.is_public)): + if permission.can(): + return func(namespace_name, repo_name, *args, **kwargs) + + repository = namespace_name + '/' + repo_name + if allow_public: + repo = model.get_repository(namespace_name, repo_name) + if repo is None or not repo.is_public: + raise Unauthorized(repository=repository, scopes=scopes) + if repo.kind != 'image': msg = 'This repository is for managing %s resources and not container images.' % repo.kind raise Unsupported(detail=msg) - return func(namespace_name, repo_name, *args, **kwargs) + + if repo.is_public: + return func(namespace_name, repo_name, *args, **kwargs) + raise Unauthorized(repository=repository, scopes=scopes) - return wrapped - return wrapper diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 7614e02ad..19f3fd832 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -7,9 +7,10 @@ from flask import url_for, request, redirect, Response, abort as flask_abort import bitmath import resumablehashlib -from app import storage, app, get_app_url, metric_queue +from app import storage, app, get_app_url, metric_queue, model_cache from auth.registry_jwt_auth import process_registry_jwt_auth from data import database +from data.cache import cache_key from digest import digest_tools from endpoints.decorators import anon_protect, parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream @@ -33,6 +34,18 @@ class _InvalidRangeHeader(Exception): pass +def _get_repository_blob(namespace_name, repo_name, digest): + """ Returns the blob with the given digest under the repository with the given + name. If one does not exist (or it is still uploading), returns None. + Automatically handles caching. 
+ """ + def load_blob(): + return model.get_blob_by_digest(namespace_name, repo_name, digest) + + blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, digest) + return model_cache.retrieve(blob_cache_key, load_blob) + + @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD']) @parse_repository_name() @process_registry_jwt_auth(scopes=['pull']) @@ -41,7 +54,7 @@ class _InvalidRangeHeader(Exception): @cache_control(max_age=31436000) def check_blob_exists(namespace_name, repo_name, digest): # Find the blob. - blob = model.get_blob_by_digest(namespace_name, repo_name, digest) + blob = _get_repository_blob(namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() @@ -49,7 +62,8 @@ def check_blob_exists(namespace_name, repo_name, digest): headers = { 'Docker-Content-Digest': digest, 'Content-Length': blob.size, - 'Content-Type': BLOB_CONTENT_TYPE,} + 'Content-Type': BLOB_CONTENT_TYPE, + } # If our storage supports range requests, let the client know. if storage.get_supports_resumable_downloads(blob.locations): @@ -67,7 +81,7 @@ def check_blob_exists(namespace_name, repo_name, digest): @cache_control(max_age=31536000) def download_blob(namespace_name, repo_name, digest): # Find the blob. - blob = model.get_blob_by_digest(namespace_name, repo_name, digest) + blob = _get_repository_blob(namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() diff --git a/endpoints/v2/models_interface.py b/endpoints/v2/models_interface.py index 66118a95f..bb6bb8bf7 100644 --- a/endpoints/v2/models_interface.py +++ b/endpoints/v2/models_interface.py @@ -41,7 +41,7 @@ class BlobUpload( """ -class Blob(namedtuple('Blob', ['uuid', 'digest', 'size', 'locations'])): +class Blob(namedtuple('Blob', ['uuid', 'digest', 'size', 'locations', 'cas_path'])): """ Blob represents an opaque binary blob saved to the storage system. """ diff --git a/endpoints/v2/models_pre_oci.py b/endpoints/v2/models_pre_oci.py index fa2e7dc49..0f2352716 100644 --- a/endpoints/v2/models_pre_oci.py +++ b/endpoints/v2/models_pre_oci.py @@ -211,7 +211,8 @@ class PreOCIModel(DockerRegistryV2DataInterface): uuid=blob_record.uuid, digest=blob_digest, size=blob_upload.byte_count, - locations=[blob_upload.location_name],) + locations=[blob_upload.location_name], + cas_path=blob_record.cas_path) def lookup_blobs_by_digest(self, namespace_name, repo_name, digests): def _blob_view(blob_record): @@ -219,6 +220,7 @@ class PreOCIModel(DockerRegistryV2DataInterface): uuid=blob_record.uuid, digest=blob_record.content_checksum, size=blob_record.image_size, + cas_path=blob_record.cas_path, locations=None, # Note: Locations is None in this case. 
 
 
diff --git a/endpoints/v2/models_pre_oci.py b/endpoints/v2/models_pre_oci.py
index fa2e7dc49..0f2352716 100644
--- a/endpoints/v2/models_pre_oci.py
+++ b/endpoints/v2/models_pre_oci.py
@@ -211,7 +211,8 @@ class PreOCIModel(DockerRegistryV2DataInterface):
       uuid=blob_record.uuid,
       digest=blob_digest,
       size=blob_upload.byte_count,
-      locations=[blob_upload.location_name],)
+      locations=[blob_upload.location_name],
+      cas_path=blob_record.cas_path)
 
   def lookup_blobs_by_digest(self, namespace_name, repo_name, digests):
     def _blob_view(blob_record):
@@ -219,6 +220,7 @@ class PreOCIModel(DockerRegistryV2DataInterface):
         uuid=blob_record.uuid,
         digest=blob_record.content_checksum,
         size=blob_record.image_size,
+        cas_path=blob_record.cas_path,
         locations=None,  # Note: Locations is None in this case.
       )
 
@@ -235,7 +237,8 @@ class PreOCIModel(DockerRegistryV2DataInterface):
         uuid=blob_record.uuid,
         digest=digest,
         size=blob_record.image_size,
-        locations=blob_record.locations,)
+        locations=blob_record.locations,
+        cas_path=blob_record.cas_path)
     except model.BlobDoesNotExist:
       return None
 
@@ -254,8 +257,9 @@ class PreOCIModel(DockerRegistryV2DataInterface):
                  label.media_type)
 
   def get_blob_path(self, blob):
-    blob_record = model.storage.get_storage_by_uuid(blob.uuid)
-    return model.storage.get_layer_path(blob_record)
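+    # The Blob tuple carries uuid, cas_path and digest, so the storage path can
+    # be computed without refetching the storage row from the database.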
+    return model.storage.get_layer_path_for_storage(blob.uuid, blob.cas_path, blob.digest)
 
   def set_manifest_expires_after(self, namespace_name, repo_name, digest, expires_after_sec):
     try:
diff --git a/endpoints/v2/test/test_blob.py b/endpoints/v2/test/test_blob.py
new file mode 100644
index 000000000..0ec205949
--- /dev/null
+++ b/endpoints/v2/test/test_blob.py
@@ -0,0 +1,59 @@
+import hashlib
+import pytest
+
+from mock import patch
+from flask import url_for
+from playhouse.test_utils import assert_query_count
+
+from app import instance_keys, app as realapp
+from data import model
+from data.cache import InMemoryDataModelCache
+from data.database import ImageStorageLocation
+from endpoints.test.shared import conduct_call
+from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
+from test.fixtures import *
+
+@pytest.mark.parametrize('method, endpoint', [
+  ('GET', 'download_blob'),
+  ('HEAD', 'check_blob_exists'),
+])
+def test_blob_caching(method, endpoint, client, app):
+  digest = 'sha256:' + hashlib.sha256("a").hexdigest()
+  location = ImageStorageLocation.get(name='local_us')
+  model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, 10000000)
+
+  params = {
+    'repository': 'devtable/simple',
+    'digest': digest,
+  }
+
+  user = model.user.get_user('devtable')
+  access = [{
+    'type': 'repository',
+    'name': 'devtable/simple',
+    'actions': ['pull'],
+  }]
+
+  context, subject = build_context_and_subject(user=user)
+  token = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context, access, 600,
+                                instance_keys)
+
+  headers = {
+    'Authorization': 'Bearer %s' % token,
+  }
+
+  # Run without caching to make sure the request works. This also preloads some of
+  # our global model caches.
+  conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
+               headers=headers)
+
+  with patch('endpoints.v2.blob.model_cache', InMemoryDataModelCache()):
+    # First request should make a DB query to retrieve the blob.
+    with assert_query_count(1):
+      conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
+                   headers=headers)
+
+    # Subsequent requests should use the cached blob.
+    with assert_query_count(0):
+      conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
+                   headers=headers)
diff --git a/endpoints/v2/test/test_models_pre_oci.py b/endpoints/v2/test/test_models_pre_oci.py
new file mode 100644
index 000000000..c0e232939
--- /dev/null
+++ b/endpoints/v2/test/test_models_pre_oci.py
@@ -0,0 +1,25 @@
+import hashlib
+
+from playhouse.test_utils import assert_query_count
+
+from data import model
+from data.database import ImageStorageLocation
+from endpoints.v2.models_pre_oci import data_model
+from test.fixtures import *
+
+def test_get_blob_path(initialized_db):
+  # Add a blob.
+  digest = 'sha256:' + hashlib.sha256("a").hexdigest()
+  location = ImageStorageLocation.get(name='local_us')
+  db_blob = model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1,
+                                                       10000000)
+
+  with assert_query_count(1):
+    blob = data_model.get_blob_by_digest('devtable', 'simple', digest)
+    assert blob.uuid == db_blob.uuid
+
+  # The blob tuple should have everything get_blob_path needs, so there should be no queries.
+  with assert_query_count(0):
+    assert data_model.get_blob_path(blob)
+
+  assert data_model.get_blob_path(blob) == model.storage.get_layer_path(db_blob)
diff --git a/test/fixtures.py b/test/fixtures.py
index e09336f70..684294f0c 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -15,6 +15,7 @@ from data.model.user import LoginWrappedDBUser
 from endpoints.api import api_bp
 from endpoints.appr import appr_bp
 from endpoints.web import web
+from endpoints.v2 import v2_bp
 from endpoints.verbs import verbs as verbs_bp
 from initdb import initialize_database, populate_database
 
@@ -173,6 +174,7 @@ def app(appconfig, initialized_db):
   app.register_blueprint(appr_bp, url_prefix='/cnr')
   app.register_blueprint(web, url_prefix='/')
   app.register_blueprint(verbs_bp, url_prefix='/c1')
+  app.register_blueprint(v2_bp, url_prefix='/v2')
 
   app.config.update(appconfig)
   return app
diff --git a/test/registry_tests.py b/test/registry_tests.py
index cef0e0a17..f85cdb2a4 100644
--- a/test/registry_tests.py
+++ b/test/registry_tests.py
@@ -1,4 +1,5 @@
 import binascii
+import copy
 import hashlib
 import json
 import logging
@@ -118,6 +119,19 @@ def addtoken():
   return 'OK'
 
 
+@testbp.route('/breakdatabase', methods=['POST'])
+def break_database():
+  # Close any existing connection.
+  close_db_filter(None)
+
+  # Reload the database config with an invalid connection.
+  config = copy.copy(app.config)
+  config['DB_URI'] = 'sqlite:///not/a/valid/database'
+  configure(config)
+
+  return 'OK'
+
+
 @testbp.route('/reloadapp', methods=['POST'])
 def reload_app():
   # Close any existing connection.
@@ -1674,6 +1688,23 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMixin,
   def test_cancel_push(self):
     self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True)
 
+  def test_with_blob_caching(self):
+    # Add a repository and do a pull, to prime the cache.
+    _, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password')
+    self.do_pull('devtable', 'newrepo', 'devtable', 'password')
+
+    # Purposefully break the database so that we can check if caching works.
+    self.conduct('POST', '/__test/breakdatabase')
+
+    # Attempt to pull the blobs and ensure we get back a result. Since the database is broken,
+    # this will only work if caching is working and no additional queries/connections are made.
+    repo_name = 'devtable/newrepo'
+    for tag_name in manifests:
+      for layer in manifests[tag_name].layers:
+        blob_id = str(layer.digest)
+        self.conduct('GET', '/v2/%s/blobs/%s' % (repo_name, blob_id), expected_code=200,
+                     auth='jwt')
+
   def test_pull_by_checksum(self):
     # Add a new repository under the user, so we have a real repository to pull.
     _, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password')