Reimplement cache support for blobs in the registry data model
This commit is contained in:
parent
7a68c41f1c
commit
a172de4fdc
7 changed files with 109 additions and 16 deletions
2
data/cache/cache_key.py
vendored
2
data/cache/cache_key.py
vendored
|
@ -7,7 +7,7 @@ class CacheKey(namedtuple('CacheKey', ['key', 'expiration'])):
|
|||
|
||||
def for_repository_blob(namespace_name, repo_name, digest):
|
||||
""" Returns a cache key for a blob in a repository. """
|
||||
return CacheKey('repository_blob__%s_%s_%s' % (namespace_name, repo_name, digest), '60s')
|
||||
return CacheKey('repo_blob__%s_%s_%s' % (namespace_name, repo_name, digest), '60s')
|
||||
|
||||
|
||||
def for_catalog_page(auth_context_key, start_id, limit):
|
||||
|
|
|
@ -282,7 +282,7 @@ def lookup_repo_storages_by_content_checksum(repo, checksums):
|
|||
candidate_subq = (ImageStorage
|
||||
.select(ImageStorage.id, ImageStorage.content_checksum,
|
||||
ImageStorage.image_size, ImageStorage.uuid, ImageStorage.cas_path,
|
||||
ImageStorage.uncompressed_size)
|
||||
ImageStorage.uncompressed_size, ImageStorage.uploading)
|
||||
.join(Image)
|
||||
.where(Image.repository == repo, ImageStorage.content_checksum == checksum)
|
||||
.limit(1)
|
||||
|
|
|
@ -2,6 +2,11 @@
|
|||
|
||||
from functools import wraps, total_ordering
|
||||
|
||||
class FromDictionaryException(Exception):
    """ Raised when a data type cannot be constructed from a dictionary, either because of
        a version mismatch or because data is missing.
    """
|
||||
|
||||
def datatype(name, static_fields):
|
||||
""" Defines a base class for a datatype that will represent a row from the database,
|
||||
in an abstracted form.
|
||||
|
@ -33,6 +38,23 @@ def datatype(name, static_fields):
|
|||
def __repr__(self):
|
||||
return '<%s> #%s' % (name, self._db_id)
|
||||
|
||||
@classmethod
def from_dict(cls, dict_data):
    """ Builds an instance of this data type from a dictionary, such as the one produced by
        asdict(). Raises FromDictionaryException if the dictionary's 'version' entry is not 1
        or if constructing the instance from the dictionary fails.
    """
    if dict_data.get('version') != 1:
        raise FromDictionaryException()

    try:
        return cls(**dict_data)
    except Exception:
        # Deliberately not a bare `except:`, so that KeyboardInterrupt and SystemExit
        # propagate instead of being reported as a malformed dictionary.
        raise FromDictionaryException()
|
||||
|
||||
def asdict(self):
    """ Returns a new dictionary holding a copy of this data type's fields plus its
        'db_id', its 'inputs' and a 'version' entry of 1.
    """
    serialized = dict(self._fields)
    serialized.update(db_id=self._db_id, inputs=self._inputs, version=1)
    return serialized
|
||||
|
||||
return DataType
|
||||
|
||||
|
||||
|
|
|
@ -8,8 +8,10 @@ from peewee import IntegrityError
|
|||
|
||||
from data import database
|
||||
from data import model
|
||||
from data.cache import cache_key
|
||||
from data.database import db_transaction
|
||||
from data.registry_model.interface import RegistryDataInterface
|
||||
from data.registry_model.datatype import FromDictionaryException
|
||||
from data.registry_model.datatypes import (Tag, RepositoryReference, Manifest, LegacyImage, Label,
|
||||
SecurityScanStatus, ManifestLayer, Blob, DerivedImage,
|
||||
TorrentInfo, BlobUpload)
|
||||
|
@ -685,6 +687,36 @@ class PreOCIModel(RegistryDataInterface):
|
|||
torrent_info = model.storage.save_torrent_info(image_storage, piece_length, pieces)
|
||||
return TorrentInfo.for_torrent_info(torrent_info)
|
||||
|
||||
def get_cached_repo_blob(self, model_cache, namespace_name, repo_name, blob_digest):
    """
    Looks up the blob with the given digest in the named repository, going through the
    given model cache. Returns the blob, or None if the repository or the blob was not found.
    """
    def fetch_blob():
        # Resolve the repository first, then the blob (with its placements) inside it.
        repo_ref = self.lookup_repository(namespace_name, repo_name)
        if repo_ref is None:
            return None

        return self.get_repo_blob_by_digest(repo_ref, blob_digest, include_placements=True)

    def fetch_blob_dict():
        # Loader handed to the cache: the dictionary form of the blob, or None.
        blob = fetch_blob()
        return None if blob is None else blob.asdict()

    key = cache_key.for_repository_blob(namespace_name, repo_name, blob_digest)
    cached_dict = model_cache.retrieve(key, fetch_blob_dict)
    if cached_dict is None:
        return None

    try:
        return Blob.from_dict(cached_dict)
    except FromDictionaryException:
        # The cached dictionary could not be turned back into a Blob (stale in some way).
        # Simply reload the blob directly.
        return fetch_blob()
|
||||
|
||||
def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placements=False):
|
||||
"""
|
||||
Returns the blob in the repository with the given digest, if any or None if none. Note that
|
||||
|
|
|
@ -5,14 +5,15 @@ from datetime import datetime, timedelta
|
|||
|
||||
import pytest
|
||||
|
||||
from mock import patch
|
||||
from playhouse.test_utils import assert_query_count
|
||||
|
||||
from app import docker_v2_signing_key
|
||||
from data import model
|
||||
from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
|
||||
ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
|
||||
TagManifestLabel, TagManifest, TagManifestLabel, DerivedStorageForImage,
|
||||
TorrentInfo)
|
||||
TorrentInfo, close_db_filter)
|
||||
from data.cache.impl import InMemoryDataModelCache
|
||||
from data.registry_model.registry_pre_oci_model import PreOCIModel
|
||||
from data.registry_model.datatypes import RepositoryReference
|
||||
|
||||
|
@ -638,3 +639,49 @@ def test_mount_blob_into_repository(pre_oci_model):
|
|||
# Ensure it now exists.
|
||||
found = pre_oci_model.get_repo_blob_by_digest(target_repository_ref, layer.blob.digest)
|
||||
assert found == layer.blob
|
||||
|
||||
|
||||
class SomeException(Exception):
    """ Exception raised by the stand-in lookup function that tests patch in to simulate a
        disconnected database.
    """
|
||||
|
||||
|
||||
def test_get_cached_repo_blob(pre_oci_model):
    """ Checks that get_cached_repo_blob returns a previously loaded blob from the cache while
        the underlying blob lookup is patched to fail, and that a digest which was never
        loaded still reaches the (failing) lookup.
    """
    compared_attrs = ('digest', 'uuid', 'compressed_size', 'uncompressed_size', 'uploading',
                      'placements')

    def assert_same_blob(actual, expected):
        for attr_name in compared_attrs:
            assert getattr(actual, attr_name) == getattr(expected, attr_name)

    model_cache = InMemoryDataModelCache()

    repo_ref = pre_oci_model.lookup_repository('devtable', 'simple')
    tag = pre_oci_model.get_repo_tag(repo_ref, 'latest')
    manifest = pre_oci_model.get_manifest_for_tag(tag)

    layers = pre_oci_model.list_manifest_layers(manifest, include_placements=True)
    assert layers

    blob = layers[0].blob

    # Load a blob to add it to the cache.
    assert_same_blob(
        pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest),
        blob)

    # Disconnect from the database by overwriting the lookup function.
    def fail(x, y):
        raise SomeException('Not connected!')

    lookup_path = 'data.registry_model.registry_pre_oci_model.model.blob.get_repository_blob_by_digest'
    with patch(lookup_path, fail):
        # Make sure we can load again, which should hit the cache.
        assert_same_blob(
            pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest),
            blob)

        # Try another blob, which should fail since the DB is not connected and the cache
        # does not contain the blob.
        with pytest.raises(SomeException):
            pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple',
                                               'some other digest')
|
||||
|
|
|
@ -3,7 +3,7 @@ import re
|
|||
|
||||
from flask import url_for, request, redirect, Response, abort as flask_abort
|
||||
|
||||
from app import storage, app, get_app_url, metric_queue
|
||||
from app import storage, app, get_app_url, metric_queue, model_cache
|
||||
from auth.registry_jwt_auth import process_registry_jwt_auth
|
||||
from auth.permissions import ReadRepositoryPermission
|
||||
from data import database
|
||||
|
@ -39,12 +39,8 @@ class _InvalidRangeHeader(Exception):
|
|||
@anon_protect
|
||||
@cache_control(max_age=31436000)
|
||||
def check_blob_exists(namespace_name, repo_name, digest):
|
||||
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||
if repository_ref is None:
|
||||
raise NameUnknown()
|
||||
|
||||
# Find the blob.
|
||||
blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True)
|
||||
blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest)
|
||||
if blob is None:
|
||||
raise BlobUnknown()
|
||||
|
||||
|
@ -70,12 +66,8 @@ def check_blob_exists(namespace_name, repo_name, digest):
|
|||
@anon_protect
|
||||
@cache_control(max_age=31536000)
|
||||
def download_blob(namespace_name, repo_name, digest):
|
||||
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||
if repository_ref is None:
|
||||
raise NameUnknown()
|
||||
|
||||
# Find the blob.
|
||||
blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True)
|
||||
blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest)
|
||||
if blob is None:
|
||||
raise BlobUnknown()
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ def test_blob_caching(method, endpoint, client, app):
|
|||
|
||||
with patch('endpoints.v2.blob.model_cache', InMemoryDataModelCache()):
|
||||
# First request should make a DB query to retrieve the blob.
|
||||
with assert_query_count(1):
|
||||
with assert_query_count(3):
|
||||
conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
|
||||
headers=headers)
|
||||
|
||||
|
|
Reference in a new issue