Implement support for blob mounting via the mount parameter on blob uploads

Fixes https://jira.coreos.com/browse/QUAY-893
This commit is contained in:
Joseph Schorr 2018-04-15 14:55:14 +03:00
parent 44bb000fa5
commit 0fa1a1d5fd
5 changed files with 179 additions and 8 deletions

View file

@ -30,9 +30,7 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
link_expiration_s, uncompressed_byte_count=None): link_expiration_s, uncompressed_byte_count=None):
""" Store a record of the blob and temporarily link it to the specified repository. """ Store a record of the blob and temporarily link it to the specified repository.
""" """
random_image_name = str(uuid4())
with db_transaction(): with db_transaction():
repo = _basequery.get_existing_repository(namespace, repo_name)
try: try:
storage = ImageStorage.get(content_checksum=blob_digest) storage = ImageStorage.get(content_checksum=blob_digest)
storage.image_size = byte_count storage.image_size = byte_count
@ -51,12 +49,33 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_
except ImageStoragePlacement.DoesNotExist: except ImageStoragePlacement.DoesNotExist:
ImageStoragePlacement.create(storage=storage, location=location_obj) ImageStoragePlacement.create(storage=storage, location=location_obj)
# Create a temporary link into the repository, to be replaced by the v1 metadata later _temp_link_blob(namespace, repo_name, storage, link_expiration_s)
# and create a temporary tag to reference it return storage
image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo)
tag.create_temporary_hidden_tag(repo, image, link_expiration_s)
return storage
def temp_link_blob(namespace, repo_name, blob_digest, link_expiration_s):
""" Temporarily links to the blob record from the given namespace. If the blob record is not
found, return None.
"""
with db_transaction():
try:
storage = ImageStorage.get(content_checksum=blob_digest)
except ImageStorage.DoesNotExist:
return None
_temp_link_blob(namespace, repo_name, storage, link_expiration_s)
return storage
def _temp_link_blob(namespace, repo_name, storage, link_expiration_s):
""" Note: Should *always* be called by a parent under a transaction. """
random_image_name = str(uuid4())
repo = _basequery.get_existing_repository(namespace, repo_name)
# Create a temporary link into the repository, to be replaced by the v1 metadata later
# and create a temporary tag to reference it
image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo)
tag.create_temporary_hidden_tag(repo, image, link_expiration_s)
def get_stale_blob_upload(stale_timespan): def get_stale_blob_upload(stale_timespan):

View file

@ -9,16 +9,19 @@ import resumablehashlib
from app import storage, app, get_app_url, metric_queue, model_cache from app import storage, app, get_app_url, metric_queue, model_cache
from auth.registry_jwt_auth import process_registry_jwt_auth from auth.registry_jwt_auth import process_registry_jwt_auth
from auth.permissions import ReadRepositoryPermission
from data import database from data import database
from data.cache import cache_key from data.cache import cache_key
from digest import digest_tools from digest import digest_tools
from endpoints.decorators import anon_protect, parse_repository_name from endpoints.decorators import anon_protect, parse_repository_name
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
from endpoints.v2.errors import ( from endpoints.v2.errors import (
BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge) BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge,
InvalidRequest)
from endpoints.v2.models_interface import Blob from endpoints.v2.models_interface import Blob
from endpoints.v2.models_pre_oci import data_model as model from endpoints.v2.models_pre_oci import data_model as model
from util.cache import cache_control from util.cache import cache_control
from util.names import parse_namespace_repository
from util.registry.filelike import wrap_with_handler, StreamSlice from util.registry.filelike import wrap_with_handler, StreamSlice
from util.registry.gzipstream import calculate_size_handler from util.registry.gzipstream import calculate_size_handler
from util.registry.torrent import PieceHasher from util.registry.torrent import PieceHasher
@ -121,6 +124,59 @@ def download_blob(namespace_name, repo_name, digest):
'Content-Type': BLOB_CONTENT_TYPE,}),) 'Content-Type': BLOB_CONTENT_TYPE,}),)
def _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest):
""" Attempts to mount a blob requested by the user from another repository. """
logger.debug('Got mount request for blob `%s` into `%s/%s`', mount_blob_digest, namespace_name,
repo_name)
from_repo = request.args.get('from', None)
if from_repo is None:
raise InvalidRequest
# Ensure the user has access to the repository.
logger.debug('Got mount request for blob `%s` under repository `%s` into `%s/%s`',
mount_blob_digest, from_repo, namespace_name, repo_name)
from_namespace, from_repo_name = parse_namespace_repository(from_repo,
app.config['LIBRARY_NAMESPACE'],
include_tag=False)
# First check permission. This does not hit the DB so we do it first.
read_permission = ReadRepositoryPermission(from_namespace, from_repo_name).can()
if not read_permission:
# If no direct permission, check if the repostory is public.
if not model.is_repository_public(from_namespace, from_repo_name):
logger.debug('No permission to mount blob `%s` under repository `%s` into `%s/%s`',
mount_blob_digest, from_repo, namespace_name, repo_name)
return None
# Lookup if the mount blob's digest exists in the repository.
mount_blob = model.get_blob_by_digest(from_namespace, from_repo_name, mount_blob_digest)
if mount_blob is None:
logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo)
return None
logger.debug('Mounting blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest,
from_repo, namespace_name, repo_name)
# Mount the blob into the current repository and return that we've completed the operation.
expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
if not model.mount_blob_and_temp_tag(namespace_name, repo_name, mount_blob, expiration_sec):
logger.debug('Could not mount blob `%s` under repository `%s` not found', mount_blob_digest,
from_repo)
return
# Return the response for the blob indicating that it was mounted, and including its content
# digest.
logger.debug('Mounted blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest,
from_repo, namespace_name, repo_name)
return Response(
status=201,
headers={
'Docker-Content-Digest': mount_blob_digest,
'Location':
get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
(namespace_name, repo_name), digest=mount_blob_digest),},)
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST']) @v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
@parse_repository_name() @parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push']) @process_registry_jwt_auth(scopes=['pull', 'push'])
@ -135,6 +191,14 @@ def start_blob_upload(namespace_name, repo_name):
if not repository_exists: if not repository_exists:
raise NameUnknown() raise NameUnknown()
# Check for mounting of a blob from another repository.
mount_blob_digest = request.args.get('mount', None)
if mount_blob_digest is not None:
response = _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest)
if response is not None:
return response
# Check for a normal blob upload.
digest = request.args.get('digest', None) digest = request.args.get('digest', None)
if digest is None: if digest is None:
# Short-circuit because the user will send the blob data in another request. # Short-circuit because the user will send the blob data in another request.

View file

@ -220,6 +220,14 @@ class DockerRegistryV2DataInterface(object):
""" """
pass pass
@abstractmethod
def mount_blob_and_temp_tag(self, namespace_name, repo_name, existing_blob, expiration_sec):
"""
Mounts an existing blob and links a temporary tag with the specified expiration to it under
the matching repository. Returns True on success and False on failure.
"""
pass
@abstractmethod @abstractmethod
def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload, def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload,
expiration_sec): expiration_sec):
@ -276,3 +284,8 @@ class DockerRegistryV2DataInterface(object):
""" Returns whether the given namespace is enabled. If the namespace doesn't exist, """ Returns whether the given namespace is enabled. If the namespace doesn't exist,
returns True. """ returns True. """
pass pass
@abstractmethod
def is_repository_public(self, namespace_name, repo_name):
""" Returns True if the repository with the given name exists and is public. """
pass

View file

@ -190,6 +190,10 @@ class PreOCIModel(DockerRegistryV2DataInterface):
except model.InvalidBlobUpload: except model.InvalidBlobUpload:
return return
def mount_blob_and_temp_tag(self, namespace_name, repo_name, existing_blob, expiration_sec):
return model.blob.temp_link_blob(namespace_name, repo_name, existing_blob.digest,
expiration_sec)
def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload, def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload,
expiration_sec): expiration_sec):
location_obj = model.storage.get_image_location_for_name(blob_upload.location_name) location_obj = model.storage.get_image_location_for_name(blob_upload.location_name)
@ -262,6 +266,9 @@ class PreOCIModel(DockerRegistryV2DataInterface):
namespace = model.user.get_namespace_user(namespace_name) namespace = model.user.get_namespace_user(namespace_name)
return namespace is None or namespace.enabled return namespace is None or namespace.enabled
def is_repository_public(self, namespace_name, repo_name):
return model.repository.repository_is_public(namespace_name, repo_name)
def _docker_v1_metadata(namespace_name, repo_name, repo_image): def _docker_v1_metadata(namespace_name, repo_name, repo_image):
""" """

View file

@ -58,3 +58,71 @@ def test_blob_caching(method, endpoint, client, app):
with assert_query_count(0): with assert_query_count(0):
conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200, conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200,
headers=headers) headers=headers)
@pytest.mark.parametrize('mount_digest, source_repo, username, expect_success', [
# Unknown blob.
('sha256:unknown', 'devtable/simple', 'devtable', False),
# Blob not in repo.
('sha256:' + hashlib.sha256("a").hexdigest(), 'devtable/complex', 'devtable', False),
# Blob in repo.
('sha256:' + hashlib.sha256("b").hexdigest(), 'devtable/complex', 'devtable', True),
# No access to repo.
('sha256:' + hashlib.sha256("b").hexdigest(), 'devtable/complex', 'public', False),
# Public repo.
('sha256:' + hashlib.sha256("c").hexdigest(), 'public/publicrepo', 'devtable', True),
])
def test_blob_mounting(mount_digest, source_repo, username, expect_success, client, app):
location = ImageStorageLocation.get(name='local_us')
# Store and link some blobs.
digest = 'sha256:' + hashlib.sha256("a").hexdigest()
model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, 10000000)
digest = 'sha256:' + hashlib.sha256("b").hexdigest()
model.blob.store_blob_record_and_temp_link('devtable', 'complex', digest, location, 1, 10000000)
digest = 'sha256:' + hashlib.sha256("c").hexdigest()
model.blob.store_blob_record_and_temp_link('public', 'publicrepo', digest, location, 1, 10000000)
params = {
'repository': 'devtable/building',
'mount': mount_digest,
'from': source_repo,
}
user = model.user.get_user(username)
access = [{
'type': 'repository',
'name': 'devtable/building',
'actions': ['pull', 'push'],
}]
if source_repo.find(username) == 0:
access.append({
'type': 'repository',
'name': source_repo,
'actions': ['pull'],
})
context, subject = build_context_and_subject(ValidatedAuthContext(user=user))
token = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context, access, 600,
instance_keys)
headers = {
'Authorization': 'Bearer %s' % token,
}
expected_code = 201 if expect_success else 202
conduct_call(client, 'v2.start_blob_upload', url_for, 'POST', params, expected_code=expected_code,
headers=headers)
if expect_success:
# Ensure the blob now exists under the repo.
model.blob.get_repo_blob_by_digest('devtable', 'building', mount_digest)
else:
with pytest.raises(model.blob.BlobDoesNotExist):
model.blob.get_repo_blob_by_digest('devtable', 'building', mount_digest)