diff --git a/app.py b/app.py index 4e908ce41..853bf2532 100644 --- a/app.py +++ b/app.py @@ -23,6 +23,7 @@ from data import model from data.archivedlogs import LogArchive from data.billing import Billing from data.buildlogs import BuildLogs +from data.cache import InMemoryDataModelCache from data.model.user import LoginWrappedDBUser from data.queue import WorkQueue, BuildMetricQueueReporter from data.userevent import UserEventsBuilderModule @@ -160,16 +161,7 @@ def _request_end(resp): if request.user_agent is not None: extra["user-agent"] = request.user_agent.string - user = get_authenticated_user() - - if user: - extra['user'] = {'email': user.email, - 'uuid': user.uuid, - 'org': user.organization, - 'robot': user.robot} - logger.info("request-end", extra=extra) - logger.debug('Ending request: %s', request.path) return resp @@ -188,6 +180,8 @@ Principal(app, use_sessions=False) tf = app.config['DB_TRANSACTION_FACTORY'] +# TODO(jschorr): make this configurable +model_cache = InMemoryDataModelCache() avatar = Avatar(app) login_manager = LoginManager(app) diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 7614e02ad..85676a04d 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -7,9 +7,10 @@ from flask import url_for, request, redirect, Response, abort as flask_abort import bitmath import resumablehashlib -from app import storage, app, get_app_url, metric_queue +from app import storage, app, get_app_url, metric_queue, model_cache from auth.registry_jwt_auth import process_registry_jwt_auth from data import database +from data.cache import cache_key from digest import digest_tools from endpoints.decorators import anon_protect, parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream @@ -33,6 +34,18 @@ class _InvalidRangeHeader(Exception): pass +def _get_repository_blob(namespace_name, repo_name, digest): + """ Returns the blob with the given digest under the repository with the given + name. If one does not exist (or it is still uploading), returns None. + Automatically handles caching. + """ + def load_blob(): + return model.get_blob_by_digest(namespace_name, repo_name, digest) + + blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, digest) + return model_cache.retrieve(blob_cache_key, load_blob) + + @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD']) @parse_repository_name() @process_registry_jwt_auth(scopes=['pull']) @@ -41,7 +54,7 @@ class _InvalidRangeHeader(Exception): @cache_control(max_age=31436000) def check_blob_exists(namespace_name, repo_name, digest): # Find the blob. - blob = model.get_blob_by_digest(namespace_name, repo_name, digest) + blob = _get_repository_blob(namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() @@ -49,7 +62,8 @@ def check_blob_exists(namespace_name, repo_name, digest): headers = { 'Docker-Content-Digest': digest, 'Content-Length': blob.size, - 'Content-Type': BLOB_CONTENT_TYPE,} + 'Content-Type': BLOB_CONTENT_TYPE, + } # If our storage supports range requests, let the client know. if storage.get_supports_resumable_downloads(blob.locations): @@ -67,7 +81,7 @@ def check_blob_exists(namespace_name, repo_name, digest): @cache_control(max_age=31536000) def download_blob(namespace_name, repo_name, digest): # Find the blob. - blob = model.get_blob_by_digest(namespace_name, repo_name, digest) + blob = _get_repository_blob(namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() diff --git a/endpoints/v2/test/test_blob.py b/endpoints/v2/test/test_blob.py new file mode 100644 index 000000000..0ec205949 --- /dev/null +++ b/endpoints/v2/test/test_blob.py @@ -0,0 +1,59 @@ +import hashlib +import pytest + +from mock import patch +from flask import url_for +from playhouse.test_utils import assert_query_count + +from app import instance_keys, app as realapp +from data import model +from data.cache import InMemoryDataModelCache +from data.database import ImageStorageLocation +from endpoints.test.shared import conduct_call +from util.security.registry_jwt import generate_bearer_token, build_context_and_subject +from test.fixtures import * + +@pytest.mark.parametrize('method, endpoint', [ + ('GET', 'download_blob'), + ('HEAD', 'check_blob_exists'), +]) +def test_blob_caching(method, endpoint, client, app): + digest = 'sha256:' + hashlib.sha256("a").hexdigest() + location = ImageStorageLocation.get(name='local_us') + model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, 10000000) + + params = { + 'repository': 'devtable/simple', + 'digest': digest, + } + + user = model.user.get_user('devtable') + access = [{ + 'type': 'repository', + 'name': 'devtable/simple', + 'actions': ['pull'], + }] + + context, subject = build_context_and_subject(user=user) + token = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context, access, 600, + instance_keys) + + headers = { + 'Authorization': 'Bearer %s' % token, + } + + # Run without caching to make sure the request works. This also preloads some of + # our global model caches. + conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200, + headers=headers) + + with patch('endpoints.v2.blob.model_cache', InMemoryDataModelCache()): + # First request should make a DB query to retrieve the blob. + with assert_query_count(1): + conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200, + headers=headers) + + # Subsequent requests should use the cached blob. + with assert_query_count(0): + conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200, + headers=headers) diff --git a/test/fixtures.py b/test/fixtures.py index e09336f70..684294f0c 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -15,6 +15,7 @@ from data.model.user import LoginWrappedDBUser from endpoints.api import api_bp from endpoints.appr import appr_bp from endpoints.web import web +from endpoints.v2 import v2_bp from endpoints.verbs import verbs as verbs_bp from initdb import initialize_database, populate_database @@ -173,6 +174,7 @@ def app(appconfig, initialized_db): app.register_blueprint(appr_bp, url_prefix='/cnr') app.register_blueprint(web, url_prefix='/') app.register_blueprint(verbs_bp, url_prefix='/c1') + app.register_blueprint(v2_bp, url_prefix='/v2') app.config.update(appconfig) return app diff --git a/test/registry_tests.py b/test/registry_tests.py index cef0e0a17..f85cdb2a4 100644 --- a/test/registry_tests.py +++ b/test/registry_tests.py @@ -1,4 +1,5 @@ import binascii +import copy import hashlib import json import logging @@ -118,6 +119,19 @@ def addtoken(): return 'OK' +@testbp.route('/breakdatabase', methods=['POST']) +def break_database(): + # Close any existing connection. + close_db_filter(None) + + # Reload the database config with an invalid connection. + config = copy.copy(app.config) + config['DB_URI'] = 'sqlite:///not/a/valid/database' + configure(config) + + return 'OK' + + @testbp.route('/reloadapp', methods=['POST']) def reload_app(): # Close any existing connection. @@ -1674,6 +1688,22 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix def test_cancel_push(self): self.do_push('devtable', 'newrepo', 'devtable', 'password', cancel=True) + def test_with_blob_caching(self): + # Add a repository and do a pull, to prime the cache. + _, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password') + self.do_pull('devtable', 'newrepo', 'devtable', 'password') + + # Purposefully break the database so that we can check if caching works. + self.conduct('POST', '/__test/breakdatabase') + + # Attempt to pull the blobs and ensure we get back a result. Since the database is broken, + # this will only work if caching is working and no additional queries/connections are made. + repo_name = 'devtable/newrepo' + for tag_name in manifests: + for layer in manifests[tag_name].layers: + blob_id = str(layer.digest) + self.conduct('GET', '/v2/%s/blobs/%s' % (repo_name, blob_id), expected_code=200, auth='jwt') + def test_pull_by_checksum(self): # Add a new repository under the user, so we have a real repository to pull. _, manifests = self.do_push('devtable', 'newrepo', 'devtable', 'password')