From 51e67ab7f513b78001b3391d57456bd1a0591d07 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 13 Dec 2017 16:27:46 -0500 Subject: [PATCH 1/6] Fix get_blob_path to not make any database calls and add a test This will be supported by caching, hopefully removing the need to hit the database when the blob object is cached --- data/model/storage.py | 17 +++++++++++----- endpoints/v2/models_interface.py | 2 +- endpoints/v2/models_pre_oci.py | 10 ++++++---- endpoints/v2/test/test_models_pre_oci.py | 25 ++++++++++++++++++++++++ 4 files changed, 44 insertions(+), 10 deletions(-) create mode 100644 endpoints/v2/test/test_models_pre_oci.py diff --git a/data/model/storage.py b/data/model/storage.py index eef439d39..6a4ff291e 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -248,12 +248,19 @@ def get_storage_by_uuid(storage_uuid): def get_layer_path(storage_record): """ Returns the path in the storage engine to the layer data referenced by the storage row. """ - store = config.store - if not storage_record.cas_path: - logger.debug('Serving layer from legacy v1 path') - return store.v1_image_layer_path(storage_record.uuid) + return get_layer_path_for_storage(storage_record.uuid, storage_record.cas_path, + storage_record.content_checksum) - return store.blob_path(storage_record.content_checksum) + +def get_layer_path_for_storage(storage_uuid, cas_path, content_checksum): + """ Returns the path in the storage engine to the layer data referenced by the storage + information. """ + store = config.store + if not cas_path: + logger.debug('Serving layer from legacy v1 path for storage %s', storage_uuid) + return store.v1_image_layer_path(storage_uuid) + + return store.blob_path(content_checksum) def lookup_repo_storages_by_content_checksum(repo, checksums): diff --git a/endpoints/v2/models_interface.py b/endpoints/v2/models_interface.py index 66118a95f..bb6bb8bf7 100644 --- a/endpoints/v2/models_interface.py +++ b/endpoints/v2/models_interface.py @@ -41,7 +41,7 @@ class BlobUpload( """ -class Blob(namedtuple('Blob', ['uuid', 'digest', 'size', 'locations'])): +class Blob(namedtuple('Blob', ['uuid', 'digest', 'size', 'locations', 'cas_path'])): """ Blob represents an opaque binary blob saved to the storage system. """ diff --git a/endpoints/v2/models_pre_oci.py b/endpoints/v2/models_pre_oci.py index fa2e7dc49..0f2352716 100644 --- a/endpoints/v2/models_pre_oci.py +++ b/endpoints/v2/models_pre_oci.py @@ -211,7 +211,8 @@ class PreOCIModel(DockerRegistryV2DataInterface): uuid=blob_record.uuid, digest=blob_digest, size=blob_upload.byte_count, - locations=[blob_upload.location_name],) + locations=[blob_upload.location_name], + cas_path=blob_record.cas_path) def lookup_blobs_by_digest(self, namespace_name, repo_name, digests): def _blob_view(blob_record): @@ -219,6 +220,7 @@ class PreOCIModel(DockerRegistryV2DataInterface): uuid=blob_record.uuid, digest=blob_record.content_checksum, size=blob_record.image_size, + cas_path=blob_record.cas_path, locations=None, # Note: Locations is None in this case. 
) @@ -235,7 +237,8 @@ class PreOCIModel(DockerRegistryV2DataInterface): uuid=blob_record.uuid, digest=digest, size=blob_record.image_size, - locations=blob_record.locations,) + locations=blob_record.locations, + cas_path=blob_record.cas_path) except model.BlobDoesNotExist: return None @@ -254,8 +257,7 @@ class PreOCIModel(DockerRegistryV2DataInterface): label.media_type) def get_blob_path(self, blob): - blob_record = model.storage.get_storage_by_uuid(blob.uuid) - return model.storage.get_layer_path(blob_record) + return model.storage.get_layer_path_for_storage(blob.uuid, blob.cas_path, blob.digest) def set_manifest_expires_after(self, namespace_name, repo_name, digest, expires_after_sec): try: diff --git a/endpoints/v2/test/test_models_pre_oci.py b/endpoints/v2/test/test_models_pre_oci.py new file mode 100644 index 000000000..c0e232939 --- /dev/null +++ b/endpoints/v2/test/test_models_pre_oci.py @@ -0,0 +1,25 @@ +import hashlib + +from playhouse.test_utils import assert_query_count + +from data import model +from data.database import ImageStorageLocation +from endpoints.v2.models_pre_oci import data_model +from test.fixtures import * + +def test_get_blob_path(initialized_db): + # Add a blob. + digest = 'sha256:' + hashlib.sha256("a").hexdigest() + location = ImageStorageLocation.get(name='local_us') + db_blob = model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, + 10000000) + + with assert_query_count(1): + blob = data_model.get_blob_by_digest('devtable', 'simple', digest) + assert blob.uuid == db_blob.uuid + + # The blob tuple should have everything get_blob_path needs, so there should be no queries. + with assert_query_count(0): + assert data_model.get_blob_path(blob) + + assert data_model.get_blob_path(blob) == model.storage.get_layer_path(db_blob) From 3c72e9878de3d8190e3f3a75f9cc0fa72e767c6e Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 14 Dec 2017 13:36:51 -0500 Subject: [PATCH 2/6] Add the concept of a data model cache, for caching of Namedtuple objects from the data model Will be used to cache blobs, thus removing the need to hit the database in most blob requests --- data/cache/__init__.py | 48 +++++++++++++++++++++++++++++++++++ data/cache/cache_key.py | 8 ++++++ data/cache/test/test_cache.py | 16 ++++++++++++ 3 files changed, 72 insertions(+) create mode 100644 data/cache/__init__.py create mode 100644 data/cache/cache_key.py create mode 100644 data/cache/test/test_cache.py diff --git a/data/cache/__init__.py b/data/cache/__init__.py new file mode 100644 index 000000000..01c572f38 --- /dev/null +++ b/data/cache/__init__.py @@ -0,0 +1,48 @@ +from datetime import datetime + +from abc import ABCMeta, abstractmethod +from six import add_metaclass + +from util.expiresdict import ExpiresDict +from util.timedeltastring import convert_to_timedelta + +def is_not_none(value): + return value is not None + + +@add_metaclass(ABCMeta) +class DataModelCache(object): + """ Defines an interface for cache storing and returning tuple data model objects. """ + + @abstractmethod + def retrieve(self, cache_key, loader, should_cache=is_not_none): + """ Checks the cache for the specified cache key and returns the value found (if any). If none + found, the loader is called to get a result and populate the cache. + """ + pass + + +class NoopDataModelCache(DataModelCache): + """ Implementation of the data model cache which does nothing. 
""" + + def retrieve(self, cache_key, loader, should_cache=is_not_none): + return loader() + + +class InMemoryDataModelCache(DataModelCache): + """ Implementation of the data model cache backed by an in-memory dictionary. """ + def __init__(self): + self.cache = ExpiresDict(rebuilder=lambda: {}) + + def retrieve(self, cache_key, loader, should_cache=is_not_none): + not_found = [None] + result = self.cache.get(cache_key.key, default_value=not_found) + if result != not_found: + return result + + result = loader() + if should_cache(result): + expires = convert_to_timedelta(cache_key.expiration) + datetime.now() + self.cache.set(cache_key.key, result, expires=expires) + + return result diff --git a/data/cache/cache_key.py b/data/cache/cache_key.py new file mode 100644 index 000000000..b0d4d7011 --- /dev/null +++ b/data/cache/cache_key.py @@ -0,0 +1,8 @@ +from collections import namedtuple + +class CacheKey(namedtuple('CacheKey', ['key', 'expiration'])): + """ Defines a key into the data model cache. """ + pass + +def for_repository_blob(namespace_name, repo_name, digest): + return CacheKey('repository_blob:%s:%s:%s' % (namespace_name, repo_name, digest), '60s') diff --git a/data/cache/test/test_cache.py b/data/cache/test/test_cache.py new file mode 100644 index 000000000..d9e3708d0 --- /dev/null +++ b/data/cache/test/test_cache.py @@ -0,0 +1,16 @@ +import pytest + +from data.cache import InMemoryDataModelCache, NoopDataModelCache +from data.cache.cache_key import CacheKey + +@pytest.mark.parametrize('cache_type', [ + (NoopDataModelCache), + (InMemoryDataModelCache), +]) +def test_caching(cache_type): + key = CacheKey('foo', '60m') + cache = cache_type() + + # Perform two retrievals, and make sure both return. + assert cache.retrieve(key, lambda: 1234) == 1234 + assert cache.retrieve(key, lambda: 1234) == 1234 From db6007cb371b4db0a4fa2e12e6eb0aaa3678644c Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 14 Dec 2017 13:37:31 -0500 Subject: [PATCH 3/6] Change v2 registry auth code to not hit the database when we know we have permissions loaded Avoids a DB call and, when used in conjunction with blob caching, will avoid a DB *connection* --- endpoints/v2/__init__.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/endpoints/v2/__init__.py b/endpoints/v2/__init__.py index 198b56cbe..28a837ddd 100644 --- a/endpoints/v2/__init__.py +++ b/endpoints/v2/__init__.py @@ -95,21 +95,26 @@ def _require_repo_permission(permission_class, scopes=None, allow_public=False): def wrapped(namespace_name, repo_name, *args, **kwargs): logger.debug('Checking permission %s for repo: %s/%s', permission_class, namespace_name, repo_name) - repository = namespace_name + '/' + repo_name - repo = model.get_repository(namespace_name, repo_name) - if repo is None: - raise Unauthorized(repository=repository, scopes=scopes) permission = permission_class(namespace_name, repo_name) - if (permission.can() or (allow_public and repo.is_public)): + if permission.can(): + return func(namespace_name, repo_name, *args, **kwargs) + + repository = namespace_name + '/' + repo_name + if allow_public: + repo = model.get_repository(namespace_name, repo_name) + if repo is None or not repo.is_public: + raise Unauthorized(repository=repository, scopes=scopes) + if repo.kind != 'image': msg = 'This repository is for managing %s resources and not container images.' 
% repo.kind raise Unsupported(detail=msg) - return func(namespace_name, repo_name, *args, **kwargs) + + if repo.is_public: + return func(namespace_name, repo_name, *args, **kwargs) + raise Unauthorized(repository=repository, scopes=scopes) - return wrapped - return wrapper From b2485934ede322ac0327e7ebb48097ef8d20a56f Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 14 Dec 2017 13:38:24 -0500 Subject: [PATCH 4/6] Enable caching of blobs in V2 registry protocol, to avoid DB connections after the cache has been loaded This should help for bursty pull traffic, as it will avoid DB connections on a huge % of requests --- app.py | 12 ++----- endpoints/v2/blob.py | 22 ++++++++++--- endpoints/v2/test/test_blob.py | 59 ++++++++++++++++++++++++++++++++++ test/fixtures.py | 2 ++ test/registry_tests.py | 30 +++++++++++++++++ 5 files changed, 112 insertions(+), 13 deletions(-) create mode 100644 endpoints/v2/test/test_blob.py diff --git a/app.py b/app.py index 4e908ce41..853bf2532 100644 --- a/app.py +++ b/app.py @@ -23,6 +23,7 @@ from data import model from data.archivedlogs import LogArchive from data.billing import Billing from data.buildlogs import BuildLogs +from data.cache import InMemoryDataModelCache from data.model.user import LoginWrappedDBUser from data.queue import WorkQueue, BuildMetricQueueReporter from data.userevent import UserEventsBuilderModule @@ -160,16 +161,7 @@ def _request_end(resp): if request.user_agent is not None: extra["user-agent"] = request.user_agent.string - user = get_authenticated_user() - - if user: - extra['user'] = {'email': user.email, - 'uuid': user.uuid, - 'org': user.organization, - 'robot': user.robot} - logger.info("request-end", extra=extra) - logger.debug('Ending request: %s', request.path) return resp @@ -188,6 +180,8 @@ Principal(app, use_sessions=False) tf = app.config['DB_TRANSACTION_FACTORY'] +# TODO(jschorr): make this configurable +model_cache = InMemoryDataModelCache() avatar = Avatar(app) login_manager = LoginManager(app) diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 7614e02ad..85676a04d 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -7,9 +7,10 @@ from flask import url_for, request, redirect, Response, abort as flask_abort import bitmath import resumablehashlib -from app import storage, app, get_app_url, metric_queue +from app import storage, app, get_app_url, metric_queue, model_cache from auth.registry_jwt_auth import process_registry_jwt_auth from data import database +from data.cache import cache_key from digest import digest_tools from endpoints.decorators import anon_protect, parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream @@ -33,6 +34,18 @@ class _InvalidRangeHeader(Exception): pass +def _get_repository_blob(namespace_name, repo_name, digest): + """ Returns the blob with the given digest under the repository with the given + name. If one does not exist (or it is still uploading), returns None. + Automatically handles caching. + """ + def load_blob(): + return model.get_blob_by_digest(namespace_name, repo_name, digest) + + blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, digest) + return model_cache.retrieve(blob_cache_key, load_blob) + + @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD']) @parse_repository_name() @process_registry_jwt_auth(scopes=['pull']) @@ -41,7 +54,7 @@ class _InvalidRangeHeader(Exception): @cache_control(max_age=31436000) def check_blob_exists(namespace_name, repo_name, digest): # Find the blob. 
- blob = model.get_blob_by_digest(namespace_name, repo_name, digest) + blob = _get_repository_blob(namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() @@ -49,7 +62,8 @@ def check_blob_exists(namespace_name, repo_name, digest): headers = { 'Docker-Content-Digest': digest, 'Content-Length': blob.size, - 'Content-Type': BLOB_CONTENT_TYPE,} + 'Content-Type': BLOB_CONTENT_TYPE, + } # If our storage supports range requests, let the client know. if storage.get_supports_resumable_downloads(blob.locations): @@ -67,7 +81,7 @@ def check_blob_exists(namespace_name, repo_name, digest): @cache_control(max_age=31536000) def download_blob(namespace_name, repo_name, digest): # Find the blob. - blob = model.get_blob_by_digest(namespace_name, repo_name, digest) + blob = _get_repository_blob(namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() diff --git a/endpoints/v2/test/test_blob.py b/endpoints/v2/test/test_blob.py new file mode 100644 index 000000000..0ec205949 --- /dev/null +++ b/endpoints/v2/test/test_blob.py @@ -0,0 +1,59 @@ +import hashlib +import pytest + +from mock import patch +from flask import url_for +from playhouse.test_utils import assert_query_count + +from app import instance_keys, app as realapp +from data import model +from data.cache import InMemoryDataModelCache +from data.database import ImageStorageLocation +from endpoints.test.shared import conduct_call +from util.security.registry_jwt import generate_bearer_token, build_context_and_subject +from test.fixtures import * + +@pytest.mark.parametrize('method, endpoint', [ + ('GET', 'download_blob'), + ('HEAD', 'check_blob_exists'), +]) +def test_blob_caching(method, endpoint, client, app): + digest = 'sha256:' + hashlib.sha256("a").hexdigest() + location = ImageStorageLocation.get(name='local_us') + model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, 10000000) + + params = { + 'repository': 'devtable/simple', + 'digest': digest, + } + + user = model.user.get_user('devtable') + access = [{ + 'type': 'repository', + 'name': 'devtable/simple', + 'actions': ['pull'], + }] + + context, subject = build_context_and_subject(user=user) + token = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context, access, 600, + instance_keys) + + headers = { + 'Authorization': 'Bearer %s' % token, + } + + # Run without caching to make sure the request works. This also preloads some of + # our global model caches. + conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200, + headers=headers) + + with patch('endpoints.v2.blob.model_cache', InMemoryDataModelCache()): + # First request should make a DB query to retrieve the blob. + with assert_query_count(1): + conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200, + headers=headers) + + # Subsequent requests should use the cached blob. + with assert_query_count(0): + conduct_call(client, 'v2.' 
+ endpoint, url_for, method, params, expected_code=200, + headers=headers) diff --git a/test/fixtures.py b/test/fixtures.py index e09336f70..684294f0c 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -15,6 +15,7 @@ from data.model.user import LoginWrappedDBUser from endpoints.api import api_bp from endpoints.appr import appr_bp from endpoints.web import web +from endpoints.v2 import v2_bp from endpoints.verbs import verbs as verbs_bp from initdb import initialize_database, populate_database @@ -173,6 +174,7 @@ def app(appconfig, initialized_db): app.register_blueprint(appr_bp, url_prefix='/cnr') app.register_blueprint(web, url_prefix='/') app.register_blueprint(verbs_bp, url_prefix='/c1') + app.register_blueprint(v2_bp, url_prefix='/v2') app.config.update(appconfig) return app diff --git a/test/registry_tests.py b/test/registry_tests.py index cef0e0a17..f85cdb2a4 100644 --- a/test/registry_tests.py +++ b/test/registry_tests.py @@ -1,4 +1,5 @@ import binascii +import copy import hashlib import json import logging @@ -118,6 +119,19 @@ def addtoken(): return 'OK' +@testbp.route('/breakdatabase', methods=['POST']) +def break_database(): + # Close any existing connection. + close_db_filter(None) + + # Reload the database config with an invalid connection. + config = copy.copy(app.config) + config['DB_URI'] = 'sqlite:///not/a/valid/database' + configure(config) + + return 'OK' + + @testbp.route('/reloadapp', methods=['POST']) def reload_app(): # Close any existing connection. @@ -1674,6 +1688,22 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix def test_cancel_push(self): self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True) + def test_with_blob_caching(self): + # Add a repository and do a pull, to prime the cache. + _, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password') + self.do_pull('devtable', 'newrepo', 'devtable', 'password') + + # Purposefully break the database so that we can check if caching works. + self.conduct('POST', '/__test/breakdatabase') + + # Attempt to pull the blobs and ensure we get back a result. Since the database is broken, + # this will only work if caching is working and no additional queries/connections are made. + repo_name = 'devtable/newrepo' + for tag_name in manifests: + for layer in manifests[tag_name].layers: + blob_id = str(layer.digest) + self.conduct('GET', '/v2/%s/blobs/%s' % (repo_name, blob_id), expected_code=200, auth='jwt') + def test_pull_by_checksum(self): # Add a new repository under the user, so we have a real repository to pull. 
_, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password') From 60bc65569543f5ded4d4f515eeeb40e0768e3219 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 14 Dec 2017 14:00:20 -0500 Subject: [PATCH 5/6] Fix flakiness in a test when comparing date times --- data/model/test/test_tag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/model/test/test_tag.py b/data/model/test/test_tag.py index 6cfa7366d..b9ab2e3b2 100644 --- a/data/model/test/test_tag.py +++ b/data/model/test/test_tag.py @@ -210,4 +210,4 @@ def test_change_tag_expiration(expiration_offset, expected_offset, initialized_d start_date = datetime.utcfromtimestamp(footag_updated.lifetime_start_ts) end_date = datetime.utcfromtimestamp(footag_updated.lifetime_end_ts) expected_end_date = start_date + convert_to_timedelta(expected_offset) - assert end_date == expected_end_date + assert (expected_end_date - end_date).total_seconds() < 5 # variance in test From 9e165968541e3910729e7d5ddbf81f23387b3ce8 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 14 Dec 2017 14:30:59 -0500 Subject: [PATCH 6/6] Add a bunch of logging to the data model caching mechanism Should help us debug any potential issues --- data/cache/__init__.py | 14 ++++++++++++++ endpoints/v2/blob.py | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/data/cache/__init__.py b/data/cache/__init__.py index 01c572f38..96dc0223d 100644 --- a/data/cache/__init__.py +++ b/data/cache/__init__.py @@ -1,3 +1,5 @@ +import logging + from datetime import datetime from abc import ABCMeta, abstractmethod @@ -6,6 +8,8 @@ from six import add_metaclass from util.expiresdict import ExpiresDict from util.timedeltastring import convert_to_timedelta +logger = logging.getLogger(__name__) + def is_not_none(value): return value is not None @@ -36,13 +40,23 @@ class InMemoryDataModelCache(DataModelCache): def retrieve(self, cache_key, loader, should_cache=is_not_none): not_found = [None] + logger.debug('Checking cache for key %s', cache_key.key) result = self.cache.get(cache_key.key, default_value=not_found) if result != not_found: + logger.debug('Found result in cache for key %s: %s', cache_key.key, result) return result + logger.debug('Found no result in cache for key %s; calling loader', cache_key.key) result = loader() + logger.debug('Got loaded result for key %s: %s', cache_key.key, result) if should_cache(result): + logger.debug('Caching loaded result for key %s with expiration %s: %s', cache_key.key, + cache_key.expiration, result) expires = convert_to_timedelta(cache_key.expiration) + datetime.now() self.cache.set(cache_key.key, result, expires=expires) + logger.debug('Cached loaded result for key %s with expiration %s: %s', cache_key.key, + cache_key.expiration, result) + else: + logger.debug('Not caching loaded result for key %s: %s', cache_key.key, result) return result diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 85676a04d..19f3fd832 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -43,7 +43,7 @@ def _get_repository_blob(namespace_name, repo_name, digest): return model.get_blob_by_digest(namespace_name, repo_name, digest) blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, digest) - return model_cache.retrieve(blob_cache_key, load_blob) + return model_cache.retrieve(blob_cache_key, load_blob) @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
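
A note on the cache contract introduced in PATCH 2/6: retrieve() is a cache-aside lookup. On a hit it returns the cached value; on a miss it invokes the loader and, if should_cache accepts the result (by default any non-None value, so missing rows are never cached), stores it until the key's expiration. A minimal usage sketch against the classes added above; the load_blob body here is a stand-in for the real database query:

    from data.cache import InMemoryDataModelCache, NoopDataModelCache
    from data.cache.cache_key import CacheKey

    cache = InMemoryDataModelCache()
    key = CacheKey('repository_blob:devtable:simple:sha256:...', '60s')

    def load_blob():
        # Stand-in for the single DB query issued on a cache miss;
        # the real loader returns a Blob namedtuple.
        return {'digest': 'sha256:...', 'size': 1234}

    blob = cache.retrieve(key, load_blob)   # miss: runs the loader, caches the result
    blob = cache.retrieve(key, load_blob)   # hit within 60s: no loader call, no DB

    # NoopDataModelCache satisfies the same interface but always defers to the
    # loader, so callers need no changes when caching is disabled.
    assert NoopDataModelCache().retrieve(key, lambda: 42) == 42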
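
Why PATCH 1/6 lets get_blob_path avoid the database entirely: the Blob tuple now carries cas_path alongside uuid and digest, and in this data model the digest doubles as the content checksum used to build CAS paths, so get_layer_path_for_storage has every input it needs on the tuple itself. A sketch of the call exactly as the patch wires it; the field values are illustrative only:

    from data import model
    from endpoints.v2.models_interface import Blob

    # Illustrative values; in practice these come from one blob query or the cache.
    blob = Blob(uuid='some-storage-uuid', digest='sha256:abc123', size=1234,
                locations=['local_us'], cas_path=True)

    # No queries here: the path is computed purely from fields on the tuple,
    # which is what makes a cached Blob sufficient on its own.
    path = model.storage.get_layer_path_for_storage(blob.uuid, blob.cas_path, blob.digest)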
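
The cache keys used by PATCH 4/6 are scoped to (namespace, repository, digest), and the 60-second TTL fixed in data/cache/cache_key.py bounds how stale a served blob record can be. Note that the permission check from PATCH 3/6 still runs on every request; only the blob-row lookup is cached. The key shape, derived directly from for_repository_blob above:

    from data.cache import cache_key

    key = cache_key.for_repository_blob('devtable', 'simple', 'sha256:abc123')
    assert key.key == 'repository_blob:devtable:simple:sha256:abc123'
    assert key.expiration == '60s'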
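
Finally, on the '# TODO(jschorr): make this configurable' comment left in app.py: one possible shape for that hook, purely a hypothetical sketch. The DATA_MODEL_CACHE_ENGINE key and get_model_cache helper below are not part of these patches:

    from data.cache import InMemoryDataModelCache, NoopDataModelCache

    def get_model_cache(config):
        """ Hypothetical factory selecting a cache engine from app config. """
        engine = config.get('DATA_MODEL_CACHE_ENGINE', 'inmemory')
        if engine == 'inmemory':
            return InMemoryDataModelCache()
        return NoopDataModelCache()

    model_cache = get_model_cache(app.config)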