From 0fa1a1d5fd253a48215052df7fb5016fe6cf4a87 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Sun, 15 Apr 2018 14:55:14 +0300 Subject: [PATCH 1/2] Implement support for blob mounting via the `mount` parameter on blob uploads Fixes https://jira.coreos.com/browse/QUAY-893 --- data/model/blob.py | 33 ++++++++++++---- endpoints/v2/blob.py | 66 ++++++++++++++++++++++++++++++- endpoints/v2/models_interface.py | 13 ++++++ endpoints/v2/models_pre_oci.py | 7 ++++ endpoints/v2/test/test_blob.py | 68 ++++++++++++++++++++++++++++++++ 5 files changed, 179 insertions(+), 8 deletions(-) diff --git a/data/model/blob.py b/data/model/blob.py index 0a3f1a39f..6c74f6bde 100644 --- a/data/model/blob.py +++ b/data/model/blob.py @@ -30,9 +30,7 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_ link_expiration_s, uncompressed_byte_count=None): """ Store a record of the blob and temporarily link it to the specified repository. """ - random_image_name = str(uuid4()) with db_transaction(): - repo = _basequery.get_existing_repository(namespace, repo_name) try: storage = ImageStorage.get(content_checksum=blob_digest) storage.image_size = byte_count @@ -51,12 +49,33 @@ def store_blob_record_and_temp_link(namespace, repo_name, blob_digest, location_ except ImageStoragePlacement.DoesNotExist: ImageStoragePlacement.create(storage=storage, location=location_obj) - # Create a temporary link into the repository, to be replaced by the v1 metadata later - # and create a temporary tag to reference it - image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo) - tag.create_temporary_hidden_tag(repo, image, link_expiration_s) + _temp_link_blob(namespace, repo_name, storage, link_expiration_s) + return storage - return storage + +def temp_link_blob(namespace, repo_name, blob_digest, link_expiration_s): + """ Temporarily links to the blob record from the given namespace. If the blob record is not + found, return None. + """ + with db_transaction(): + try: + storage = ImageStorage.get(content_checksum=blob_digest) + except ImageStorage.DoesNotExist: + return None + + _temp_link_blob(namespace, repo_name, storage, link_expiration_s) + return storage + + +def _temp_link_blob(namespace, repo_name, storage, link_expiration_s): + """ Note: Should *always* be called by a parent under a transaction. 
""" + random_image_name = str(uuid4()) + repo = _basequery.get_existing_repository(namespace, repo_name) + + # Create a temporary link into the repository, to be replaced by the v1 metadata later + # and create a temporary tag to reference it + image = Image.create(storage=storage, docker_image_id=random_image_name, repository=repo) + tag.create_temporary_hidden_tag(repo, image, link_expiration_s) def get_stale_blob_upload(stale_timespan): diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 133610ba3..5b6551a54 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -9,16 +9,19 @@ import resumablehashlib from app import storage, app, get_app_url, metric_queue, model_cache from auth.registry_jwt_auth import process_registry_jwt_auth +from auth.permissions import ReadRepositoryPermission from data import database from data.cache import cache_key from digest import digest_tools from endpoints.decorators import anon_protect, parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream from endpoints.v2.errors import ( - BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge) + BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge, + InvalidRequest) from endpoints.v2.models_interface import Blob from endpoints.v2.models_pre_oci import data_model as model from util.cache import cache_control +from util.names import parse_namespace_repository from util.registry.filelike import wrap_with_handler, StreamSlice from util.registry.gzipstream import calculate_size_handler from util.registry.torrent import PieceHasher @@ -121,6 +124,59 @@ def download_blob(namespace_name, repo_name, digest): 'Content-Type': BLOB_CONTENT_TYPE,}),) +def _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest): + """ Attempts to mount a blob requested by the user from another repository. """ + logger.debug('Got mount request for blob `%s` into `%s/%s`', mount_blob_digest, namespace_name, + repo_name) + from_repo = request.args.get('from', None) + if from_repo is None: + raise InvalidRequest + + # Ensure the user has access to the repository. + logger.debug('Got mount request for blob `%s` under repository `%s` into `%s/%s`', + mount_blob_digest, from_repo, namespace_name, repo_name) + from_namespace, from_repo_name = parse_namespace_repository(from_repo, + app.config['LIBRARY_NAMESPACE'], + include_tag=False) + + # First check permission. This does not hit the DB so we do it first. + read_permission = ReadRepositoryPermission(from_namespace, from_repo_name).can() + if not read_permission: + # If no direct permission, check if the repostory is public. + if not model.is_repository_public(from_namespace, from_repo_name): + logger.debug('No permission to mount blob `%s` under repository `%s` into `%s/%s`', + mount_blob_digest, from_repo, namespace_name, repo_name) + return None + + # Lookup if the mount blob's digest exists in the repository. + mount_blob = model.get_blob_by_digest(from_namespace, from_repo_name, mount_blob_digest) + if mount_blob is None: + logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo) + return None + + logger.debug('Mounting blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest, + from_repo, namespace_name, repo_name) + + # Mount the blob into the current repository and return that we've completed the operation. 
+ expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'] + if not model.mount_blob_and_temp_tag(namespace_name, repo_name, mount_blob, expiration_sec): + logger.debug('Could not mount blob `%s` under repository `%s` not found', mount_blob_digest, + from_repo) + return + + # Return the response for the blob indicating that it was mounted, and including its content + # digest. + logger.debug('Mounted blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest, + from_repo, namespace_name, repo_name) + return Response( + status=201, + headers={ + 'Docker-Content-Digest': mount_blob_digest, + 'Location': + get_app_url() + url_for('v2.download_blob', repository='%s/%s' % + (namespace_name, repo_name), digest=mount_blob_digest),},) + + @v2_bp.route('//blobs/uploads/', methods=['POST']) @parse_repository_name() @process_registry_jwt_auth(scopes=['pull', 'push']) @@ -135,6 +191,14 @@ def start_blob_upload(namespace_name, repo_name): if not repository_exists: raise NameUnknown() + # Check for mounting of a blob from another repository. + mount_blob_digest = request.args.get('mount', None) + if mount_blob_digest is not None: + response = _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest) + if response is not None: + return response + + # Check for a normal blob upload. digest = request.args.get('digest', None) if digest is None: # Short-circuit because the user will send the blob data in another request. diff --git a/endpoints/v2/models_interface.py b/endpoints/v2/models_interface.py index e43687359..1d5b17436 100644 --- a/endpoints/v2/models_interface.py +++ b/endpoints/v2/models_interface.py @@ -220,6 +220,14 @@ class DockerRegistryV2DataInterface(object): """ pass + @abstractmethod + def mount_blob_and_temp_tag(self, namespace_name, repo_name, existing_blob, expiration_sec): + """ + Mounts an existing blob and links a temporary tag with the specified expiration to it under + the matching repository. Returns True on success and False on failure. + """ + pass + @abstractmethod def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload, expiration_sec): @@ -276,3 +284,8 @@ class DockerRegistryV2DataInterface(object): """ Returns whether the given namespace is enabled. If the namespace doesn't exist, returns True. """ pass + + @abstractmethod + def is_repository_public(self, namespace_name, repo_name): + """ Returns True if the repository with the given name exists and is public. 
""" + pass diff --git a/endpoints/v2/models_pre_oci.py b/endpoints/v2/models_pre_oci.py index 86f64f454..14ed17130 100644 --- a/endpoints/v2/models_pre_oci.py +++ b/endpoints/v2/models_pre_oci.py @@ -190,6 +190,10 @@ class PreOCIModel(DockerRegistryV2DataInterface): except model.InvalidBlobUpload: return + def mount_blob_and_temp_tag(self, namespace_name, repo_name, existing_blob, expiration_sec): + return model.blob.temp_link_blob(namespace_name, repo_name, existing_blob.digest, + expiration_sec) + def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload, expiration_sec): location_obj = model.storage.get_image_location_for_name(blob_upload.location_name) @@ -262,6 +266,9 @@ class PreOCIModel(DockerRegistryV2DataInterface): namespace = model.user.get_namespace_user(namespace_name) return namespace is None or namespace.enabled + def is_repository_public(self, namespace_name, repo_name): + return model.repository.repository_is_public(namespace_name, repo_name) + def _docker_v1_metadata(namespace_name, repo_name, repo_image): """ diff --git a/endpoints/v2/test/test_blob.py b/endpoints/v2/test/test_blob.py index 5747ac5c3..c07c11922 100644 --- a/endpoints/v2/test/test_blob.py +++ b/endpoints/v2/test/test_blob.py @@ -58,3 +58,71 @@ def test_blob_caching(method, endpoint, client, app): with assert_query_count(0): conduct_call(client, 'v2.' + endpoint, url_for, method, params, expected_code=200, headers=headers) + +@pytest.mark.parametrize('mount_digest, source_repo, username, expect_success', [ + # Unknown blob. + ('sha256:unknown', 'devtable/simple', 'devtable', False), + + # Blob not in repo. + ('sha256:' + hashlib.sha256("a").hexdigest(), 'devtable/complex', 'devtable', False), + + # Blob in repo. + ('sha256:' + hashlib.sha256("b").hexdigest(), 'devtable/complex', 'devtable', True), + + # No access to repo. + ('sha256:' + hashlib.sha256("b").hexdigest(), 'devtable/complex', 'public', False), + + # Public repo. + ('sha256:' + hashlib.sha256("c").hexdigest(), 'public/publicrepo', 'devtable', True), +]) +def test_blob_mounting(mount_digest, source_repo, username, expect_success, client, app): + location = ImageStorageLocation.get(name='local_us') + + # Store and link some blobs. 
+ digest = 'sha256:' + hashlib.sha256("a").hexdigest() + model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, 10000000) + + digest = 'sha256:' + hashlib.sha256("b").hexdigest() + model.blob.store_blob_record_and_temp_link('devtable', 'complex', digest, location, 1, 10000000) + + digest = 'sha256:' + hashlib.sha256("c").hexdigest() + model.blob.store_blob_record_and_temp_link('public', 'publicrepo', digest, location, 1, 10000000) + + params = { + 'repository': 'devtable/building', + 'mount': mount_digest, + 'from': source_repo, + } + + user = model.user.get_user(username) + access = [{ + 'type': 'repository', + 'name': 'devtable/building', + 'actions': ['pull', 'push'], + }] + + if source_repo.find(username) == 0: + access.append({ + 'type': 'repository', + 'name': source_repo, + 'actions': ['pull'], + }) + + context, subject = build_context_and_subject(ValidatedAuthContext(user=user)) + token = generate_bearer_token(realapp.config['SERVER_HOSTNAME'], subject, context, access, 600, + instance_keys) + + headers = { + 'Authorization': 'Bearer %s' % token, + } + + expected_code = 201 if expect_success else 202 + conduct_call(client, 'v2.start_blob_upload', url_for, 'POST', params, expected_code=expected_code, + headers=headers) + + if expect_success: + # Ensure the blob now exists under the repo. + model.blob.get_repo_blob_by_digest('devtable', 'building', mount_digest) + else: + with pytest.raises(model.blob.BlobDoesNotExist): + model.blob.get_repo_blob_by_digest('devtable', 'building', mount_digest) From f6eaf7ce9de97aa4ed04189d3ba166a4e3a32b71 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 30 May 2018 16:49:46 -0400 Subject: [PATCH 2/2] Add blob mounting tests to the new registry test suite --- test/registry/protocol_v2.py | 128 ++++++++++++++++++-------------- test/registry/protocols.py | 2 + test/registry/registry_tests.py | 37 +++++++++ 3 files changed, 112 insertions(+), 55 deletions(-) diff --git a/test/registry/protocol_v2.py b/test/registry/protocol_v2.py index b1650537f..85a2e52e0 100644 --- a/test/registry/protocol_v2.py +++ b/test/registry/protocol_v2.py @@ -15,6 +15,7 @@ class V2ProtocolSteps(Enum): BLOB_HEAD_CHECK = 'blob-head-check' GET_MANIFEST = 'get-manifest' PUT_MANIFEST = 'put-manifest' + MOUNT_BLOB = 'mount-blob' class V2Protocol(RegistryProtocol): @@ -27,6 +28,9 @@ class V2Protocol(RegistryProtocol): Failures.INVALID_REPOSITORY: 400, Failures.NAMESPACE_DISABLED: 400, }, + V2ProtocolSteps.MOUNT_BLOB: { + Failures.UNAUTHORIZED_FOR_MOUNT: 202, + }, V2ProtocolSteps.GET_MANIFEST: { Failures.UNKNOWN_TAG: 404, Failures.UNAUTHORIZED: 403, @@ -92,8 +96,7 @@ class V2Protocol(RegistryProtocol): } if scopes: - params['scope'] = 'repository:%s:%s' % (self.repo_name(namespace, repo_name), - ','.join(scopes)) + params['scope'] = scopes response = self.conduct(session, 'GET', '/v2/auth', params=params, auth=auth, expected_status=(200, expected_failure, V2ProtocolSteps.AUTH)) @@ -107,7 +110,7 @@ class V2Protocol(RegistryProtocol): def push(self, session, namespace, repo_name, tag_names, images, credentials=None, expected_failure=None, options=None): options = options or ProtocolOptions() - scopes = options.scopes or ['push', 'pull'] + scopes = options.scopes or ['repository:%s:push,pull' % self.repo_name(namespace, repo_name)] tag_names = [tag_names] if isinstance(tag_names, str) else tag_names # Ping! 
@@ -160,64 +163,79 @@ class V2Protocol(RegistryProtocol): expected_status=(404, expected_failure, V2ProtocolSteps.BLOB_HEAD_CHECK), headers=headers) - # Start a new upload of the layer data. - response = self.conduct(session, 'POST', - '/v2/%s/blobs/uploads/' % self.repo_name(namespace, repo_name), - expected_status=202, - headers=headers) - - upload_uuid = response.headers['Docker-Upload-UUID'] - new_upload_location = response.headers['Location'] - assert new_upload_location.startswith('http://localhost:5000') - - # We need to make this relative just for the tests because the live server test - # case modifies the port. - location = response.headers['Location'][len('http://localhost:5000'):] - - # PATCH the image data into the layer. - if options.chunks_for_upload is None: - self.conduct(session, 'PATCH', location, data=image.bytes, expected_status=204, + # Check for mounting of blobs. + if options.mount_blobs and image.id in options.mount_blobs: + self.conduct(session, 'POST', + '/v2/%s/blobs/uploads/' % self.repo_name(namespace, repo_name), + params={ + 'mount': checksum, + 'from': options.mount_blobs[image.id], + }, + expected_status=(201, expected_failure, V2ProtocolSteps.MOUNT_BLOB), headers=headers) + if expected_failure is not None: + return else: - # If chunked upload is requested, upload the data as a series of chunks, checking - # status at every point. - for chunk_data in options.chunks_for_upload: - if len(chunk_data) == 3: - (start_byte, end_byte, expected_code) = chunk_data - else: - (start_byte, end_byte) = chunk_data - expected_code = 204 + # Start a new upload of the layer data. + response = self.conduct(session, 'POST', + '/v2/%s/blobs/uploads/' % self.repo_name(namespace, repo_name), + expected_status=202, + headers=headers) - patch_headers = {'Range': 'bytes=%s-%s' % (start_byte, end_byte)} - patch_headers.update(headers) + upload_uuid = response.headers['Docker-Upload-UUID'] + new_upload_location = response.headers['Location'] + assert new_upload_location.startswith('http://localhost:5000') - contents_chunk = image.bytes[start_byte:end_byte] - self.conduct(session, 'PATCH', location, data=contents_chunk, - expected_status=expected_code, - headers=patch_headers) - if expected_code != 204: - return + # We need to make this relative just for the tests because the live server test + # case modifies the port. + location = response.headers['Location'][len('http://localhost:5000'):] - # Retrieve the upload status at each point, and ensure it is valid. + # PATCH the image data into the layer. + if options.chunks_for_upload is None: + self.conduct(session, 'PATCH', location, data=image.bytes, expected_status=204, + headers=headers) + else: + # If chunked upload is requested, upload the data as a series of chunks, checking + # status at every point. + for chunk_data in options.chunks_for_upload: + if len(chunk_data) == 3: + (start_byte, end_byte, expected_code) = chunk_data + else: + (start_byte, end_byte) = chunk_data + expected_code = 204 + + patch_headers = {'Range': 'bytes=%s-%s' % (start_byte, end_byte)} + patch_headers.update(headers) + + contents_chunk = image.bytes[start_byte:end_byte] + self.conduct(session, 'PATCH', location, data=contents_chunk, + expected_status=expected_code, + headers=patch_headers) + if expected_code != 204: + return + + # Retrieve the upload status at each point, and ensure it is valid. 
+ status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name), + upload_uuid) + response = self.conduct(session, 'GET', status_url, expected_status=204, + headers=headers) + assert response.headers['Docker-Upload-UUID'] == upload_uuid + assert response.headers['Range'] == "bytes=0-%s" % end_byte + + if options.cancel_blob_upload: + self.conduct(session, 'DELETE', location, params=dict(digest=checksum), + expected_status=204, headers=headers) + + # Ensure the upload was canceled. status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name), upload_uuid) - response = self.conduct(session, 'GET', status_url, expected_status=204, headers=headers) - assert response.headers['Docker-Upload-UUID'] == upload_uuid - assert response.headers['Range'] == "bytes=0-%s" % end_byte + self.conduct(session, 'GET', status_url, expected_status=404, headers=headers) + return - if options.cancel_blob_upload: - self.conduct(session, 'DELETE', location, params=dict(digest=checksum), expected_status=204, - headers=headers) - - # Ensure the upload was canceled. - status_url = '/v2/%s/blobs/uploads/%s' % (self.repo_name(namespace, repo_name), upload_uuid) - self.conduct(session, 'GET', status_url, expected_status=404, headers=headers) - return - - # Finish the layer upload with a PUT. - response = self.conduct(session, 'PUT', location, params=dict(digest=checksum), - expected_status=201, headers=headers) - assert response.headers['Docker-Content-Digest'] == checksum + # Finish the layer upload with a PUT. + response = self.conduct(session, 'PUT', location, params=dict(digest=checksum), + expected_status=201, headers=headers) + assert response.headers['Docker-Content-Digest'] == checksum # Ensure the layer exists now. response = self.conduct(session, 'HEAD', @@ -258,7 +276,7 @@ class V2Protocol(RegistryProtocol): def delete(self, session, namespace, repo_name, tag_names, credentials=None, expected_failure=None, options=None): options = options or ProtocolOptions() - scopes = options.scopes or ['*'] + scopes = options.scopes or ['repository:%s:*' % self.repo_name(namespace, repo_name)] tag_names = [tag_names] if isinstance(tag_names, str) else tag_names # Ping! @@ -284,7 +302,7 @@ class V2Protocol(RegistryProtocol): def pull(self, session, namespace, repo_name, tag_names, images, credentials=None, expected_failure=None, options=None): options = options or ProtocolOptions() - scopes = options.scopes or ['pull'] + scopes = options.scopes or ['repository:%s:pull' % self.repo_name(namespace, repo_name)] tag_names = [tag_names] if isinstance(tag_names, str) else tag_names # Ping! 
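(Editorial aside, not part of the patch: the hunks above switch the test protocol from bare action lists to fully-qualified Docker registry token scope strings of the form 'repository:<name>:<actions>', which is what lets the mount tests request pull access on the source repository alongside push,pull on the target. The helper name repository_scope below is illustrative only and does not appear in the patch:

    def repository_scope(repo_name, *actions):
        # e.g. repository_scope('devtable/newrepo', 'push', 'pull')
        #   -> 'repository:devtable/newrepo:push,pull'
        return 'repository:%s:%s' % (repo_name, ','.join(actions))

    # The mount tests request push,pull on the target plus pull on the mount source:
    scopes = [
        repository_scope('devtable/newrepo', 'push', 'pull'),
        repository_scope('devtable/baserepo', 'pull'),
    ]
)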
diff --git a/test/registry/protocols.py b/test/registry/protocols.py index d4dd6c145..833a4dc4f 100644 --- a/test/registry/protocols.py +++ b/test/registry/protocols.py @@ -51,6 +51,7 @@ class Failures(Enum): UNSUPPORTED_CONTENT_TYPE = 'unsupported-content-type' INVALID_BLOB = 'invalid-blob' NAMESPACE_DISABLED = 'namespace-disabled' + UNAUTHORIZED_FOR_MOUNT = 'unauthorized-for-mount' class ProtocolOptions(object): @@ -62,6 +63,7 @@ class ProtocolOptions(object): self.chunks_for_upload = None self.skip_head_checks = False self.manifest_content_type = None + self.mount_blobs = None @add_metaclass(ABCMeta) diff --git a/test/registry/registry_tests.py b/test/registry/registry_tests.py index 6d3529893..07503ed15 100644 --- a/test/registry/registry_tests.py +++ b/test/registry/registry_tests.py @@ -783,6 +783,43 @@ def test_squashed_images(use_estimates, pusher, sized_images, liveserver_session tarfile.open(fileobj=tar.extractfile(tar.getmember('%s/layer.tar' % expected_image_id))) +@pytest.mark.parametrize('push_user, push_namespace, push_repo, mount_repo_name, expected_failure', [ + # Successful mount, same namespace. + ('devtable', 'devtable', 'baserepo', 'devtable/baserepo', None), + + # Successful mount, cross namespace. + ('devtable', 'buynlarge', 'baserepo', 'buynlarge/baserepo', None), + + # Unsuccessful mount, unknown repo. + ('devtable', 'devtable', 'baserepo', 'unknown/repohere', Failures.UNAUTHORIZED_FOR_MOUNT), + + # Unsuccessful mount, no access. + ('public', 'public', 'baserepo', 'public/baserepo', Failures.UNAUTHORIZED_FOR_MOUNT), +]) +def test_blob_mounting(push_user, push_namespace, push_repo, mount_repo_name, expected_failure, + manifest_protocol, pusher, puller, basic_images, liveserver_session, + app_reloader): + # Push an image so we can attempt to mount it. + pusher.push(liveserver_session, push_namespace, push_repo, 'latest', basic_images, + credentials=(push_user, 'password')) + + # Push again, trying to mount the image layer(s) from the mount repo. + options = ProtocolOptions() + options.scopes = ['repository:devtable/newrepo:push,pull', + 'repository:%s:pull' % (mount_repo_name)] + options.mount_blobs = {image.id: mount_repo_name for image in basic_images} + + manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, + credentials=('devtable', 'password'), + options=options, + expected_failure=expected_failure) + + if expected_failure is None: + # Pull to ensure it worked. + puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, + credentials=('devtable', 'password')) + + def get_robot_password(api_caller): api_caller.conduct_auth('devtable', 'password') resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot')
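
(Editorial note: the sketch below is not part of either patch; the registry URL, repository names, and token are placeholders. It illustrates the client-side flow these changes implement and test: a POST to the blob-upload endpoint with `mount` and `from` parameters returns 201 with Docker-Content-Digest and Location headers when the blob is mounted from the source repository, and falls back to the normal 202 upload-session response when the digest is unknown or the caller lacks read access to the source:

    import requests

    def try_mount_blob(registry, target_repo, digest, source_repo, token):
        # POST /v2/<target>/blobs/uploads/?mount=<digest>&from=<source>
        resp = requests.post(
            '%s/v2/%s/blobs/uploads/' % (registry, target_repo),
            params={'mount': digest, 'from': source_repo},
            headers={'Authorization': 'Bearer %s' % token})
        if resp.status_code == 201:
            # Mounted: the blob is now linked into target_repo without re-uploading its bytes.
            return resp.headers['Docker-Content-Digest']
        # 202 Accepted: the mount was not possible; the registry opened a regular upload
        # session instead, so the client should upload the blob to resp.headers['Location'].
        return None

    # Example (placeholder values):
    # try_mount_blob('http://localhost:5000', 'devtable/newrepo',
    #                'sha256:...', 'devtable/baserepo', token)
)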