diff --git a/data/cache/cache_key.py b/data/cache/cache_key.py index f30d6d345..94d6f62a4 100644 --- a/data/cache/cache_key.py +++ b/data/cache/cache_key.py @@ -7,7 +7,7 @@ class CacheKey(namedtuple('CacheKey', ['key', 'expiration'])): def for_repository_blob(namespace_name, repo_name, digest): """ Returns a cache key for a blob in a repository. """ - return CacheKey('repository_blob__%s_%s_%s' % (namespace_name, repo_name, digest), '60s') + return CacheKey('repo_blob__%s_%s_%s' % (namespace_name, repo_name, digest), '60s') def for_catalog_page(auth_context_key, start_id, limit): diff --git a/data/model/storage.py b/data/model/storage.py index cb77ece5f..85a118e88 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -282,7 +282,7 @@ def lookup_repo_storages_by_content_checksum(repo, checksums): candidate_subq = (ImageStorage .select(ImageStorage.id, ImageStorage.content_checksum, ImageStorage.image_size, ImageStorage.uuid, ImageStorage.cas_path, - ImageStorage.uncompressed_size) + ImageStorage.uncompressed_size, ImageStorage.uploading) .join(Image) .where(Image.repository == repo, ImageStorage.content_checksum == checksum) .limit(1) diff --git a/data/registry_model/datatype.py b/data/registry_model/datatype.py index 8264f277e..1b2768e83 100644 --- a/data/registry_model/datatype.py +++ b/data/registry_model/datatype.py @@ -2,6 +2,11 @@ from functools import wraps, total_ordering +class FromDictionaryException(Exception): + """ Exception raised if constructing a data type from a dictionary fails due to + a version mismatch or missing data. + """ + def datatype(name, static_fields): """ Defines a base class for a datatype that will represent a row from the database, in an abstracted form. 
@@ -33,6 +38,23 @@ def datatype(name, static_fields): def __repr__(self): return '<%s> #%s' % (name, self._db_id) + @classmethod + def from_dict(cls, dict_data): + if dict_data.get('version') != 1: + raise FromDictionaryException() + + try: + return cls(**dict_data) + except Exception: + raise FromDictionaryException() + + def asdict(self): + dictionary_rep = dict(self._fields) + dictionary_rep['db_id'] = self._db_id + dictionary_rep['inputs'] = self._inputs + dictionary_rep['version'] = 1 + return dictionary_rep + return DataType diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py index 4031d5eef..4c6f9510b 100644 --- a/data/registry_model/registry_pre_oci_model.py +++ b/data/registry_model/registry_pre_oci_model.py @@ -8,8 +8,10 @@ from peewee import IntegrityError from data import database from data import model +from data.cache import cache_key from data.database import db_transaction from data.registry_model.interface import RegistryDataInterface +from data.registry_model.datatype import FromDictionaryException from data.registry_model.datatypes import (Tag, RepositoryReference, Manifest, LegacyImage, Label, SecurityScanStatus, ManifestLayer, Blob, DerivedImage, TorrentInfo, BlobUpload) @@ -685,6 +687,36 @@ class PreOCIModel(RegistryDataInterface): torrent_info = model.storage.save_torrent_info(image_storage, piece_length, pieces) return TorrentInfo.for_torrent_info(torrent_info) + def get_cached_repo_blob(self, model_cache, namespace_name, repo_name, blob_digest): + """ + Returns the blob in the repository with the given digest if any or None if none. + Caches the result in the caching system. 
+ """ + def load_blob(): + repository_ref = self.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + return None + + blob_found = self.get_repo_blob_by_digest(repository_ref, blob_digest, + include_placements=True) + if blob_found is None: + return None + + return blob_found.asdict() + + blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, blob_digest) + blob_dict = model_cache.retrieve(blob_cache_key, load_blob) + + try: + return Blob.from_dict(blob_dict) if blob_dict is not None else None + except FromDictionaryException: + # The data was stale in some way. Simply reload. + repository_ref = self.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + return None + + return self.get_repo_blob_by_digest(repository_ref, blob_digest, include_placements=True) + def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placements=False): """ Returns the blob in the repository with the given digest, if any or None if none. 
Note that diff --git a/data/registry_model/test/test_pre_oci_model.py b/data/registry_model/test/test_pre_oci_model.py index 06a43aaba..bfdfc2b29 100644 --- a/data/registry_model/test/test_pre_oci_model.py +++ b/data/registry_model/test/test_pre_oci_model.py @@ -5,14 +5,15 @@ from datetime import datetime, timedelta import pytest +from mock import patch from playhouse.test_utils import assert_query_count -from app import docker_v2_signing_key from data import model from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob, ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image, TagManifestLabel, TagManifest, TagManifestLabel, DerivedStorageForImage, - TorrentInfo) + TorrentInfo, close_db_filter) +from data.cache.impl import InMemoryDataModelCache from data.registry_model.registry_pre_oci_model import PreOCIModel from data.registry_model.datatypes import RepositoryReference @@ -638,3 +639,49 @@ def test_mount_blob_into_repository(pre_oci_model): # Ensure it now exists. found = pre_oci_model.get_repo_blob_by_digest(target_repository_ref, layer.blob.digest) assert found == layer.blob + + +class SomeException(Exception): + pass + + +def test_get_cached_repo_blob(pre_oci_model): + model_cache = InMemoryDataModelCache() + + repository_ref = pre_oci_model.lookup_repository('devtable', 'simple') + latest_tag = pre_oci_model.get_repo_tag(repository_ref, 'latest') + manifest = pre_oci_model.get_manifest_for_tag(latest_tag) + + layers = pre_oci_model.list_manifest_layers(manifest, include_placements=True) + assert layers + + blob = layers[0].blob + + # Load a blob to add it to the cache. 
+ found = pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest) + assert found.digest == blob.digest + assert found.uuid == blob.uuid + assert found.compressed_size == blob.compressed_size + assert found.uncompressed_size == blob.uncompressed_size + assert found.uploading == blob.uploading + assert found.placements == blob.placements + + # Simulate a database failure by patching the model-level blob lookup to raise. + def fail(x, y): + raise SomeException('Not connected!') + + with patch('data.registry_model.registry_pre_oci_model.model.blob.get_repository_blob_by_digest', + fail): + # Make sure we can load again, which should hit the cache. + cached = pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest) + assert cached.digest == blob.digest + assert cached.uuid == blob.uuid + assert cached.compressed_size == blob.compressed_size + assert cached.uncompressed_size == blob.uncompressed_size + assert cached.uploading == blob.uploading + assert cached.placements == blob.placements + + # Try another blob, which should fail since the DB is not connected and the cache + does not contain the blob. 
+ with pytest.raises(SomeException): + pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', 'some other digest') diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 49eddbcab..0aa34680a 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -3,7 +3,7 @@ import re from flask import url_for, request, redirect, Response, abort as flask_abort -from app import storage, app, get_app_url, metric_queue +from app import storage, app, get_app_url, metric_queue, model_cache from auth.registry_jwt_auth import process_registry_jwt_auth from auth.permissions import ReadRepositoryPermission from data import database @@ -39,12 +39,8 @@ class _InvalidRangeHeader(Exception): @anon_protect @cache_control(max_age=31436000) def check_blob_exists(namespace_name, repo_name, digest): - repository_ref = registry_model.lookup_repository(namespace_name, repo_name) - if repository_ref is None: - raise NameUnknown() - # Find the blob. - blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True) + blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() @@ -70,12 +66,8 @@ def check_blob_exists(namespace_name, repo_name, digest): @anon_protect @cache_control(max_age=31536000) def download_blob(namespace_name, repo_name, digest): - repository_ref = registry_model.lookup_repository(namespace_name, repo_name) - if repository_ref is None: - raise NameUnknown() - # Find the blob. 
- blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True) + blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() diff --git a/endpoints/v2/test/test_blob.py b/endpoints/v2/test/test_blob.py index c07c11922..7551bfb1c 100644 --- a/endpoints/v2/test/test_blob.py +++ b/endpoints/v2/test/test_blob.py @@ -50,7 +50,7 @@ def test_blob_caching(method, endpoint, client, app): with patch('endpoints.v2.blob.model_cache', InMemoryDataModelCache()): # First request should make a DB query to retrieve the blob. - with assert_query_count(1): + with assert_query_count(3): conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200, headers=headers)