From 8cfb3f4fe88dd5911592225c3981a552bc57f022 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Thu, 20 Sep 2018 15:49:20 -0400
Subject: [PATCH 01/13] Add interface function for deleting tags pointing to a
 manifest

---
 data/registry_model/interface.py               | 10 +++++++++-
 data/registry_model/registry_pre_oci_model.py  | 15 +++++++++++++++
 data/registry_model/test/test_pre_oci_model.py | 13 +++++++++++--
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py
index 748a8bcc0..3324f360d 100644
--- a/data/registry_model/interface.py
+++ b/data/registry_model/interface.py
@@ -82,7 +82,8 @@ class RegistryDataInterface(object):
     """

   @abstractmethod
-  def list_repository_tag_history(self, repository_ref, page=1, size=100, specific_tag_name=None, active_tags_only=False):
+  def list_repository_tag_history(self, repository_ref, page=1, size=100, specific_tag_name=None,
+                                  active_tags_only=False):
     """
     Returns the history of all tags in the repository (unless filtered). This includes tags that
     have been made inactive due to newer versions of those tags coming into service.
@@ -110,6 +111,13 @@ class RegistryDataInterface(object):
     Deletes the latest, *active* tag with the given name in the repository.
     """

+  @abstractmethod
+  def delete_tags_for_manifest(self, manifest):
+    """
+    Deletes all tags pointing to the given manifest, making the manifest inaccessible for pulling.
+    Returns the tags deleted, if any. Returns None on error.
+    """
+
   @abstractmethod
   def change_repository_tag_expiration(self, tag, expiration_date):
     """ Sets the expiration date of the tag under the matching repository to that given. If the
diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py
index bba644df3..d26c3af07 100644
--- a/data/registry_model/registry_pre_oci_model.py
+++ b/data/registry_model/registry_pre_oci_model.py
@@ -241,6 +241,21 @@ class PreOCIModel(RegistryDataInterface):
     deleted_tag = model.tag.delete_tag(repo.namespace_user.username, repo.name, tag_name)
     return Tag.for_repository_tag(deleted_tag)

+  def delete_tags_for_manifest(self, manifest):
+    """
+    Deletes all tags pointing to the given manifest, making the manifest inaccessible for pulling.
+    Returns the tags deleted, if any. Returns None on error.
+    """
+    try:
+      tagmanifest = database.TagManifest.get(id=manifest._db_id)
+    except database.TagManifest.DoesNotExist:
+      return None
+
+    namespace_name = tagmanifest.tag.repository.namespace_user.username
+    repo_name = tagmanifest.tag.repository.name
+    tags = model.tag.delete_manifest_by_digest(namespace_name, repo_name, manifest.digest)
+    return [Tag.for_repository_tag(tag) for tag in tags]
+
   def change_repository_tag_expiration(self, tag, expiration_date):
     """ Sets the expiration date of the tag under the matching repository to that given. If the
         expiration date is None, then the tag will not expire.
        Returns a tuple of the previous
diff --git a/data/registry_model/test/test_pre_oci_model.py b/data/registry_model/test/test_pre_oci_model.py
index cdeec9f45..5a1b5df15 100644
--- a/data/registry_model/test/test_pre_oci_model.py
+++ b/data/registry_model/test/test_pre_oci_model.py
@@ -209,7 +209,11 @@ def test_repository_tag_history(pre_oci_model):
   ('devtable', 'history'),
   ('buynlarge', 'orgrepo'),
 ])
-def test_delete_tags(repo_namespace, repo_name, pre_oci_model):
+@pytest.mark.parametrize('via_manifest', [
+  False,
+  True,
+])
+def test_delete_tags(repo_namespace, repo_name, via_manifest, pre_oci_model):
   repository_ref = pre_oci_model.lookup_repository(repo_namespace, repo_name)
   tags = pre_oci_model.list_repository_tags(repository_ref)
   assert len(tags)
@@ -220,7 +224,12 @@ def test_delete_tags(repo_namespace, repo_name, pre_oci_model):

   # Delete every tag in the repository.
   for tag in tags:
-    assert pre_oci_model.delete_tag(repository_ref, tag.name)
+    if via_manifest:
+      manifest = pre_oci_model.get_manifest_for_tag(tag)
+      if manifest is not None:
+        assert pre_oci_model.delete_tags_for_manifest(manifest)
+    else:
+      assert pre_oci_model.delete_tag(repository_ref, tag.name)

   # Make sure the tag is no longer found.
   with assert_query_count(1):

From 03789b2210965bd04fd088b892536509402679f2 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Thu, 20 Sep 2018 16:00:02 -0400
Subject: [PATCH 02/13] Add interface for batch creation of labels on a
 manifest

This cannot be a true batch operation right now because of the current
mapping table entries needed, but we can create and use the interface now
and change the underlying implementation later
---
 data/registry_model/interface.py               |  8 ++++++
 data/registry_model/registry_pre_oci_model.py  | 28 +++++++++++++++++++
 .../registry_model/test/test_pre_oci_model.py  | 15 ++++++++++
 3 files changed, 51 insertions(+)

diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py
index 3324f360d..581e54a8a 100644
--- a/data/registry_model/interface.py
+++ b/data/registry_model/interface.py
@@ -57,6 +57,14 @@ class RegistryDataInterface(object):
     on the validation errors.
     """

+  @abstractmethod
+  def batch_create_manifest_labels(self, manifest):
+    """ Returns a context manager for batch creation of labels on a manifest.
+
+        Can raise InvalidLabelKeyException or InvalidMediaTypeException depending
+        on the validation errors.
+    """
+
   @abstractmethod
   def list_manifest_labels(self, manifest, key_prefix=None):
     """ Returns all labels found on the manifest. If specified, the key_prefix will filter the
diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py
index d26c3af07..d6b1b5325 100644
--- a/data/registry_model/registry_pre_oci_model.py
+++ b/data/registry_model/registry_pre_oci_model.py
@@ -2,6 +2,7 @@
 import logging

 from collections import defaultdict
+from contextlib import contextmanager

 from peewee import IntegrityError

@@ -138,6 +139,33 @@ class PreOCIModel(RegistryDataInterface):
                                               media_type_name)
     return Label.for_label(label)

+  @contextmanager
+  def batch_create_manifest_labels(self, manifest):
+    """ Returns a context manager for batch creation of labels on a manifest.
+
+        Can raise InvalidLabelKeyException or InvalidMediaTypeException depending
+        on the validation errors.
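+
+        A minimal usage sketch (illustrative only; the label values here are examples):
+
+          with model.batch_create_manifest_labels(manifest) as add_label:
+            add_label('com.example.key', 'value', 'api')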
+    """
+    try:
+      tag_manifest = database.TagManifest.get(id=manifest._db_id)
+    except database.TagManifest.DoesNotExist:
+      yield None
+      return
+
+    labels_to_add = []
+    def add_label(key, value, source_type_name, media_type_name=None):
+      labels_to_add.append(dict(key=key, value=value, source_type_name=source_type_name,
+                                media_type_name=media_type_name))
+
+    yield add_label
+
+    # TODO: make this truly batch once we've fully transitioned to V2_2 and no longer need
+    # the mapping tables.
+    for label in labels_to_add:
+      model.label.create_manifest_label(tag_manifest, **label)
+
   def list_manifest_labels(self, manifest, key_prefix=None):
     """ Returns all labels found on the manifest. If specified, the key_prefix will filter the
         labels returned to those keys that start with the given prefix.
diff --git a/data/registry_model/test/test_pre_oci_model.py b/data/registry_model/test/test_pre_oci_model.py
index 5a1b5df15..9275fab1c 100644
--- a/data/registry_model/test/test_pre_oci_model.py
+++ b/data/registry_model/test/test_pre_oci_model.py
@@ -167,6 +167,21 @@ def test_manifest_labels(pre_oci_model):
   assert created not in pre_oci_model.list_manifest_labels(found_manifest)


+def test_batch_labels(pre_oci_model):
+  repo = model.repository.get_repository('devtable', 'history')
+  repository_ref = RepositoryReference.for_repo_obj(repo)
+  found_tag = pre_oci_model.find_matching_tag(repository_ref, ['latest'])
+  found_manifest = pre_oci_model.get_manifest_for_tag(found_tag)
+
+  with pre_oci_model.batch_create_manifest_labels(found_manifest) as add_label:
+    add_label('foo', '1', 'api')
+    add_label('bar', '2', 'api')
+    add_label('baz', '3', 'api')
+
+  # Ensure we can look them up.
+  assert len(pre_oci_model.list_manifest_labels(found_manifest)) == 3
+
+
 @pytest.mark.parametrize('repo_namespace, repo_name', [
   ('devtable', 'simple'),
   ('devtable', 'complex'),

From 818ed32f872a66499af8d4247c38ea6dfa6d03cb Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Thu, 20 Sep 2018 16:11:20 -0400
Subject: [PATCH 03/13] Add function in data interface for mounting blobs into
 other repositories

---
 data/registry_model/interface.py               |  9 ++++++++
 data/registry_model/registry_pre_oci_model.py  | 17 ++++++++++++++
 .../registry_model/test/test_pre_oci_model.py  | 23 +++++++++++++++++++
 3 files changed, 49 insertions(+)

diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py
index 581e54a8a..f5949b538 100644
--- a/data/registry_model/interface.py
+++ b/data/registry_model/interface.py
@@ -246,3 +246,12 @@ class RegistryDataInterface(object):
   def commit_blob_upload(self, blob_upload, blob_digest_str, blob_expiration_seconds):
     """ Commits the blob upload into a blob and sets an expiration before that blob will be GCed.
     """
+
+  @abstractmethod
+  def mount_blob_into_repository(self, blob, target_repository_ref, expiration_sec):
+    """
+    Mounts the blob from another repository into the specified target repository, and adds an
+    expiration before that blob is automatically GCed. This function is useful during push
+    operations if an existing blob from another repository is being pushed. Returns False if
+    the mounting fails.
+    """
diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py
index d6b1b5325..72ae699b5 100644
--- a/data/registry_model/registry_pre_oci_model.py
+++ b/data/registry_model/registry_pre_oci_model.py
@@ -690,5 +690,22 @@ class PreOCIModel(RegistryDataInterface):
     return Blob.for_image_storage(blob_record,
                                   storage_path=model.storage.get_layer_path(blob_record))

+  def mount_blob_into_repository(self, blob, target_repository_ref, expiration_sec):
+    """
+    Mounts the blob from another repository into the specified target repository, and adds an
+    expiration before that blob is automatically GCed. This function is useful during push
+    operations if an existing blob from another repository is being pushed. Returns False if
+    the mounting fails.
+    """
+    repo = model.repository.lookup_repository(target_repository_ref._db_id)
+    if repo is None:
+      return False
+
+    namespace_name = repo.namespace_user.username
+    repo_name = repo.name
+
+    storage = model.blob.temp_link_blob(namespace_name, repo_name, blob.digest,
+                                        expiration_sec)
+    return bool(storage)
+
 pre_oci_model = PreOCIModel()
diff --git a/data/registry_model/test/test_pre_oci_model.py b/data/registry_model/test/test_pre_oci_model.py
index 9275fab1c..3acb75146 100644
--- a/data/registry_model/test/test_pre_oci_model.py
+++ b/data/registry_model/test/test_pre_oci_model.py
@@ -554,6 +554,7 @@ def test_torrent_info(pre_oci_model):
   assert torrent_info.pieces == 'foo'


+<<<<<<< HEAD
 def test_blob_uploads(pre_oci_model):
   repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')

@@ -599,3 +600,25 @@ def test_commit_blob_upload(pre_oci_model):

   # Ensure the upload can no longer be found.
   assert not pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id)
+
+
+def test_mount_blob_into_repository(pre_oci_model):
+  repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
+  latest_tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
+  manifest = pre_oci_model.get_manifest_for_tag(latest_tag)
+
+  target_repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
+
+  layers = pre_oci_model.list_manifest_layers(manifest, include_placements=True)
+  assert layers
+
+  for layer in layers:
+    # Ensure the blob doesn't exist under the repository.
+    assert not pre_oci_model.get_repo_blob_by_digest(target_repository_ref, layer.blob.digest)
+
+    # Mount the blob into the repository.
+    assert pre_oci_model.mount_blob_into_repository(layer.blob, target_repository_ref, 60)
+
+    # Ensure it now exists.
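+    # (The lookup targets the *destination* repository, so finding the blob there
+    # proves the mount actually linked it.)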
+    found = pre_oci_model.get_repo_blob_by_digest(target_repository_ref, layer.blob.digest)
+    assert found == layer.blob

From 0ae062be62f945967d4f8c05c5ec6e1d754ae857 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Thu, 20 Sep 2018 17:49:00 -0400
Subject: [PATCH 04/13] Add manifest creation to new registry data model
 interface

---
 data/model/tag.py                              |  11 ++
 data/registry_model/interface.py               |  19 ++++
 data/registry_model/label_handlers.py          |  28 +++++
 data/registry_model/registry_pre_oci_model.py  | 106 +++++++++++++++++-
 .../registry_model/test/test_pre_oci_model.py  |  18 ++-
 image/docker/interfaces.py                     |   6 +
 image/docker/schema1.py                        |   4 +
 image/docker/schema2/manifest.py               |   4 +
 workers/manifestbackfillworker.py              |   4 +
 9 files changed, 195 insertions(+), 5 deletions(-)
 create mode 100644 data/registry_model/label_handlers.py

diff --git a/data/model/tag.py b/data/model/tag.py
index 52fa1ba39..f87c99c70 100644
--- a/data/model/tag.py
+++ b/data/model/tag.py
@@ -1,6 +1,7 @@
 import logging

 from calendar import timegm
+from datetime import datetime
 from uuid import uuid4

 from peewee import IntegrityError, JOIN, fn
@@ -757,6 +758,16 @@ def change_repository_tag_expiration(namespace_name, repo_name, tag_name, expira
   return (None, False)


+def set_tag_expiration_for_manifest(tag_manifest, expiration_sec):
+  """
+  Changes the expiration of the tag that points to the given manifest to be its lifetime start +
+  the expiration seconds.
+  """
+  expiration_time_in_seconds = tag_manifest.tag.lifetime_start_ts + expiration_sec
+  expiration_date = datetime.utcfromtimestamp(expiration_time_in_seconds)
+  return change_tag_expiration(tag_manifest.tag, expiration_date)
+
+
 def change_tag_expiration(tag, expiration_date):
   """ Changes the expiration of the given tag to the given expiration datetime. If the
       expiration datetime is None, then the tag is marked as not expiring.
diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py
index f5949b538..099851d67 100644
--- a/data/registry_model/interface.py
+++ b/data/registry_model/interface.py
@@ -35,6 +35,19 @@ class RegistryDataInterface(object):
     """ Looks up the manifest with the given digest under the given repository and returns it
         or None if none. """

+  @abstractmethod
+  def create_manifest_and_retarget_tag(self, repository_ref, manifest_interface_instance, tag_name):
+    """ Creates a manifest in a repository, adding all of the necessary data in the model.
+
+        The `manifest_interface_instance` parameter must be an instance of the manifest
+        interface as returned by the image/docker package.
+
+        Note that all blobs referenced by the manifest must exist under the repository or this
+        method will fail and return None.
+
+        Returns a reference to the (created manifest, tag) or (None, None) on error.
+    """
+
   @abstractmethod
   def get_legacy_images(self, repository_ref):
     """
@@ -255,3 +268,9 @@ class RegistryDataInterface(object):
     operations if an existing blob from another repository is being pushed. Returns False if
     the mounting fails.
     """
+
+  @abstractmethod
+  def set_tags_expiration_for_manifest(self, manifest, expiration_sec):
+    """
+    Sets the expiration on all tags that point to the given manifest to that specified.
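+    (Illustrative: an expiration_sec of 7200, as produced by a `quay.expires-after: 2h`
+    label, sets each tag to expire two hours after its lifetime start.)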
+    """
diff --git a/data/registry_model/label_handlers.py b/data/registry_model/label_handlers.py
new file mode 100644
index 000000000..07635537a
--- /dev/null
+++ b/data/registry_model/label_handlers.py
@@ -0,0 +1,28 @@
+import logging
+
+from util.timedeltastring import convert_to_timedelta
+
+logger = logging.getLogger(__name__)
+
+def _expires_after(label_dict, manifest, model):
+  """ Sets the expiration of a manifest based on the quay.expires-after label. """
+  try:
+    timedelta = convert_to_timedelta(label_dict['value'])
+  except ValueError:
+    logger.exception('Could not convert %s to timedeltastring', label_dict['value'])
+    return
+
+  total_seconds = timedelta.total_seconds()
+  logger.debug('Labeling manifest %s with expiration of %s', manifest, total_seconds)
+  model.set_tags_expiration_for_manifest(manifest, total_seconds)
+
+
+_LABEL_HANDLES = {
+  'quay.expires-after': _expires_after,
+}
+
+def apply_label_to_manifest(label_dict, manifest, model):
+  """ Runs the handler defined, if any, for the given label. """
+  handler = _LABEL_HANDLES.get(label_dict['key'])
+  if handler is not None:
+    handler(label_dict, manifest, model)
diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py
index 72ae699b5..4031d5eef 100644
--- a/data/registry_model/registry_pre_oci_model.py
+++ b/data/registry_model/registry_pre_oci_model.py
@@ -8,11 +8,15 @@ from peewee import IntegrityError

 from data import database
 from data import model
+from data.database import db_transaction
 from data.registry_model.interface import RegistryDataInterface
 from data.registry_model.datatypes import (Tag, RepositoryReference, Manifest, LegacyImage, Label,
                                            SecurityScanStatus, ManifestLayer, Blob, DerivedImage,
                                            TorrentInfo, BlobUpload)
-from image.docker.schema1 import DockerSchema1ManifestBuilder, ManifestException
+from data.registry_model.label_handlers import apply_label_to_manifest
+from image.docker.schema1 import (DockerSchema1ManifestBuilder, ManifestException,
+                                  DockerSchema1Manifest)
+from util.validation import is_json

 logger = logging.getLogger(__name__)

@@ -81,6 +85,75 @@ class PreOCIModel(RegistryDataInterface):

     return Manifest.for_tag_manifest(tag_manifest, legacy_image)

+  def create_manifest_and_retarget_tag(self, repository_ref, manifest_interface_instance, tag_name):
+    """ Creates a manifest in a repository, adding all of the necessary data in the model.
+
+        The `manifest_interface_instance` parameter must be an instance of the manifest
+        interface as returned by the image/docker package.
+
+        Note that all blobs referenced by the manifest must exist under the repository or this
+        method will fail and return None.
+
+        Returns a reference to the (created manifest, tag) or (None, None) on error.
+    """
+    # NOTE: Only Schema1 is supported by the pre_oci_model.
+    assert isinstance(manifest_interface_instance, DockerSchema1Manifest)
+    if not manifest_interface_instance.layers:
+      return None, None
+
+    # Ensure all the blobs in the manifest exist.
+    digests = manifest_interface_instance.checksums
+    query = model.storage.lookup_repo_storages_by_content_checksum(repository_ref._db_id, digests)
+    blob_map = {s.content_checksum: s.id for s in query}
+    for layer in manifest_interface_instance.layers:
+      digest_str = str(layer.digest)
+      if digest_str not in blob_map:
+        return None, None
+
+    # Lookup all the images and their parent images (if any) inside the manifest.
+    # This will let us know which v1 images we need to synthesize and which ones are invalid.
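+    # (IDs found in the map already exist in this repository; any rewritten IDs missing
+    # from it are synthesized below from the manifest's v1 compatibility metadata.)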
+    docker_image_ids = list(manifest_interface_instance.legacy_image_ids)
+    images_query = model.image.lookup_repository_images(repository_ref._db_id, docker_image_ids)
+    images_map = {i.docker_image_id: i.storage for i in images_query}
+
+    # Rewrite any v1 image IDs that do not match the checksum in the database.
+    try:
+      rewritten_images = list(manifest_interface_instance.rewrite_invalid_image_ids(images_map))
+      for rewritten_image in rewritten_images:
+        if rewritten_image.image_id not in images_map:
+          model.image.synthesize_v1_image(
+            repository_ref._db_id,
+            blob_map[rewritten_image.content_checksum],
+            rewritten_image.image_id,
+            rewritten_image.created,
+            rewritten_image.comment,
+            rewritten_image.command,
+            rewritten_image.compat_json,
+            rewritten_image.parent_image_id,
+          )
+    except ManifestException:
+      logger.exception("exception when rewriting v1 metadata")
+      return None, None
+
+    # Store the manifest pointing to the tag.
+    leaf_layer_id = rewritten_images[-1].image_id
+    tag_manifest, newly_created = model.tag.store_tag_manifest_for_repo(repository_ref._db_id,
+                                                                        tag_name,
+                                                                        manifest_interface_instance,
+                                                                        leaf_layer_id,
+                                                                        blob_map)

+    manifest = Manifest.for_tag_manifest(tag_manifest)
+
+    # Save the labels on the manifest.
+    if newly_created:
+      with self.batch_create_manifest_labels(manifest) as add_label:
+        for key, value in manifest_interface_instance.layers[-1].v1_metadata.labels.iteritems():
+          media_type = 'application/json' if is_json(value) else 'text/plain'
+          add_label(key, value, 'manifest', media_type)
+
+    return manifest, Tag.for_repository_tag(tag_manifest.tag)
+
   def get_legacy_images(self, repository_ref):
     """
     Returns an iterator of all the LegacyImage's defined in the matching repository.
@@ -135,8 +208,17 @@ class PreOCIModel(RegistryDataInterface):
     except database.TagManifest.DoesNotExist:
       return None

-    label = model.label.create_manifest_label(tag_manifest, key, value, source_type_name,
-                                              media_type_name)
+    label_data = dict(key=key, value=value, source_type_name=source_type_name,
+                      media_type_name=media_type_name)
+
+    with db_transaction():
+      # Create the label itself.
+      label = model.label.create_manifest_label(tag_manifest, key, value, source_type_name,
+                                                media_type_name)
+
+      # Apply any changes to the manifest that the label prescribes.
+      apply_label_to_manifest(label_data, manifest, self)
+
     return Label.for_label(label)

   @contextmanager
@@ -164,7 +246,12 @@ class PreOCIModel(RegistryDataInterface):
     # TODO: make this truly batch once we've fully transitioned to V2_2 and no longer need
     # the mapping tables.
     for label in labels_to_add:
-      model.label.create_manifest_label(tag_manifest, **label)
+      with db_transaction():
+        # Create the label itself.
+        model.label.create_manifest_label(tag_manifest, **label)
+
+        # Apply any changes to the manifest that the label prescribes.
+        apply_label_to_manifest(label, manifest, self)

   def list_manifest_labels(self, manifest, key_prefix=None):
     """ Returns all labels found on the manifest. If specified, the key_prefix will filter the
         labels returned to those keys that start with the given prefix.
@@ -708,4 +795,15 @@ class PreOCIModel(RegistryDataInterface):
                                         expiration_sec)
     return bool(storage)

+  def set_tags_expiration_for_manifest(self, manifest, expiration_sec):
+    """
+    Sets the expiration on all tags that point to the given manifest to that specified.
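+    The expiration is computed relative to each tag's lifetime start; see
+    model.tag.set_tag_expiration_for_manifest for the underlying calculation.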
+    """
+    try:
+      tag_manifest = database.TagManifest.get(id=manifest._db_id)
+    except database.TagManifest.DoesNotExist:
+      return None
+
+    model.tag.set_tag_expiration_for_manifest(tag_manifest, expiration_sec)
+
 pre_oci_model = PreOCIModel()
diff --git a/data/registry_model/test/test_pre_oci_model.py b/data/registry_model/test/test_pre_oci_model.py
index 3acb75146..06a43aaba 100644
--- a/data/registry_model/test/test_pre_oci_model.py
+++ b/data/registry_model/test/test_pre_oci_model.py
@@ -167,6 +167,23 @@ def test_manifest_labels(pre_oci_model):
   assert created not in pre_oci_model.list_manifest_labels(found_manifest)


+def test_manifest_label_handlers(pre_oci_model):
+  repo = model.repository.get_repository('devtable', 'simple')
+  repository_ref = RepositoryReference.for_repo_obj(repo)
+  found_tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
+  found_manifest = pre_oci_model.get_manifest_for_tag(found_tag)
+
+  # Ensure the tag has no expiration.
+  assert found_tag.lifetime_end_ts is None
+
+  # Create a new label with an expires-after.
+  pre_oci_model.create_manifest_label(found_manifest, 'quay.expires-after', '2h', 'api')
+
+  # Ensure the tag now has an expiration.
+  updated_tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
+  assert updated_tag.lifetime_end_ts == (updated_tag.lifetime_start_ts + (60 * 60 * 2))
+
+
 def test_batch_labels(pre_oci_model):
   repo = model.repository.get_repository('devtable', 'history')
   repository_ref = RepositoryReference.for_repo_obj(repo)
@@ -554,7 +571,6 @@ def test_torrent_info(pre_oci_model):
   assert torrent_info.pieces == 'foo'


-<<<<<<< HEAD
 def test_blob_uploads(pre_oci_model):
   repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')

diff --git a/image/docker/interfaces.py b/image/docker/interfaces.py
index 5ce12de93..62dd4563c 100644
--- a/image/docker/interfaces.py
+++ b/image/docker/interfaces.py
@@ -34,6 +34,12 @@ class ManifestInterface(object):
     """ Returns the Docker V1 image ID for the leaf (top) layer, if any, or None if none. """
     pass

+  @abstractproperty
+  def legacy_image_ids(self):
+    """ Returns the Docker V1 image IDs for the layers of this manifest or None if not applicable.
+    """
+    pass
+
   @abstractproperty
   def blob_digests(self):
     """ Returns an iterator over all the blob digests referenced by this manifest,
diff --git a/image/docker/schema1.py b/image/docker/schema1.py
index 4897f4926..f7184c43b 100644
--- a/image/docker/schema1.py
+++ b/image/docker/schema1.py
@@ -254,6 +254,10 @@ class DockerSchema1Manifest(ManifestInterface):
   def image_ids(self):
     return {mdata.v1_metadata.image_id for mdata in self.layers}

+  @property
+  def legacy_image_ids(self):
+    return {mdata.v1_metadata.image_id for mdata in self.layers}
+
   @property
   def parent_image_ids(self):
     return {mdata.v1_metadata.parent_image_id for mdata in self.layers
diff --git a/image/docker/schema2/manifest.py b/image/docker/schema2/manifest.py
index f4e443f66..ceb07fa30 100644
--- a/image/docker/schema2/manifest.py
+++ b/image/docker/schema2/manifest.py
@@ -172,6 +172,10 @@ class DockerSchema2Manifest(ManifestInterface):
   def leaf_layer_v1_image_id(self):
     return list(self.layers_with_v1_ids)[-1].v1_id

+  @property
+  def legacy_image_ids(self):
+    return [l.v1_id for l in self.layers_with_v1_ids]
+
   @property
   def blob_digests(self):
     return [str(layer.digest) for layer in self.layers]
diff --git a/workers/manifestbackfillworker.py b/workers/manifestbackfillworker.py
index 8593fccdd..decb9dd44 100644
--- a/workers/manifestbackfillworker.py
+++ b/workers/manifestbackfillworker.py
@@ -51,6 +51,10 @@ class BrokenManifest(ManifestInterface):
   def layers(self):
     return []

+  @property
+  def legacy_image_ids(self):
+    return []
+
   @property
   def leaf_layer_v1_image_id(self):
     return None

From 7a68c41f1cc4cd4a717fc3160c2bbebd5e4c1ba5 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Thu, 4 Oct 2018 13:59:53 -0400
Subject: [PATCH 05/13] Convert V2 to use the blob uploader interface

---
 data/registry_model/blobuploader.py |  16 +-
 data/registry_model/interface.py    |   2 +-
 endpoints/v2/blob.py                | 460 ++++++++++------------------
 3 files changed, 170 insertions(+), 308 deletions(-)

diff --git a/data/registry_model/blobuploader.py b/data/registry_model/blobuploader.py
index 3d53bcfe3..a356f78e6 100644
--- a/data/registry_model/blobuploader.py
+++ b/data/registry_model/blobuploader.py
@@ -64,6 +64,20 @@ def retrieve_blob_upload_manager(repository_ref, blob_upload_id, storage, settin
   return _BlobUploadManager(repository_ref, blob_upload, settings, storage)


+@contextmanager
+def complete_when_uploaded(blob_upload):
+  """ Wraps the given blob upload in a context manager that completes the upload when the context
+      closes.
+  """
+  try:
+    yield blob_upload
+  except Exception as ex:
+    logger.exception('Exception when uploading blob `%s`', blob_upload.blob_upload_id)
+    raise ex
+  finally:
+    # Cancel the upload if something went wrong or it was not committed to a blob.
+    if blob_upload.committed_blob is None:
+      blob_upload.cancel_upload()

 @contextmanager
 def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None):
@@ -120,7 +134,7 @@ class _BlobUploadManager(object):

     if start_offset > 0 and start_offset > self.blob_upload.byte_count:
       logger.error('start_offset provided greater than blob_upload.byte_count')
-      return None
+      raise BlobUploadException()

     # Ensure that we won't go over the allowed maximum size for blobs.
     max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size)
diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py
index 099851d67..c5a1d126d 100644
--- a/data/registry_model/interface.py
+++ b/data/registry_model/interface.py
@@ -266,7 +266,7 @@ class RegistryDataInterface(object):
     Mounts the blob from another repository into the specified target repository, and adds an
     expiration before that blob is automatically GCed. This function is useful during push
     operations if an existing blob from another repository is being pushed. Returns False if
-    the mounting fails.
+    the mounting fails. Note that this function does *not* check security for mounting the blob.
     """

   @abstractmethod
diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py
index ccff226d5..49eddbcab 100644
--- a/endpoints/v2/blob.py
+++ b/endpoints/v2/blob.py
@@ -1,30 +1,24 @@
 import logging
 import re
-import time

 from flask import url_for, request, redirect, Response, abort as flask_abort

-import bitmath
-import resumablehashlib
-
-from app import storage, app, get_app_url, metric_queue, model_cache
+from app import storage, app, get_app_url, metric_queue
 from auth.registry_jwt_auth import process_registry_jwt_auth
 from auth.permissions import ReadRepositoryPermission
 from data import database
-from data.cache import cache_key
+from data.registry_model import registry_model
+from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_upload_manager,
+                                              complete_when_uploaded, BlobUploadSettings,
+                                              BlobUploadException, BlobTooLargeException)
 from digest import digest_tools
 from endpoints.decorators import anon_protect, parse_repository_name
 from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
 from endpoints.v2.errors import (
   BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge,
   InvalidRequest)
-from endpoints.v2.models_interface import Blob
-from endpoints.v2.models_pre_oci import data_model as model
 from util.cache import cache_control
 from util.names import parse_namespace_repository
-from util.registry.filelike import wrap_with_handler, StreamSlice
-from util.registry.gzipstream import calculate_size_handler
-from util.registry.torrent import PieceHasher

 logger = logging.getLogger(__name__)

 class _InvalidRangeHeader(Exception):
   pass


-def _get_repository_blob(namespace_name, repo_name, digest):
-  """ Returns the blob with the given digest under the repository with the given
-      name. If one does not exist (or it is still uploading), returns None.
-      Automatically handles caching.
-  """
-  def load_blob():
-    blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
-    if blob is None:
-      return None
-
-    return blob._asdict()
-
-  blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, digest)
-  blob_dict = model_cache.retrieve(blob_cache_key, load_blob)
-  return Blob(**blob_dict) if blob_dict is not None else None
-
-
 @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
 @parse_repository_name()
 @process_registry_jwt_auth(scopes=['pull'])
 @require_repo_read
 @anon_protect
 @cache_control(max_age=31436000)
 def check_blob_exists(namespace_name, repo_name, digest):
+  repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
+  if repository_ref is None:
+    raise NameUnknown()
+
   # Find the blob.
-  blob = _get_repository_blob(namespace_name, repo_name, digest)
+  blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True)
   if blob is None:
     raise BlobUnknown()

   # Build the response headers.
   headers = {
     'Docker-Content-Digest': digest,
-    'Content-Length': blob.size,
+    'Content-Length': blob.compressed_size,
     'Content-Type': BLOB_CONTENT_TYPE,
   }

   # If our storage supports range requests, let the client know.
-  if storage.get_supports_resumable_downloads(blob.locations):
+  if storage.get_supports_resumable_downloads(blob.placements):
     headers['Accept-Ranges'] = 'bytes'

   # Write the response to the client.
@@ -89,8 +70,12 @@ def check_blob_exists(namespace_name, repo_name, digest):
 @anon_protect
 @cache_control(max_age=31536000)
 def download_blob(namespace_name, repo_name, digest):
+  repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
+  if repository_ref is None:
+    raise NameUnknown()
+
   # Find the blob.
-  blob = _get_repository_blob(namespace_name, repo_name, digest)
+  blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True)
   if blob is None:
     raise BlobUnknown()

@@ -98,15 +83,13 @@ def download_blob(namespace_name, repo_name, digest):
   headers = {'Docker-Content-Digest': digest}

   # If our storage supports range requests, let the client know.
-  if storage.get_supports_resumable_downloads(blob.locations):
+  if storage.get_supports_resumable_downloads(blob.placements):
     headers['Accept-Ranges'] = 'bytes'

-  # Find the storage path for the blob.
-  path = model.get_blob_path(blob)
-
   # Short-circuit by redirecting if the storage supports it.
+  path = blob.storage_path
   logger.debug('Looking up the direct download URL for path: %s', path)
-  direct_download_url = storage.get_direct_download_url(blob.locations, path, request.remote_addr)
+  direct_download_url = storage.get_direct_download_url(blob.placements, path, request.remote_addr)
   if direct_download_url:
     logger.debug('Returning direct download URL')
     resp = redirect(direct_download_url)
@@ -118,63 +101,77 @@ def download_blob(namespace_name, repo_name, digest):
   with database.CloseForLongOperation(app.config):
     # Stream the response to the client.
     return Response(
-      storage.stream_read(blob.locations, path),
+      storage.stream_read(blob.placements, path),
       headers=headers.update({
-        'Content-Length': blob.size,
-        'Content-Type': BLOB_CONTENT_TYPE,}),)
+        'Content-Length': blob.compressed_size,
+        'Content-Type': BLOB_CONTENT_TYPE,
+      }),
+    )


-def _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest):
+def _try_to_mount_blob(repository_ref, mount_blob_digest):
   """ Attempts to mount a blob requested by the user from another repository. """
-  logger.debug('Got mount request for blob `%s` into `%s/%s`', mount_blob_digest, namespace_name,
-               repo_name)
+  logger.debug('Got mount request for blob `%s` into `%s`', mount_blob_digest, repository_ref)
   from_repo = request.args.get('from', None)
   if from_repo is None:
     raise InvalidRequest

   # Ensure the user has access to the repository.
-  logger.debug('Got mount request for blob `%s` under repository `%s` into `%s/%s`',
-               mount_blob_digest, from_repo, namespace_name, repo_name)
+  logger.debug('Got mount request for blob `%s` under repository `%s` into `%s`',
+               mount_blob_digest, from_repo, repository_ref)
   from_namespace, from_repo_name = parse_namespace_repository(from_repo,
                                                               app.config['LIBRARY_NAMESPACE'],
                                                               include_tag=False)

-  # First check permission. This does not hit the DB so we do it first.
+  from_repository_ref = registry_model.lookup_repository(from_namespace, from_repo_name)
+  if from_repository_ref is None:
+    logger.debug('Could not find from repo: `%s/%s`', from_namespace, from_repo_name)
+    return None
+
+  # First check permission.
   read_permission = ReadRepositoryPermission(from_namespace, from_repo_name).can()
   if not read_permission:
     # If no direct permission, check if the repository is public.
-    if not model.is_repository_public(from_namespace, from_repo_name):
-      logger.debug('No permission to mount blob `%s` under repository `%s` into `%s/%s`',
-                   mount_blob_digest, from_repo, namespace_name, repo_name)
+    if not from_repository_ref.is_public:
+      logger.debug('No permission to mount blob `%s` under repository `%s` into `%s`',
+                   mount_blob_digest, from_repo, repository_ref)
       return None

   # Lookup if the mount blob's digest exists in the repository.
-  mount_blob = model.get_blob_by_digest(from_namespace, from_repo_name, mount_blob_digest)
+  mount_blob = registry_model.get_repo_blob_by_digest(from_repository_ref, mount_blob_digest)
   if mount_blob is None:
     logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo)
     return None

-  logger.debug('Mounting blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest,
-               from_repo, namespace_name, repo_name)
+  logger.debug('Mounting blob `%s` under repository `%s` into `%s`', mount_blob_digest,
+               from_repo, repository_ref)

   # Mount the blob into the current repository and return that we've completed the operation.
   expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
-  if not model.mount_blob_and_temp_tag(namespace_name, repo_name, mount_blob, expiration_sec):
+  mounted = registry_model.mount_blob_into_repository(mount_blob, repository_ref, expiration_sec)
+  if not mounted:
     logger.debug('Could not mount blob `%s` under repository `%s`', mount_blob_digest,
                  from_repo)
     return

   # Return the response for the blob indicating that it was mounted, and including its content
   # digest.
-  logger.debug('Mounted blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest,
-               from_repo, namespace_name, repo_name)
+  logger.debug('Mounted blob `%s` under repository `%s` into `%s`', mount_blob_digest,
+               from_repo, repository_ref)
+
+  namespace_name = repository_ref.namespace_name
+  repo_name = repository_ref.name
+
   return Response(
     status=201,
     headers={
       'Docker-Content-Digest': mount_blob_digest,
       'Location':
-      get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
-                              (namespace_name, repo_name), digest=mount_blob_digest),},)
+        get_app_url() + url_for('v2.download_blob',
                                repository='%s/%s' % (namespace_name, repo_name),
+                                digest=mount_blob_digest),
+    },
+  )


 @v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
 @parse_repository_name()
 @process_registry_jwt_auth(scopes=['pull', 'push'])
 @require_repo_write
 @anon_protect
 def start_blob_upload(namespace_name, repo_name):
-  # Begin the blob upload process in the database and storage.
-  location_name = storage.preferred_locations[0]
-  new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
-  repository_exists = model.create_blob_upload(namespace_name, repo_name, new_upload_uuid,
-                                               location_name, upload_metadata)
-  if not repository_exists:
+  repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
+  if repository_ref is None:
     raise NameUnknown()

   # Check for mounting of a blob from another repository.
   mount_blob_digest = request.args.get('mount', None)
   if mount_blob_digest is not None:
-    response = _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest)
+    response = _try_to_mount_blob(repository_ref, mount_blob_digest)
     if response is not None:
       return response

-  # Check for a normal blob upload.
+  # Begin the blob upload process.
+  blob_uploader = create_blob_upload(repository_ref, storage, _upload_settings())
+  if blob_uploader is None:
+    logger.debug('Could not create a blob upload for `%s/%s`', namespace_name, repo_name)
+    raise InvalidRequest()
+
+  # Check if the blob will be uploaded now or in followup calls. If the `digest` is given, then
+  # the upload will occur as a monolithic chunk in this call. Otherwise, we return a redirect
+  # for the client to upload the chunks as distinct operations.
   digest = request.args.get('digest', None)
   if digest is None:
     # Short-circuit because the user will send the blob data in another request.
     return Response(
       status=202,
       headers={
-        'Docker-Upload-UUID':
-        new_upload_uuid,
-        'Range':
-        _render_range(0),
+        'Docker-Upload-UUID': blob_uploader.blob_upload_id,
+        'Range': _render_range(0),
         'Location':
-        get_app_url() + url_for('v2.upload_chunk', repository='%s/%s' %
-                                (namespace_name, repo_name), upload_uuid=new_upload_uuid)},)
+          get_app_url() + url_for('v2.upload_chunk',
+                                  repository='%s/%s' % (namespace_name, repo_name),
+                                  upload_uuid=blob_uploader.blob_upload_id)
+      },
+    )

-  # The user plans to send us the entire body right now.
-  # Find the upload.
-  blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, new_upload_uuid)
-  if blob_upload is None:
-    raise BlobUploadUnknown()
-
-  # Upload the chunk to storage while calculating some metadata and updating
-  # the upload state.
-  updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
-  if updated_blob_upload is None:
-    _abort_range_not_satisfiable(blob_upload.byte_count, new_upload_uuid)
-
-  # Save the upload state to the database.
-  model.update_blob_upload(updated_blob_upload)
-
-  # Finalize the upload process in the database and storage.
-  _finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
+  # Upload the data sent and commit it to a blob.
+  with complete_when_uploaded(blob_uploader):
+    _upload_chunk(blob_uploader, digest)

   # Write the response to the client.
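+  # The 201 response carries Docker-Content-Digest and a Location header pointing at the
+  # blob download URL, per the Docker V2 push protocol.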
   return Response(
     status=201,
     headers={
-      'Docker-Content-Digest':
-      digest,
+      'Docker-Content-Digest': digest,
       'Location':
-      get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
-                              (namespace_name, repo_name), digest=digest),},)
+        get_app_url() + url_for('v2.download_blob',
+                                repository='%s/%s' % (namespace_name, repo_name),
+                                digest=digest),
+    },
+  )


 @v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
 @parse_repository_name()
 @process_registry_jwt_auth(scopes=['pull', 'push'])
 @require_repo_write
 @anon_protect
 def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
-  blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
-  if blob_upload is None:
+  repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
+  if repository_ref is None:
+    raise NameUnknown()
+
+  uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings())
+  if uploader is None:
     raise BlobUploadUnknown()

   return Response(
     status=204,
     headers={
       'Docker-Upload-UUID': upload_uuid,
-      'Range': _render_range(blob_upload.byte_count + 1),  # byte ranges are exclusive
-    },)
+      'Range': _render_range(uploader.blob_upload.byte_count + 1),  # byte ranges are exclusive
+    },
+  )


 @v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
 @parse_repository_name()
 @process_registry_jwt_auth(scopes=['pull', 'push'])
 @require_repo_write
 @anon_protect
 def upload_chunk(namespace_name, repo_name, upload_uuid):
-  # Find the upload.
-  blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
-  if blob_upload is None:
+  repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
+  if repository_ref is None:
+    raise NameUnknown()
+
+  uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings())
+  if uploader is None:
     raise BlobUploadUnknown()

-  # Upload the chunk to storage while calculating some metadata and updating
-  # the upload state.
-  updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
-  if updated_blob_upload is None:
-    _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)
-
-  # Save the upload state to the database.
-  model.update_blob_upload(updated_blob_upload)
+  # Upload the chunk for the blob.
+  _upload_chunk(uploader)

   # Write the response to the client.
   return Response(
     status=204,
     headers={
       'Location': _current_request_url(),
-      'Range': _render_range(updated_blob_upload.byte_count, with_bytes_prefix=False),
-      'Docker-Upload-UUID': upload_uuid,},)
+      'Range': _render_range(uploader.blob_upload.byte_count, with_bytes_prefix=False),
+      'Docker-Upload-UUID': upload_uuid,
+    },
+  )


 @v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PUT'])
 @parse_repository_name()
 @process_registry_jwt_auth(scopes=['pull', 'push'])
 @require_repo_write
 @anon_protect
 def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
     raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'})

   # Find the upload.
-  blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
-  if blob_upload is None:
+  repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
+  if repository_ref is None:
+    raise NameUnknown()
+
+  uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings())
+  if uploader is None:
     raise BlobUploadUnknown()

-  # Upload the chunk to storage while calculating some metadata and updating
-  # the upload state.
-  updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
-  if updated_blob_upload is None:
-    _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)
-
-  # Finalize the upload process in the database and storage.
-  _finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
+  # Upload the chunk for the blob and commit it once complete.
+  with complete_when_uploaded(uploader):
+    _upload_chunk(uploader, digest)

   # Write the response to the client.
   return Response(status=201, headers={
-    'Docker-Content-Digest':
-    digest,
+    'Docker-Content-Digest': digest,
     'Location':
-    get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
-                            (namespace_name, repo_name), digest=digest),})
+      get_app_url() + url_for('v2.download_blob',
+                              repository='%s/%s' % (namespace_name, repo_name),
+                              digest=digest),
+    },
+  )


 @v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
 @parse_repository_name()
 @process_registry_jwt_auth(scopes=['pull', 'push'])
 @require_repo_write
 @anon_protect
 def cancel_upload(namespace_name, repo_name, upload_uuid):
-  blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
-  if blob_upload is None:
+  repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
+  if repository_ref is None:
+    raise NameUnknown()
+
+  uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings())
+  if uploader is None:
     raise BlobUploadUnknown()

-  # We delete the record for the upload first, since if the partial upload in
-  # storage fails to delete, it doesn't break anything.
-  model.delete_blob_upload(namespace_name, repo_name, upload_uuid)
-  storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
-                                blob_upload.storage_metadata)
-
+  uploader.cancel_upload()
   return Response(status=204)


@@ -413,182 +407,36 @@ def _start_offset_and_length(range_header):
   return start_offset, length


-def _upload_chunk(blob_upload, range_header):
+def _upload_settings():
+  """ Returns the settings for instantiating a blob upload manager. """
+  expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
+  settings = BlobUploadSettings(maximum_blob_size=app.config['MAXIMUM_LAYER_SIZE'],
+                                bittorrent_piece_size=app.config['BITTORRENT_PIECE_SIZE'],
+                                committed_blob_expiration=expiration_sec)
+  return settings
+
+
+def _upload_chunk(blob_uploader, commit_digest=None):
+  """ Performs uploading of a chunk of data in the current request's stream, via the blob uploader
+      given. If commit_digest is specified, the upload is committed to a blob once the stream's
+      data has been read and stored.
   """
-  Calculates metadata while uploading a chunk to storage.
+  start_offset, length = _start_offset_and_length(request.headers.get('range'))
+  if None in {start_offset, length}:
+    raise InvalidRequest()

-  Returns a BlobUpload object or None if there was a failure.
-  """
-  max_layer_size = bitmath.parse_string_unsafe(app.config['MAXIMUM_LAYER_SIZE'])
+  input_fp = get_input_stream(request)

-  # Get the offset and length of the current chunk.
-  start_offset, length = _start_offset_and_length(range_header)
-  if blob_upload is None or None in {start_offset, length}:
-    logger.error('Invalid arguments provided to _upload_chunk')
-    return None
+  try:
+    # Upload the data received.
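+    # (start_offset and length come from the client's Range header; the uploader is
+    # responsible for validating them against the bytes already stored.)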
+    blob_uploader.upload_chunk(app.config, input_fp, start_offset, length, metric_queue)

-  if start_offset > 0 and start_offset > blob_upload.byte_count:
-    logger.error('start_offset provided to _upload_chunk greater than blob.upload.byte_count')
-    return None
-
-  # Check if we should raise 413 before accepting the data.
-  uploaded = bitmath.Byte(length + start_offset)
-  if length > -1 and uploaded > max_layer_size:
-    raise LayerTooLarge(uploaded=uploaded.bytes, max_allowed=max_layer_size.bytes)
-
-  location_set = {blob_upload.location_name}
-
-  upload_error = None
-  with database.CloseForLongOperation(app.config):
-    input_fp = get_input_stream(request)
-
-    if start_offset > 0 and start_offset < blob_upload.byte_count:
-      # Skip the bytes which were received on a previous push, which are already stored and
-      # included in the sha calculation
-      overlap_size = blob_upload.byte_count - start_offset
-      input_fp = StreamSlice(input_fp, overlap_size)
-
-      # Update our upload bounds to reflect the skipped portion of the overlap
-      start_offset = blob_upload.byte_count
-      length = max(length - overlap_size, 0)
-
-    # We use this to escape early in case we have already processed all of the bytes the user
-    # wants to upload
-    if length == 0:
-      return blob_upload
-
-    input_fp = wrap_with_handler(input_fp, blob_upload.sha_state.update)
-
-    # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have
-    # already calculated hash data for the previous chunk(s).
-    piece_hasher = None
-    if blob_upload.chunk_count == 0 or blob_upload.piece_sha_state:
-      initial_sha1_value = blob_upload.piece_sha_state or resumablehashlib.sha1()
-      initial_sha1_pieces_value = blob_upload.piece_hashes or ''
-
-      piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset,
-                                 initial_sha1_pieces_value, initial_sha1_value)
-
-      input_fp = wrap_with_handler(input_fp, piece_hasher.update)
-
-    # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
-    # stream so we can determine the uncompressed size. We'll throw out this data if another chunk
-    # comes in, but in the common case the docker client only sends one chunk.
-    size_info = None
-    if start_offset == 0 and blob_upload.chunk_count == 0:
-      size_info, fn = calculate_size_handler()
-      input_fp = wrap_with_handler(input_fp, fn)
-
-    start_time = time.time()
-    length_written, new_metadata, upload_error = storage.stream_upload_chunk(
-      location_set,
-      blob_upload.uuid,
-      start_offset,
-      length,
-      input_fp,
-      blob_upload.storage_metadata,
-      content_type=BLOB_CONTENT_TYPE,)
-
-    if upload_error is not None:
-      logger.error('storage.stream_upload_chunk returned error %s', upload_error)
-      return None
-
-  # Update the chunk upload time metric.
-  metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[
-    length_written, list(location_set)[0]])
-
-  # If we determined an uncompressed size and this is the first chunk, add it to the blob.
-  # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
-  if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid:
-    blob_upload.uncompressed_byte_count = size_info.uncompressed_size
-  elif length_written > 0:
-    # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
-    # know the uncompressed size.
-    blob_upload.uncompressed_byte_count = None
-
-  if piece_hasher is not None:
-    blob_upload.piece_hashes = piece_hasher.piece_hashes
-    blob_upload.piece_sha_state = piece_hasher.hash_fragment
-
-  blob_upload.storage_metadata = new_metadata
-  blob_upload.byte_count += length_written
-  blob_upload.chunk_count += 1
-
-  # Ensure we have not gone beyond the max layer size.
-  upload_size = bitmath.Byte(blob_upload.byte_count)
-  if upload_size > max_layer_size:
-    raise LayerTooLarge(uploaded=upload_size.bytes, max_allowed=max_layer_size.bytes)
-
-  return blob_upload
-
-
-def _validate_digest(blob_upload, expected_digest):
-  """
-  Verifies that the digest's SHA matches that of the uploaded data.
-  """
-  computed_digest = digest_tools.sha256_digest_from_hashlib(blob_upload.sha_state)
-  if not digest_tools.digests_equal(computed_digest, expected_digest):
-    logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s',
-                 blob_upload.uuid, expected_digest, computed_digest)
-    raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'})
-
-
-def _finalize_blob_storage(blob_upload, expected_digest):
-  """
-  When an upload is successful, this ends the uploading process from the
-  storage's perspective.
-
-  Returns True if the blob already existed.
-  """
-  final_blob_location = digest_tools.content_path(expected_digest)
-
-  # Move the storage into place, or if this was a re-upload, cancel it
-  with database.CloseForLongOperation(app.config):
-    already_existed = storage.exists({blob_upload.location_name}, final_blob_location)
-    if already_existed:
-      # It already existed, clean up our upload which served as proof that the
-      # uploader had the blob.
-      storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
-                                    blob_upload.storage_metadata)
-
-    else:
-      # We were the first ones to upload this image (at least to this location)
-      # Let's copy it into place
-      storage.complete_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
-                                      final_blob_location, blob_upload.storage_metadata)
-  return already_existed
-
-
-def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed):
-  """
-  When an upload is successful, this ends the uploading process from the
-  database's perspective.
-  """
-  # Create the blob and temporarily tag it.
-  blob_storage = model.create_blob_and_temp_tag(
-    namespace_name,
-    repo_name,
-    digest,
-    blob_upload,
-    app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],)
-
-  # If it doesn't already exist, create the BitTorrent pieces for the blob.
-  if blob_upload.piece_sha_state is not None and not already_existed:
-    piece_bytes = blob_upload.piece_hashes + blob_upload.piece_sha_state.digest()
-    model.save_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)
-
-  # Delete the blob upload.
-  model.delete_blob_upload(namespace_name, repo_name, blob_upload.uuid)
-
-
-def _finish_upload(namespace_name, repo_name, blob_upload, digest):
-  """
-  When an upload is successful, this ends the uploading process.
-  """
-  _validate_digest(blob_upload, digest)
-  _finalize_blob_database(
-    namespace_name,
-    repo_name,
-    blob_upload,
-    digest,
-    _finalize_blob_storage(blob_upload, digest),)
+
+    if commit_digest is not None:
+      # Commit the upload to a blob.
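+      # (commit_to_blob is expected to verify commit_digest against the uploaded data,
+      # raising a BlobUploadException subclass on mismatch, which is handled below.)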
+      return blob_uploader.commit_to_blob(app.config, commit_digest)
+  except BlobTooLargeException as ble:
+    raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed)
+  except BlobUploadException:
+    logger.exception('Exception when uploading blob to %s', blob_uploader.blob_upload_id)
+    _abort_range_not_satisfiable(blob_uploader.blob_upload.byte_count,
+                                 blob_uploader.blob_upload_id)

From a172de4fdc9a69c0d659d70f1599d8763a7a9242 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Thu, 4 Oct 2018 16:09:56 -0400
Subject: [PATCH 06/13] Reimplement cache support for blobs in the registry
 data model

---
 data/cache/cache_key.py                        |  2 +-
 data/model/storage.py                          |  2 +-
 data/registry_model/datatype.py                | 22 ++++++++
 data/registry_model/registry_pre_oci_model.py  | 32 ++++++++++++
 .../registry_model/test/test_pre_oci_model.py  | 51 ++++++++++++++++++-
 endpoints/v2/blob.py                           | 14 ++---
 endpoints/v2/test/test_blob.py                 |  2 +-
 7 files changed, 109 insertions(+), 16 deletions(-)

diff --git a/data/cache/cache_key.py b/data/cache/cache_key.py
index f30d6d345..94d6f62a4 100644
--- a/data/cache/cache_key.py
+++ b/data/cache/cache_key.py
@@ -7,7 +7,7 @@ class CacheKey(namedtuple('CacheKey', ['key', 'expiration'])):

 def for_repository_blob(namespace_name, repo_name, digest):
   """ Returns a cache key for a blob in a repository. """
-  return CacheKey('repository_blob__%s_%s_%s' % (namespace_name, repo_name, digest), '60s')
+  return CacheKey('repo_blob__%s_%s_%s' % (namespace_name, repo_name, digest), '60s')


 def for_catalog_page(auth_context_key, start_id, limit):
diff --git a/data/model/storage.py b/data/model/storage.py
index cb77ece5f..85a118e88 100644
--- a/data/model/storage.py
+++ b/data/model/storage.py
@@ -282,7 +282,7 @@ def lookup_repo_storages_by_content_checksum(repo, checksums):
     candidate_subq = (ImageStorage
                       .select(ImageStorage.id, ImageStorage.content_checksum,
                               ImageStorage.image_size, ImageStorage.uuid, ImageStorage.cas_path,
-                              ImageStorage.uncompressed_size)
+                              ImageStorage.uncompressed_size, ImageStorage.uploading)
                       .join(Image)
                       .where(Image.repository == repo, ImageStorage.content_checksum == checksum)
                       .limit(1)
diff --git a/data/registry_model/datatype.py b/data/registry_model/datatype.py
index 8264f277e..1b2768e83 100644
--- a/data/registry_model/datatype.py
+++ b/data/registry_model/datatype.py
@@ -2,6 +2,11 @@

 from functools import wraps, total_ordering

+class FromDictionaryException(Exception):
+  """ Exception raised if constructing a data type from a dictionary fails due to
+      a version mismatch or missing data.
+  """
+
 def datatype(name, static_fields):
   """ Defines a base class for a datatype that will represent a row from the database,
       in an abstracted form.
@@ -33,6 +38,23 @@ def datatype(name, static_fields):
     def __repr__(self):
       return '<%s> #%s' % (name, self._db_id)

+    @classmethod
+    def from_dict(cls, dict_data):
+      if dict_data.get('version') != 1:
+        raise FromDictionaryException()
+
+      try:
+        return cls(**dict_data)
+      except:
+        raise FromDictionaryException()
+
+    def asdict(self):
+      dictionary_rep = dict(self._fields)
+      dictionary_rep['db_id'] = self._db_id
+      dictionary_rep['inputs'] = self._inputs
+      dictionary_rep['version'] = 1
+      return dictionary_rep
+
   return DataType
diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py
index 4031d5eef..4c6f9510b 100644
--- a/data/registry_model/registry_pre_oci_model.py
+++ b/data/registry_model/registry_pre_oci_model.py
@@ -8,8 +8,10 @@

 from data import database
 from data import model
+from data.cache import cache_key
 from data.database import db_transaction
 from data.registry_model.interface import RegistryDataInterface
+from data.registry_model.datatype import FromDictionaryException
 from data.registry_model.datatypes import (Tag, RepositoryReference, Manifest, LegacyImage, Label,
                                            SecurityScanStatus, ManifestLayer, Blob, DerivedImage,
                                            TorrentInfo, BlobUpload)
@@ -685,6 +687,36 @@ class PreOCIModel(RegistryDataInterface):
     torrent_info = model.storage.save_torrent_info(image_storage, piece_length, pieces)
     return TorrentInfo.for_torrent_info(torrent_info)

+  def get_cached_repo_blob(self, model_cache, namespace_name, repo_name, blob_digest):
+    """
+    Returns the blob in the repository with the given digest if any or None if none.
+    Caches the result in the caching system.
+    """
+    def load_blob():
+      repository_ref = self.lookup_repository(namespace_name, repo_name)
+      if repository_ref is None:
+        return None
+
+      blob_found = self.get_repo_blob_by_digest(repository_ref, blob_digest,
+                                                include_placements=True)
+      if blob_found is None:
+        return None
+
+      return blob_found.asdict()
+
+    blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, blob_digest)
+    blob_dict = model_cache.retrieve(blob_cache_key, load_blob)
+
+    try:
+      return Blob.from_dict(blob_dict) if blob_dict is not None else None
+    except FromDictionaryException:
+      # The data was stale in some way. Simply reload.
+      repository_ref = self.lookup_repository(namespace_name, repo_name)
+      if repository_ref is None:
+        return None
+
+      return self.get_repo_blob_by_digest(repository_ref, blob_digest, include_placements=True)
+
   def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placements=False):
     """
     Returns the blob in the repository with the given digest, if any or None if none.
Note that
diff --git a/data/registry_model/test/test_pre_oci_model.py b/data/registry_model/test/test_pre_oci_model.py
index 06a43aaba..bfdfc2b29 100644
--- a/data/registry_model/test/test_pre_oci_model.py
+++ b/data/registry_model/test/test_pre_oci_model.py
@@ -5,14 +5,15 @@ from datetime import datetime, timedelta
 
 import pytest
 
+from mock import patch
 from playhouse.test_utils import assert_query_count
 
-from app import docker_v2_signing_key
 from data import model
 from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
                            ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
                            TagManifestLabel, TagManifest, TagManifestLabel, DerivedStorageForImage,
-                           TorrentInfo)
+                           TorrentInfo, close_db_filter)
+from data.cache.impl import InMemoryDataModelCache
 from data.registry_model.registry_pre_oci_model import PreOCIModel
 from data.registry_model.datatypes import RepositoryReference
@@ -638,3 +639,49 @@ def test_mount_blob_into_repository(pre_oci_model):
   # Ensure it now exists.
   found = pre_oci_model.get_repo_blob_by_digest(target_repository_ref, layer.blob.digest)
   assert found == layer.blob
+
+
+class SomeException(Exception):
+  pass
+
+
+def test_get_cached_repo_blob(pre_oci_model):
+  model_cache = InMemoryDataModelCache()
+
+  repository_ref = pre_oci_model.lookup_repository('devtable', 'simple')
+  latest_tag = pre_oci_model.get_repo_tag(repository_ref, 'latest')
+  manifest = pre_oci_model.get_manifest_for_tag(latest_tag)
+
+  layers = pre_oci_model.list_manifest_layers(manifest, include_placements=True)
+  assert layers
+
+  blob = layers[0].blob
+
+  # Load a blob to add it to the cache.
+  found = pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest)
+  assert found.digest == blob.digest
+  assert found.uuid == blob.uuid
+  assert found.compressed_size == blob.compressed_size
+  assert found.uncompressed_size == blob.uncompressed_size
+  assert found.uploading == blob.uploading
+  assert found.placements == blob.placements
+
+  # Simulate a database failure by patching the model's blob lookup to raise.
+  def fail(x, y):
+    raise SomeException('Not connected!')
+
+  with patch('data.registry_model.registry_pre_oci_model.model.blob.get_repository_blob_by_digest',
+             fail):
+    # Make sure we can load again, which should hit the cache.
+    cached = pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest)
+    assert cached.digest == blob.digest
+    assert cached.uuid == blob.uuid
+    assert cached.compressed_size == blob.compressed_size
+    assert cached.uncompressed_size == blob.uncompressed_size
+    assert cached.uploading == blob.uploading
+    assert cached.placements == blob.placements
+
+    # Try another blob: it is not in the cache, so load_blob falls through to the
+    # patched (failing) model lookup.
+ with pytest.raises(SomeException): + pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', 'some other digest') diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 49eddbcab..0aa34680a 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -3,7 +3,7 @@ import re from flask import url_for, request, redirect, Response, abort as flask_abort -from app import storage, app, get_app_url, metric_queue +from app import storage, app, get_app_url, metric_queue, model_cache from auth.registry_jwt_auth import process_registry_jwt_auth from auth.permissions import ReadRepositoryPermission from data import database @@ -39,12 +39,8 @@ class _InvalidRangeHeader(Exception): @anon_protect @cache_control(max_age=31436000) def check_blob_exists(namespace_name, repo_name, digest): - repository_ref = registry_model.lookup_repository(namespace_name, repo_name) - if repository_ref is None: - raise NameUnknown() - # Find the blob. - blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True) + blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() @@ -70,12 +66,8 @@ def check_blob_exists(namespace_name, repo_name, digest): @anon_protect @cache_control(max_age=31536000) def download_blob(namespace_name, repo_name, digest): - repository_ref = registry_model.lookup_repository(namespace_name, repo_name) - if repository_ref is None: - raise NameUnknown() - # Find the blob. - blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True) + blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest) if blob is None: raise BlobUnknown() diff --git a/endpoints/v2/test/test_blob.py b/endpoints/v2/test/test_blob.py index c07c11922..7551bfb1c 100644 --- a/endpoints/v2/test/test_blob.py +++ b/endpoints/v2/test/test_blob.py @@ -50,7 +50,7 @@ def test_blob_caching(method, endpoint, client, app): with patch('endpoints.v2.blob.model_cache', InMemoryDataModelCache()): # First request should make a DB query to retrieve the blob. - with assert_query_count(1): + with assert_query_count(3): conduct_call(client, 'v2.' 
+ endpoint, url_for, method, params, expected_code=200, headers=headers) From 6b5064aba4e79f5a88bab91ae17e62d0c4b3bf26 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 5 Oct 2018 17:30:47 -0400 Subject: [PATCH 07/13] Convert V2's manifest endpoints to use the new data model interface --- data/model/tag.py | 9 + data/registry_model/datatypes.py | 5 +- data/registry_model/interface.py | 6 + data/registry_model/registry_pre_oci_model.py | 46 +++- .../registry_model/test/test_pre_oci_model.py | 30 +++ endpoints/v2/labelhandlers.py | 36 --- endpoints/v2/manifest.py | 253 ++++++++---------- endpoints/v2/test/test_manifest.py | 2 +- test/registry/protocol_v2.py | 2 +- test/registry_tests.py | 8 +- 10 files changed, 197 insertions(+), 200 deletions(-) delete mode 100644 endpoints/v2/labelhandlers.py diff --git a/data/model/tag.py b/data/model/tag.py index f87c99c70..77ac3bee8 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -628,6 +628,15 @@ def get_active_tag_for_repo(repo, tag_name): except RepositoryTag.DoesNotExist: return None +def get_expired_tag_in_repo(repo, tag_name): + return (RepositoryTag + .select() + .where(RepositoryTag.name == tag_name, RepositoryTag.repository == repo) + .where(~(RepositoryTag.lifetime_end_ts >> None)) + .where(RepositoryTag.lifetime_end_ts <= get_epoch_timestamp()) + .get()) + + def get_possibly_expired_tag(namespace, repo_name, tag_name): return (RepositoryTag .select() diff --git a/data/registry_model/datatypes.py b/data/registry_model/datatypes.py index d09d92ade..f36d34d79 100644 --- a/data/registry_model/datatypes.py +++ b/data/registry_model/datatypes.py @@ -7,7 +7,7 @@ from cachetools import lru_cache from data import model from data.registry_model.datatype import datatype, requiresinput, optionalinput -from image.docker.schema1 import DockerSchema1Manifest +from image.docker.schema1 import DockerSchema1Manifest, DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE class RepositoryReference(datatype('Repository', [])): @@ -118,7 +118,7 @@ class Tag(datatype('Tag', ['name', 'reversion', 'manifest_digest', 'lifetime_sta return legacy_image -class Manifest(datatype('Manifest', ['digest', 'manifest_bytes'])): +class Manifest(datatype('Manifest', ['digest', 'media_type', 'manifest_bytes'])): """ Manifest represents a manifest in a repository. """ @classmethod def for_tag_manifest(cls, tag_manifest, legacy_image=None): @@ -127,6 +127,7 @@ class Manifest(datatype('Manifest', ['digest', 'manifest_bytes'])): return Manifest(db_id=tag_manifest.id, digest=tag_manifest.digest, manifest_bytes=tag_manifest.json_data, + media_type=DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE, # Always in legacy. inputs=dict(legacy_image=legacy_image)) @property diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py index c5a1d126d..3376ef5f4 100644 --- a/data/registry_model/interface.py +++ b/data/registry_model/interface.py @@ -117,6 +117,12 @@ class RegistryDataInterface(object): or None if none. """ + @abstractmethod + def has_expired_tag(self, repository_ref, tag_name): + """ + Returns true if and only if the repository contains a tag with the given name that is expired. 
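+    Used by the V2 manifest fetch endpoint to distinguish an expired tag from an unknown one.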
+ """ + @abstractmethod def retarget_tag(self, repository_ref, tag_name, manifest_or_legacy_image, is_reversion=False): diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py index 4c6f9510b..5297a92c4 100644 --- a/data/registry_model/registry_pre_oci_model.py +++ b/data/registry_model/registry_pre_oci_model.py @@ -106,7 +106,7 @@ class PreOCIModel(RegistryDataInterface): # Ensure all the blobs in the manifest exist. digests = manifest_interface_instance.checksums query = model.storage.lookup_repo_storages_by_content_checksum(repository_ref._db_id, digests) - blob_map = {s.content_checksum: s.id for s in query} + blob_map = {s.content_checksum: s for s in query} for layer in manifest_interface_instance.layers: digest_str = str(layer.digest) if digest_str not in blob_map: @@ -116,23 +116,38 @@ class PreOCIModel(RegistryDataInterface): # This will let us know which v1 images we need to synthesize and which ones are invalid. docker_image_ids = list(manifest_interface_instance.legacy_image_ids) images_query = model.image.lookup_repository_images(repository_ref._db_id, docker_image_ids) - images_map = {i.docker_image_id: i.storage for i in images_query} + image_storage_map = {i.docker_image_id: i.storage for i in images_query} # Rewrite any v1 image IDs that do not match the checksum in the database. try: - rewritten_images = list(manifest_interface_instance.rewrite_invalid_image_ids(images_map)) + rewritten_images = manifest_interface_instance.rewrite_invalid_image_ids(image_storage_map) + rewritten_images = list(rewritten_images) + parent_image_map = {} + for rewritten_image in rewritten_images: - if not rewritten_image.image_id in images_map: - model.image.synthesize_v1_image( + if not rewritten_image.image_id in image_storage_map: + parent_image = None + if rewritten_image.parent_image_id: + parent_image = parent_image_map.get(rewritten_image.parent_image_id) + if parent_image is None: + parent_image = model.image.get_image(repository_ref._db_id, + rewritten_image.parent_image_id) + if parent_image is None: + return None, None + + synthesized = model.image.synthesize_v1_image( repository_ref._db_id, - blob_map[rewritten_image.content_checksum], + blob_map[rewritten_image.content_checksum].id, + blob_map[rewritten_image.content_checksum].image_size, rewritten_image.image_id, rewritten_image.created, rewritten_image.comment, rewritten_image.command, rewritten_image.compat_json, - rewritten_image.parent_image_id, + parent_image, ) + + parent_image_map[rewritten_image.image_id] = synthesized except ManifestException: logger.exception("exception when rewriting v1 metadata") return None, None @@ -150,7 +165,7 @@ class PreOCIModel(RegistryDataInterface): # Save the labels on the manifest. if newly_created: with self.batch_create_manifest_labels(manifest) as add_label: - for key, value in manifest.layers[-1].v1_metadata.labels.iteritems(): + for key, value in manifest_interface_instance.layers[-1].v1_metadata.labels.iteritems(): media_type = 'application/json' if is_json(value) else 'text/plain' add_label(key, value, 'manifest', media_type) @@ -289,7 +304,8 @@ class PreOCIModel(RegistryDataInterface): else None)) for tag in tags] - def list_repository_tag_history(self, repository_ref, page=1, size=100, specific_tag_name=None, active_tags_only=False): + def list_repository_tag_history(self, repository_ref, page=1, size=100, specific_tag_name=None, + active_tags_only=False): """ Returns the history of all tags in the repository (unless filtered). 
This includes tags that have been made in-active due to newer versions of those tags coming into service.
@@ -302,6 +318,16 @@ class PreOCIModel(RegistryDataInterface):
                 legacy_image=LegacyImage.for_image(tag.image))
             for tag in tags], has_more
 
+  def has_expired_tag(self, repository_ref, tag_name):
+    """
+    Returns true if and only if the repository contains a tag with the given name that is expired.
+    """
+    try:
+      model.tag.get_expired_tag_in_repo(repository_ref._db_id, tag_name)
+      return True
+    except database.RepositoryTag.DoesNotExist:
+      return False
+
   def get_repo_tag(self, repository_ref, tag_name, include_legacy_image=False):
     """
     Returns the latest, *active* tag found in the repository, with the matching name
@@ -706,7 +732,7 @@ class PreOCIModel(RegistryDataInterface):
 
     blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, blob_digest)
     blob_dict = model_cache.retrieve(blob_cache_key, load_blob)
-    
+
     try:
       return Blob.from_dict(blob_dict) if blob_dict is not None else None
     except FromDictionaryException:
diff --git a/data/registry_model/test/test_pre_oci_model.py b/data/registry_model/test/test_pre_oci_model.py
index bfdfc2b29..7fa41b63f 100644
--- a/data/registry_model/test/test_pre_oci_model.py
+++ b/data/registry_model/test/test_pre_oci_model.py
@@ -8,6 +8,7 @@ import pytest
 from mock import patch
 from playhouse.test_utils import assert_query_count
 
+from app import docker_v2_signing_key
 from data import model
 from data.database import (TagManifestLabelMap, TagManifestToManifest, Manifest, ManifestBlob,
                            ManifestLegacyImage, ManifestLabel, TagManifest, RepositoryTag, Image,
                            TagManifestLabel, TagManifest, TagManifestLabel, DerivedStorageForImage,
                            TorrentInfo, close_db_filter)
 from data.cache.impl import InMemoryDataModelCache
 from data.registry_model.registry_pre_oci_model import PreOCIModel
 from data.registry_model.datatypes import RepositoryReference
+from image.docker.schema1 import DockerSchema1ManifestBuilder
 from test.fixtures import *
 
@@ -235,6 +237,10 @@ def test_repository_tag_history(pre_oci_model):
   assert not has_more
   assert len(history) == 2
 
+  # Ensure an expired 'latest' tag is found, since the tag history above contains one.
+  with assert_query_count(1):
+    assert pre_oci_model.has_expired_tag(repository_ref, 'latest')
+
 
 @pytest.mark.parametrize('repo_namespace, repo_name', [
   ('devtable', 'simple'),
@@ -685,3 +691,27 @@ def test_get_cached_repo_blob(pre_oci_model):
     # Try another blob: it is not in the cache, so load_blob falls through to the
     # patched (failing) model lookup.
with pytest.raises(SomeException): pre_oci_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', 'some other digest') + + +def test_create_manifest_and_retarget_tag(pre_oci_model): + repository_ref = pre_oci_model.lookup_repository('devtable', 'simple') + latest_tag = pre_oci_model.get_repo_tag(repository_ref, 'latest', include_legacy_image=True) + manifest = pre_oci_model.get_manifest_for_tag(latest_tag).get_parsed_manifest() + + builder = DockerSchema1ManifestBuilder('devtable', 'simple', 'anothertag') + builder.add_layer(manifest.blob_digests[0], + '{"id": "%s"}' % latest_tag.legacy_image.docker_image_id) + sample_manifest = builder.build(docker_v2_signing_key) + assert sample_manifest is not None + + another_manifest, tag = pre_oci_model.create_manifest_and_retarget_tag(repository_ref, + sample_manifest, + 'anothertag') + assert another_manifest is not None + assert tag is not None + + assert tag.name == 'anothertag' + assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict + + layers = pre_oci_model.list_manifest_layers(another_manifest) + assert len(layers) == 1 diff --git a/endpoints/v2/labelhandlers.py b/endpoints/v2/labelhandlers.py deleted file mode 100644 index 67596f404..000000000 --- a/endpoints/v2/labelhandlers.py +++ /dev/null @@ -1,36 +0,0 @@ -import logging - -from app import app -from endpoints.v2.models_pre_oci import data_model as model -from util.timedeltastring import convert_to_timedelta - -logger = logging.getLogger(__name__) - -min_expire_sec = convert_to_timedelta(app.config.get('LABELED_EXPIRATION_MINIMUM', '1h')) -max_expire_sec = convert_to_timedelta(app.config.get('LABELED_EXPIRATION_MAXIMUM', '104w')) - -def _expires_after(value, namespace_name, repo_name, digest): - """ Sets the expiration of a manifest based on the quay.expires-in label. 
""" - try: - timedelta = convert_to_timedelta(value) - except ValueError: - logger.exception('Could not convert %s to timedeltastring for %s/%s@%s', value, namespace_name, - repo_name, digest) - return - - total_seconds = min(max(timedelta.total_seconds(), min_expire_sec.total_seconds()), - max_expire_sec.total_seconds()) - - logger.debug('Labeling manifest %s/%s@%s with expiration of %s', namespace_name, repo_name, - digest, total_seconds) - model.set_manifest_expires_after(namespace_name, repo_name, digest, total_seconds) - - -_LABEL_HANDLES = { - 'quay.expires-after': _expires_after, -} - -def handle_label(key, value, namespace_name, repo_name, digest): - handler = _LABEL_HANDLES.get(key) - if handler is not None: - handler(value, namespace_name, repo_name, digest) diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py index 93d343db2..888a68b9a 100644 --- a/endpoints/v2/manifest.py +++ b/endpoints/v2/manifest.py @@ -6,26 +6,22 @@ from flask import request, url_for, Response import features -from app import docker_v2_signing_key, app, metric_queue +from app import app, metric_queue from auth.registry_jwt_auth import process_registry_jwt_auth from digest import digest_tools from data.registry_model import registry_model from endpoints.decorators import anon_protect, parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, require_repo_write -from endpoints.v2.models_interface import Label -from endpoints.v2.models_pre_oci import data_model as model -from endpoints.v2.errors import (BlobUnknown, ManifestInvalid, ManifestUnknown, TagInvalid, - NameInvalid, TagExpired) -from endpoints.v2.labelhandlers import handle_label +from endpoints.v2.errors import (ManifestInvalid, ManifestUnknown, TagInvalid, + NameInvalid, TagExpired, NameUnknown) from image.docker import ManifestException -from image.docker.schema1 import (DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE, - DockerSchema1Manifest, DockerSchema1ManifestBuilder) +from image.docker.schema1 import DOCKER_SCHEMA1_MANIFEST_CONTENT_TYPE, DockerSchema1Manifest from image.docker.schema2 import DOCKER_SCHEMA2_CONTENT_TYPES, OCI_CONTENT_TYPES from notifications import spawn_notification from util.audit import track_and_log from util.names import VALID_TAG_PATTERN from util.registry.replication import queue_replication_batch -from util.validation import is_json + logger = logging.getLogger(__name__) @@ -40,45 +36,37 @@ MANIFEST_TAGNAME_ROUTE = BASE_MANIFEST_ROUTE.format(VALID_TAG_PATTERN) @require_repo_read @anon_protect def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref): - manifest = model.get_manifest_by_tag(namespace_name, repo_name, manifest_ref) + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + tag = registry_model.get_repo_tag(repository_ref, manifest_ref) + if tag is None: + if registry_model.has_expired_tag(repository_ref, manifest_ref): + logger.debug('Found expired tag %s for repository %s/%s', manifest_ref, namespace_name, + repo_name) + msg = 'Tag %s was deleted or has expired. 
To pull, revive via time machine' % manifest_ref + raise TagExpired(msg) + + raise ManifestUnknown() + + manifest = registry_model.get_manifest_for_tag(tag, backfill_if_necessary=True) if manifest is None: - has_tag = model.has_active_tag(namespace_name, repo_name, manifest_ref) - if not has_tag: - has_expired_tag = model.has_tag(namespace_name, repo_name, manifest_ref) - if has_expired_tag: - logger.debug('Found expired tag %s for repository %s/%s', manifest_ref, namespace_name, - repo_name) - msg = 'Tag %s was deleted or has expired. To pull, revive via time machine' % manifest_ref - raise TagExpired(msg) - else: - raise ManifestUnknown() + # Something went wrong. + raise ManifestInvalid() - repo_ref = registry_model.lookup_repository(namespace_name, repo_name) - if repo_ref is None: - raise ManifestUnknown() - - tag = registry_model.get_repo_tag(repo_ref, manifest_ref, include_legacy_image=True) - if tag is None: - raise ManifestUnknown() - - if not registry_model.backfill_manifest_for_tag(tag): - raise ManifestUnknown() - - manifest = model.get_manifest_by_tag(namespace_name, repo_name, manifest_ref) - if manifest is None: - raise ManifestUnknown() - - repo = model.get_repository(namespace_name, repo_name) - if repo is not None: - track_and_log('pull_repo', repo, analytics_name='pull_repo_100x', analytics_sample=0.01, - tag=manifest_ref) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) + track_and_log('pull_repo', repository_ref, analytics_name='pull_repo_100x', analytics_sample=0.01, + tag=manifest_ref) + metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) return Response( - manifest.json, + manifest.manifest_bytes, status=200, - headers={'Content-Type': manifest.media_type, - 'Docker-Content-Digest': manifest.digest},) + headers={ + 'Content-Type': manifest.media_type, + 'Docker-Content-Digest': manifest.digest, + }, + ) @v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['GET']) @@ -87,19 +75,21 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref): @require_repo_read @anon_protect def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref): - manifest = model.get_manifest_by_digest(namespace_name, repo_name, manifest_ref) + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + manifest = registry_model.lookup_manifest_by_digest(repository_ref, manifest_ref) if manifest is None: - # Without a tag name to reference, we can't make an attempt to generate the manifest raise ManifestUnknown() - repo = model.get_repository(namespace_name, repo_name) - if repo is not None: - track_and_log('pull_repo', repo, manifest_digest=manifest_ref) - metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) + track_and_log('pull_repo', repository_ref, manifest_digest=manifest_ref) + metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) - return Response(manifest.json, status=200, headers={ + return Response(manifest.manifest_bytes, status=200, headers={ 'Content-Type': manifest.media_type, - 'Docker-Content-Digest': manifest.digest}) + 'Docker-Content-Digest': manifest.digest, + }) def _reject_manifest2_schema2(func): @@ -158,99 +148,6 @@ def write_manifest_by_digest(namespace_name, repo_name, manifest_ref): return _write_manifest_and_log(namespace_name, repo_name, manifest) -def _write_manifest(namespace_name, repo_name, manifest): - if (manifest.namespace == '' and 
features.LIBRARY_SUPPORT and - namespace_name == app.config['LIBRARY_NAMESPACE']): - pass - elif manifest.namespace != namespace_name: - raise NameInvalid() - - if manifest.repo_name != repo_name: - raise NameInvalid() - - # Ensure that the repository exists. - repo = model.get_repository(namespace_name, repo_name) - if repo is None: - raise NameInvalid() - - if not manifest.layers: - raise ManifestInvalid(detail={'message': 'manifest does not reference any layers'}) - - # Ensure all the blobs in the manifest exist. - blob_map = model.lookup_blobs_by_digest(repo, manifest.checksums) - for layer in manifest.layers: - digest_str = str(layer.digest) - if digest_str not in blob_map: - raise BlobUnknown(detail={'digest': digest_str}) - - # Lookup all the images and their parent images (if any) inside the manifest. - # This will let us know which v1 images we need to synthesize and which ones are invalid. - all_image_ids = list(manifest.parent_image_ids | manifest.image_ids) - images_map = model.get_docker_v1_metadata_by_image_id(repo, all_image_ids) - - # Rewrite any v1 image IDs that do not match the checksum in the database. - try: - # TODO: make this batch and read the parent image from the previous iteration, rather than - # reloading it. - rewritten_images = list(manifest.rewrite_invalid_image_ids(images_map)) - for rewritten_image in rewritten_images: - if not rewritten_image.image_id in images_map: - model.synthesize_v1_image( - repo, - blob_map[rewritten_image.content_checksum], - rewritten_image.image_id, - rewritten_image.created, - rewritten_image.comment, - rewritten_image.command, - rewritten_image.compat_json, - rewritten_image.parent_image_id, - ) - except ManifestException as me: - logger.exception("exception when rewriting v1 metadata") - raise ManifestInvalid(detail={'message': 'failed synthesizing v1 metadata: %s' % me.message}) - - # Store the manifest pointing to the tag. - leaf_layer_id = rewritten_images[-1].image_id - newly_created = model.save_manifest(repo, manifest.tag, manifest, leaf_layer_id, blob_map) - if newly_created: - # TODO: make this batch - labels = [] - for key, value in manifest.layers[-1].v1_metadata.labels.iteritems(): - media_type = 'application/json' if is_json(value) else 'text/plain' - labels.append(Label(key=key, value=value, source_type='manifest', media_type=media_type)) - handle_label(key, value, namespace_name, repo_name, manifest.digest) - - model.create_manifest_labels(namespace_name, repo_name, manifest.digest, labels) - - return repo, blob_map - - -def _write_manifest_and_log(namespace_name, repo_name, manifest): - repo, blob_map = _write_manifest(namespace_name, repo_name, manifest) - - # Queue all blob manifests for replication. 
- if features.STORAGE_REPLICATION: - with queue_replication_batch(namespace_name) as queue_storage_replication: - for layer in manifest.layers: - digest_str = str(layer.digest) - queue_storage_replication(blob_map[digest_str]) - - track_and_log('push_repo', repo, tag=manifest.tag) - spawn_notification(repo, 'repo_push', {'updated_tags': [manifest.tag]}) - metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) - - return Response( - 'OK', - status=202, - headers={ - 'Docker-Content-Digest': manifest.digest, - 'Location': - url_for('v2.fetch_manifest_by_digest', repository='%s/%s' % (namespace_name, repo_name), - manifest_ref=manifest.digest), - }, - ) - - @v2_bp.route(MANIFEST_DIGEST_ROUTE, methods=['DELETE']) @parse_repository_name() @process_registry_jwt_auth(scopes=['pull', 'push']) @@ -263,11 +160,75 @@ def delete_manifest_by_digest(namespace_name, repo_name, manifest_ref): Note: there is no equivalent method for deleting by tag name because it is forbidden by the spec. """ - tags = model.delete_manifest_by_digest(namespace_name, repo_name, manifest_ref) + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + manifest = registry_model.lookup_manifest_by_digest(repository_ref, manifest_ref) + if manifest is None: + raise ManifestUnknown() + + tags = registry_model.delete_tags_for_manifest(manifest) if not tags: raise ManifestUnknown() for tag in tags: - track_and_log('delete_tag', tag.repository, tag=tag.name, digest=manifest_ref) + track_and_log('delete_tag', repository_ref, tag=tag.name, digest=manifest_ref) return Response(status=202) + + +def _write_manifest_and_log(namespace_name, repo_name, manifest_impl): + repository_ref, manifest, tag = _write_manifest(namespace_name, repo_name, manifest_impl) + + # Queue all blob manifests for replication. + if features.STORAGE_REPLICATION: + layers = registry_model.list_manifest_layers(manifest) + if layers is None: + raise ManifestInvalid() + + with queue_replication_batch(namespace_name) as queue_storage_replication: + for layer in layers: + queue_storage_replication(layer.blob) + + track_and_log('push_repo', repository_ref, tag=manifest_impl.tag) + spawn_notification(repository_ref, 'repo_push', {'updated_tags': [manifest_impl.tag]}) + metric_queue.repository_push.Inc(labelvalues=[namespace_name, repo_name, 'v2', True]) + + return Response( + 'OK', + status=202, + headers={ + 'Docker-Content-Digest': manifest.digest, + 'Location': + url_for('v2.fetch_manifest_by_digest', + repository='%s/%s' % (namespace_name, repo_name), + manifest_ref=manifest.digest), + }, + ) + + +def _write_manifest(namespace_name, repo_name, manifest_impl): + if (manifest_impl.namespace == '' and features.LIBRARY_SUPPORT and + namespace_name == app.config['LIBRARY_NAMESPACE']): + pass + elif manifest_impl.namespace != namespace_name: + raise NameInvalid() + + if manifest_impl.repo_name != repo_name: + raise NameInvalid() + + if not manifest_impl.layers: + raise ManifestInvalid(detail={'message': 'manifest does not reference any layers'}) + + # Ensure that the repository exists. 
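+  # (No repository is auto-created here; a missing repository is reported as NameUnknown.)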
+ repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + manifest, tag = registry_model.create_manifest_and_retarget_tag(repository_ref, manifest_impl, + manifest_impl.tag) + if manifest is None: + raise ManifestInvalid() + + return repository_ref, manifest, tag diff --git a/endpoints/v2/test/test_manifest.py b/endpoints/v2/test/test_manifest.py index d25a30e3d..673163bb9 100644 --- a/endpoints/v2/test/test_manifest.py +++ b/endpoints/v2/test/test_manifest.py @@ -52,4 +52,4 @@ def test_e2e_query_count_manifest_norewrite(client, app): conduct_call(client, 'v2.write_manifest_by_digest', url_for, 'PUT', params, expected_code=202, headers=headers, raw_body=tag_manifest.json_data) - assert counter.count <= 15 + assert counter.count <= 16 diff --git a/test/registry/protocol_v2.py b/test/registry/protocol_v2.py index 450ec291a..ed5b40b4b 100644 --- a/test/registry/protocol_v2.py +++ b/test/registry/protocol_v2.py @@ -51,7 +51,7 @@ class V2Protocol(RegistryProtocol): Failures.MISSING_TAG: 404, Failures.INVALID_TAG: 404, Failures.INVALID_IMAGES: 400, - Failures.INVALID_BLOB: 404, + Failures.INVALID_BLOB: 400, Failures.UNSUPPORTED_CONTENT_TYPE: 415, }, } diff --git a/test/registry_tests.py b/test/registry_tests.py index ece37e75c..b14b4b8e4 100644 --- a/test/registry_tests.py +++ b/test/registry_tests.py @@ -678,9 +678,9 @@ class V2RegistryPushMixin(V2RegistryMixin): for tag_name in tag_names: manifest = manifests[tag_name] - # Write the manifest. If we expect it to be invalid, we expect a 404 code. Otherwise, we expect + # Write the manifest. If we expect it to be invalid, we expect a 400 code. Otherwise, we expect # a 202 response for success. - put_code = 404 if invalid else 202 + put_code = 400 if invalid else 202 self.conduct('PUT', '/v2/%s/manifests/%s' % (repo_name, tag_name), data=manifest.bytes, expected_code=put_code, headers={'Content-Type': 'application/json'}, auth='jwt') @@ -1682,9 +1682,9 @@ class V2RegistryTests(V2RegistryPullMixin, V2RegistryPushMixin, RegistryTestsMix manifest = builder.build(_JWK) response = self.conduct('PUT', '/v2/%s/manifests/%s' % (repo_name, tag_name), - data=manifest.bytes, expected_code=404, + data=manifest.bytes, expected_code=400, headers={'Content-Type': 'application/json'}, auth='jwt') - self.assertEquals('BLOB_UNKNOWN', response.json()['errors'][0]['code']) + self.assertEquals('MANIFEST_INVALID', response.json()['errors'][0]['code']) def test_delete_manifest(self): From e91ba98e1b36039d0d504701b22fcc1ad27fa718 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 8 Oct 2018 13:13:31 +0100 Subject: [PATCH 08/13] Convert V2's tag endpoints to use the new data model interface --- data/model/tag.py | 8 +++++++- data/registry_model/datatypes.py | 5 +++++ data/registry_model/interface.py | 4 +++- data/registry_model/registry_pre_oci_model.py | 6 ++++-- endpoints/v2/tag.py | 15 ++++++++++++--- 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/data/model/tag.py b/data/model/tag.py index 77ac3bee8..705ebe6dc 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -204,7 +204,7 @@ def get_tag_manifest_digests(tags): return {manifest.tag_id: manifest.digest for manifest in manifests} -def list_active_repo_tags(repo): +def list_active_repo_tags(repo, start_id=None, limit=None): """ Returns all of the active, non-hidden tags in a repository, joined to they images and (if present), their manifest. 
""" @@ -216,6 +216,12 @@ def list_active_repo_tags(repo): .switch(RepositoryTag) .join(TagManifest, JOIN.LEFT_OUTER)) + if start_id is not None: + query = query.where(RepositoryTag.id >= start_id) + + if limit is not None: + query = query.limit(limit) + return query diff --git a/data/registry_model/datatypes.py b/data/registry_model/datatypes.py index f36d34d79..78f038479 100644 --- a/data/registry_model/datatypes.py +++ b/data/registry_model/datatypes.py @@ -117,6 +117,11 @@ class Tag(datatype('Tag', ['name', 'reversion', 'manifest_digest', 'lifetime_sta """ return legacy_image + @property + def id(self): + """ The ID of this tag for pagination purposes only. """ + return self._db_id + class Manifest(datatype('Manifest', ['digest', 'media_type', 'manifest_bytes'])): """ Manifest represents a manifest in a repository. """ diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py index 3376ef5f4..f7a1dadc5 100644 --- a/data/registry_model/interface.py +++ b/data/registry_model/interface.py @@ -95,7 +95,9 @@ class RegistryDataInterface(object): """ @abstractmethod - def list_repository_tags(self, repository_ref, include_legacy_images=False): + def list_repository_tags(self, repository_ref, include_legacy_images=False, + start_pagination_id=None, + limit=None): """ Returns a list of all the active tags in the repository. Note that this can be a *heavy* operation on repositories with a lot of tags, and should be avoided for more targetted diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py index 5297a92c4..e119852b6 100644 --- a/data/registry_model/registry_pre_oci_model.py +++ b/data/registry_model/registry_pre_oci_model.py @@ -287,7 +287,9 @@ class PreOCIModel(RegistryDataInterface): """ return Label.for_label(model.label.delete_manifest_label(label_uuid, manifest._db_id)) - def list_repository_tags(self, repository_ref, include_legacy_images=False): + def list_repository_tags(self, repository_ref, include_legacy_images=False, + start_pagination_id=None, + limit=None): """ Returns a list of all the active tags in the repository. Note that this can be a *heavy* operation on repositories with a lot of tags, and should be avoided for more targetted @@ -296,7 +298,7 @@ class PreOCIModel(RegistryDataInterface): # NOTE: include_legacy_images isn't used here because `list_active_repo_tags` includes the # information already, so we might as well just use it. However, the new model classes will # *not* include it by default, so we make it a parameter now. 
- tags = model.tag.list_active_repo_tags(repository_ref._db_id) + tags = model.tag.list_active_repo_tags(repository_ref._db_id, start_pagination_id, limit) return [Tag.for_repository_tag(tag, legacy_image=LegacyImage.for_image(tag.image), manifest_digest=(tag.tagmanifest.digest diff --git a/endpoints/v2/tag.py b/endpoints/v2/tag.py index cc2867aa7..fec91f11e 100644 --- a/endpoints/v2/tag.py +++ b/endpoints/v2/tag.py @@ -1,9 +1,10 @@ from flask import jsonify from auth.registry_jwt_auth import process_registry_jwt_auth +from data.registry_model import registry_model from endpoints.decorators import anon_protect, parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, paginate -from endpoints.v2.models_pre_oci import data_model as model +from endpoints.v2.errors import NameUnknown @v2_bp.route('//tags/list', methods=['GET']) @@ -13,10 +14,18 @@ from endpoints.v2.models_pre_oci import data_model as model @anon_protect @paginate() def list_all_tags(namespace_name, repo_name, start_id, limit, pagination_callback): - tags = list(model.repository_tags(namespace_name, repo_name, start_id, limit)) + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + # NOTE: We add 1 to the limit because that's how pagination_callback knows if there are + # additional tags. + tags = registry_model.list_repository_tags(repository_ref, start_pagination_id=start_id, + limit=limit + 1) response = jsonify({ 'name': '{0}/{1}'.format(namespace_name, repo_name), - 'tags': [tag.name for tag in tags][0:limit],}) + 'tags': [tag.name for tag in tags][0:limit], + }) pagination_callback(tags, response) return response From 3a8a913ad37b28cae0ff69c07d6f8358cb226f36 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 8 Oct 2018 14:26:39 +0100 Subject: [PATCH 09/13] Convert V2's catalog endpoint to use the new data model interface --- endpoints/v2/catalog.py | 22 +- endpoints/v2/models_interface.py | 288 -------------------- endpoints/v2/models_pre_oci.py | 323 ----------------------- endpoints/v2/test/test_models_pre_oci.py | 25 -- 4 files changed, 17 insertions(+), 641 deletions(-) delete mode 100644 endpoints/v2/models_interface.py delete mode 100644 endpoints/v2/models_pre_oci.py delete mode 100644 endpoints/v2/test/test_models_pre_oci.py diff --git a/endpoints/v2/catalog.py b/endpoints/v2/catalog.py index 6e1d09854..240ab6ac5 100644 --- a/endpoints/v2/catalog.py +++ b/endpoints/v2/catalog.py @@ -1,15 +1,20 @@ -import features +from collections import namedtuple from flask import jsonify +import features + from app import model_cache from auth.auth_context import get_authenticated_user, get_authenticated_context from auth.registry_jwt_auth import process_registry_jwt_auth +from data import model from data.cache import cache_key from endpoints.decorators import anon_protect from endpoints.v2 import v2_bp, paginate -from endpoints.v2.models_interface import Repository -from endpoints.v2.models_pre_oci import data_model as model + + +class Repository(namedtuple('Repository', ['id', 'namespace_name', 'name'])): + pass @v2_bp.route('/_catalog', methods=['GET']) @@ -26,8 +31,15 @@ def catalog_search(start_id, limit, pagination_callback): if username and not get_authenticated_user().enabled: return [] - repos = model.get_visible_repositories(username, start_id, limit, include_public=include_public) - return [repo._asdict() for repo in repos] + query = model.repository.get_visible_repositories(username, + kind_filter='image', + 
include_public=include_public, + start_id=start_id, + limit=limit + 1) + # NOTE: The repository ID is in `rid` (not `id`) here, as per the requirements of + # the `get_visible_repositories` call. + return [Repository(repo.rid, repo.namespace_user.username, repo.name)._asdict() + for repo in query] context_key = get_authenticated_context().unique_key if get_authenticated_context() else None catalog_cache_key = cache_key.for_catalog_page(context_key, start_id, limit) diff --git a/endpoints/v2/models_interface.py b/endpoints/v2/models_interface.py deleted file mode 100644 index 59557801a..000000000 --- a/endpoints/v2/models_interface.py +++ /dev/null @@ -1,288 +0,0 @@ -from abc import ABCMeta, abstractmethod -from collections import namedtuple - -from namedlist import namedlist -from six import add_metaclass - - -class Repository( - namedtuple('Repository', [ - 'id', 'name', 'namespace_name', 'description', 'is_public', 'kind', 'trust_enabled'])): - """ - Repository represents a namespaced collection of tags. - :type id: int - :type name: string - :type namespace_name: string - :type description: string - :type is_public: bool - :type kind: string - :type trust_enabled: bool - """ - - -class ManifestJSON(namedtuple('ManifestJSON', ['digest', 'json', 'media_type'])): - """ - ManifestJSON represents a Manifest of any format. - """ - - -class Tag(namedtuple('Tag', ['id', 'name', 'repository'])): - """ - Tag represents a user-facing alias for referencing a set of Manifests. - """ - - -class BlobUpload( - namedlist('BlobUpload', [ - 'uuid', 'byte_count', 'uncompressed_byte_count', 'chunk_count', 'sha_state', 'location_name', - 'storage_metadata', 'piece_sha_state', 'piece_hashes', 'repo_namespace_name', 'repo_name'])): - """ - BlobUpload represents the current state of an Blob being uploaded. - """ - - -class Blob(namedtuple('Blob', ['id', 'uuid', 'digest', 'size', 'locations', 'cas_path'])): - """ - Blob represents an opaque binary blob saved to the storage system. - """ - - -class RepositoryReference(namedtuple('RepositoryReference', ['id', 'name', 'namespace_name'])): - """ - RepositoryReference represents a reference to a Repository, without its full metadata. - """ - - -class Label(namedtuple('Label', ['key', 'value', 'source_type', 'media_type'])): - """ - Label represents a key-value pair that describes a particular Manifest. - """ - - -@add_metaclass(ABCMeta) -class DockerRegistryV2DataInterface(object): - """ - Interface that represents all data store interactions required by a Docker Registry v1. - """ - - @abstractmethod - def create_repository(self, namespace_name, repo_name, creating_user=None): - """ - Creates a new repository under the specified namespace with the given name. The user supplied is - the user creating the repository, if any. - """ - pass - - @abstractmethod - def get_repository(self, namespace_name, repo_name): - """ - Returns a repository tuple for the repository with the given name under the given namespace. - Returns None if no such repository was found. - """ - pass - - @abstractmethod - def has_active_tag(self, namespace_name, repo_name, tag_name): - """ - Returns whether there is an active tag for the tag with the given name under the matching - repository, if any, or none if none. - """ - pass - - @abstractmethod - def get_manifest_by_tag(self, namespace_name, repo_name, tag_name): - """ - Returns the current manifest for the tag with the given name under the matching repository, if - any, or None if none. 
- """ - pass - - @abstractmethod - def get_manifest_by_digest(self, namespace_name, repo_name, digest): - """ - Returns the manifest matching the given digest under the matching repository, if any, or None if - none. - """ - pass - - @abstractmethod - def delete_manifest_by_digest(self, namespace_name, repo_name, digest): - """ - Deletes the manifest with the associated digest (if any) and returns all removed tags that - pointed to that manifest. If the manifest was not found, returns an empty list. - """ - pass - - @abstractmethod - def get_docker_v1_metadata_by_tag(self, namespace_name, repo_name, tag_name): - """ - Returns the Docker V1 metadata associated with the tag with the given name under the matching - repository, if any. If none, returns None. - """ - pass - - @abstractmethod - def get_docker_v1_metadata_by_image_id(self, repository, docker_image_ids): - """ - Returns a map of Docker V1 metadata for each given image ID, matched under the repository with - the given namespace and name. Returns an empty map if the matching repository was not found. - """ - pass - - @abstractmethod - def get_parents_docker_v1_metadata(self, namespace_name, repo_name, docker_image_id): - """ - Returns an ordered list containing the Docker V1 metadata for each parent of the image with the - given docker ID under the matching repository. Returns an empty list if the image was not found. - """ - pass - - @abstractmethod - def create_manifest_and_update_tag(self, namespace_name, repo_name, tag_name, manifest): - """ - Creates a new manifest and assigns the tag with the given name under the matching repository to - it. - """ - pass - - @abstractmethod - def synthesize_v1_image(self, repository, storage, image_id, created, comment, command, - compat_json, parent_image_id): - """ - Synthesizes a V1 image under the specified repository, pointing to the given storage and returns - the V1 metadata for the synthesized image. - """ - pass - - @abstractmethod - def save_manifest(self, repository, tag_name, manifest, blob_map): - """ - Saves a manifest, under the matching repository as a tag with the given name. - - Returns a boolean whether or not the tag was newly created or not. - """ - pass - - @abstractmethod - def repository_tags(self, namespace_name, repo_name, start_id, limit): - """ - Returns the active tags under the repository with the given name and namespace. - """ - pass - - @abstractmethod - def get_visible_repositories(self, username, start_id, limit): - """ - Returns the repositories visible to the user with the given username, if any. - """ - pass - - @abstractmethod - def create_blob_upload(self, namespace_name, repo_name, upload_uuid, location_name, - storage_metadata): - """ - Creates a blob upload under the matching repository with the given UUID and metadata. - Returns whether the matching repository exists. - """ - pass - - @abstractmethod - def blob_upload_by_uuid(self, namespace_name, repo_name, upload_uuid): - """ - Searches for a blob upload with the given UUID under the given repository and returns it or None - if none. - """ - pass - - @abstractmethod - def update_blob_upload(self, blob_upload): - """ - Saves any changes to the blob upload object given to the backing data store. 
- Fields that can change: - - uncompressed_byte_count - - piece_hashes - - piece_sha_state - - storage_metadata - - byte_count - - chunk_count - - sha_state - """ - pass - - @abstractmethod - def delete_blob_upload(self, namespace_name, repo_name, uuid): - """ - Deletes the blob upload with the given uuid under the matching repository. If none, does - nothing. - """ - pass - - @abstractmethod - def mount_blob_and_temp_tag(self, namespace_name, repo_name, existing_blob, expiration_sec): - """ - Mounts an existing blob and links a temporary tag with the specified expiration to it under - the matching repository. Returns True on success and False on failure. - """ - pass - - @abstractmethod - def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload, - expiration_sec): - """ - Creates a blob and links a temporary tag with the specified expiration to it under the matching - repository. - """ - pass - - @abstractmethod - def get_blob_by_digest(self, namespace_name, repo_name, digest): - """ - Returns the blob with the given digest under the matching repository or None if none. - """ - pass - - @abstractmethod - def save_bittorrent_pieces(self, blob, piece_size, piece_bytes): - """ - Saves the BitTorrent piece hashes for the given blob. - """ - pass - - @abstractmethod - def create_manifest_labels(self, namespace_name, repo_name, manifest_digest, labels): - """ - Creates a new labels for the provided manifest. - """ - pass - - @abstractmethod - def get_blob_path(self, blob): - """ - Once everything is moved over, this could be in util.registry and not even touch the database. - """ - pass - - @abstractmethod - def set_manifest_expires_after(self, namespace_name, repo_name, digest, expires_after_sec): - """ - Sets that the manifest with given digest expires after the number of seconds from *now*. - """ - pass - - @abstractmethod - def lookup_blobs_by_digest(self, repository, digests): - """ - Looks up all blobs with the matching digests under the given repository. - """ - pass - - @abstractmethod - def is_namespace_enabled(self, namespace_name): - """ Returns whether the given namespace is enabled. If the namespace doesn't exist, - returns True. """ - pass - - @abstractmethod - def is_repository_public(self, namespace_name, repo_name): - """ Returns True if the repository with the given name exists and is public. """ - pass diff --git a/endpoints/v2/models_pre_oci.py b/endpoints/v2/models_pre_oci.py deleted file mode 100644 index 256fbbc8a..000000000 --- a/endpoints/v2/models_pre_oci.py +++ /dev/null @@ -1,323 +0,0 @@ -from peewee import IntegrityError - -from data import model, database -from data.model import DataModelException -from endpoints.v2.models_interface import ( - Blob, - BlobUpload, - DockerRegistryV2DataInterface, - ManifestJSON, - Repository, - RepositoryReference, - Tag,) -from image.docker.v1 import DockerV1Metadata -from image.docker.interfaces import ManifestInterface -from image.docker.schema1 import DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE - -_MEDIA_TYPE = DOCKER_SCHEMA1_SIGNED_MANIFEST_CONTENT_TYPE - -class PreOCIModel(DockerRegistryV2DataInterface): - """ - PreOCIModel implements the data model for the v2 Docker Registry protocol using a database schema - before it was changed to support the OCI specification. 
- """ - - def create_repository(self, namespace_name, repo_name, creating_user=None): - return model.repository.create_repository(namespace_name, repo_name, creating_user) - - def get_repository(self, namespace_name, repo_name): - repo = model.repository.get_repository(namespace_name, repo_name) - if repo is None: - return None - return _repository_for_repo(repo) - - def has_active_tag(self, namespace_name, repo_name, tag_name): - try: - model.tag.get_active_tag(namespace_name, repo_name, tag_name) - return True - except database.RepositoryTag.DoesNotExist: - return False - - def has_tag(self, namespace_name, repo_name, tag_name): - try: - model.tag.get_possibly_expired_tag(namespace_name, repo_name, tag_name) - return True - except database.RepositoryTag.DoesNotExist: - return False - - def get_manifest_by_tag(self, namespace_name, repo_name, tag_name): - try: - manifest = model.tag.load_tag_manifest(namespace_name, repo_name, tag_name) - return ManifestJSON(digest=manifest.digest, json=manifest.json_data, media_type=_MEDIA_TYPE) - except model.InvalidManifestException: - return None - - def get_manifest_by_digest(self, namespace_name, repo_name, digest): - try: - manifest = model.tag.load_manifest_by_digest(namespace_name, repo_name, digest) - return ManifestJSON(digest=digest, json=manifest.json_data, media_type=_MEDIA_TYPE) - except model.InvalidManifestException: - return None - - def delete_manifest_by_digest(self, namespace_name, repo_name, digest): - def _tag_view(tag): - return Tag(id=tag.id, name=tag.name, repository=RepositoryReference( - id=tag.repository_id, - name=repo_name, - namespace_name=namespace_name,)) - - tags = model.tag.delete_manifest_by_digest(namespace_name, repo_name, digest) - return [_tag_view(tag) for tag in tags] - - def get_docker_v1_metadata_by_tag(self, namespace_name, repo_name, tag_name): - try: - repo_img = model.tag.get_tag_image(namespace_name, repo_name, tag_name, include_storage=True) - return _docker_v1_metadata(namespace_name, repo_name, repo_img) - except DataModelException: - return None - - def get_docker_v1_metadata_by_image_id(self, repository, docker_image_ids): - images_query = model.image.lookup_repository_images(repository.id, docker_image_ids) - return { - image.docker_image_id: _docker_v1_metadata(repository.namespace_name, repository.name, image) - for image in images_query - } - - def get_parents_docker_v1_metadata(self, namespace_name, repo_name, docker_image_id): - repo_image = model.image.get_repo_image(namespace_name, repo_name, docker_image_id) - if repo_image is None: - return [] - - parents = model.image.get_parent_images(namespace_name, repo_name, repo_image) - return [_docker_v1_metadata(namespace_name, repo_name, image) for image in parents] - - def create_manifest_and_update_tag(self, namespace_name, repo_name, tag_name, manifest): - assert isinstance(manifest, ManifestInterface) - repo = model.repository.get_repository(namespace_name, repo_name) - if repo is None: - return - - blob_map = self.lookup_blobs_by_digest(repo, manifest.checksums) - storage_map = {blob.digest: blob.id for blob_digest, blob in blob_map.iteritems()} - try: - model.tag.associate_generated_tag_manifest(namespace_name, repo_name, tag_name, manifest, - storage_map) - except IntegrityError: - # It's already there! 
- pass - - def synthesize_v1_image(self, repository, storage, image_id, created, comment, command, - compat_json, parent_image_id): - parent_image = None - if parent_image_id is not None: - parent_image = model.image.get_image(repository.id, parent_image_id) - if parent_image is None: - raise DataModelException('Unknown parent image: %s' % parent_image_id) - - repo_image = model.image.synthesize_v1_image(repository.id, storage.id, storage.size, - image_id, created, comment, command, compat_json, - parent_image) - return _docker_v1_metadata(repository.namespace_name, repository.name, repo_image) - - def save_manifest(self, repository, tag_name, manifest, leaf_layer_id, blob_map): - assert isinstance(manifest, ManifestInterface) - storage_map = {blob.digest: blob.id for blob_digest, blob in blob_map.iteritems()} - (_, newly_created) = model.tag.store_tag_manifest_for_repo(repository.id, tag_name, manifest, - leaf_layer_id, storage_map) - return newly_created - - def repository_tags(self, namespace_name, repo_name, start_id, limit): - def _tag_view(tag): - return Tag(id=tag.id, name=tag.name, repository=RepositoryReference( - id=tag.repository_id, - name=repo_name, - namespace_name=namespace_name,)) - - tags_query = model.tag.list_repository_tags(namespace_name, repo_name) - tags_query = (tags_query - .order_by(database.RepositoryTag.id) - .limit(limit + 1)) - - if start_id is not None: - tags_query = tags_query.where(database.RepositoryTag.id >= start_id) - - return [_tag_view(tag) for tag in tags_query] - - def get_visible_repositories(self, username, start_id, limit, include_public=None): - if include_public is None: - include_public = (username is None) - - query = model.repository.get_visible_repositories(username, - kind_filter='image', - include_public=include_public, - start_id=start_id, - limit=limit + 1) - return [_repository_for_repo(repo) for repo in query] - - def create_blob_upload(self, namespace_name, repo_name, upload_uuid, location_name, - storage_metadata): - try: - model.blob.initiate_upload(namespace_name, repo_name, upload_uuid, location_name, - storage_metadata) - return True - except database.Repository.DoesNotExist: - return False - - def blob_upload_by_uuid(self, namespace_name, repo_name, upload_uuid): - try: - found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid) - except model.InvalidBlobUpload: - return None - - return BlobUpload( - repo_namespace_name=namespace_name, - repo_name=repo_name, - uuid=upload_uuid, - byte_count=found.byte_count, - uncompressed_byte_count=found.uncompressed_byte_count, - chunk_count=found.chunk_count, - sha_state=found.sha_state, - piece_sha_state=found.piece_sha_state, - piece_hashes=found.piece_hashes, - location_name=found.location.name, - storage_metadata=found.storage_metadata,) - - def update_blob_upload(self, blob_upload): - # Lookup the blob upload object. 
- try: - blob_upload_record = model.blob.get_blob_upload(blob_upload.repo_namespace_name, - blob_upload.repo_name, blob_upload.uuid) - except model.InvalidBlobUpload: - return - - blob_upload_record.uncompressed_byte_count = blob_upload.uncompressed_byte_count - blob_upload_record.piece_hashes = blob_upload.piece_hashes - blob_upload_record.piece_sha_state = blob_upload.piece_sha_state - blob_upload_record.storage_metadata = blob_upload.storage_metadata - blob_upload_record.byte_count = blob_upload.byte_count - blob_upload_record.chunk_count = blob_upload.chunk_count - blob_upload_record.sha_state = blob_upload.sha_state - blob_upload_record.save() - - def delete_blob_upload(self, namespace_name, repo_name, uuid): - try: - found = model.blob.get_blob_upload(namespace_name, repo_name, uuid) - found.delete_instance() - except model.InvalidBlobUpload: - return - - def mount_blob_and_temp_tag(self, namespace_name, repo_name, existing_blob, expiration_sec): - return model.blob.temp_link_blob(namespace_name, repo_name, existing_blob.digest, - expiration_sec) - - def create_blob_and_temp_tag(self, namespace_name, repo_name, blob_digest, blob_upload, - expiration_sec): - location_obj = model.storage.get_image_location_for_name(blob_upload.location_name) - blob_record = model.blob.store_blob_record_and_temp_link( - namespace_name, repo_name, blob_digest, location_obj.id, blob_upload.byte_count, - expiration_sec, blob_upload.uncompressed_byte_count) - return Blob( - id=blob_record.id, - uuid=blob_record.uuid, - digest=blob_digest, - size=blob_upload.byte_count, - locations=[blob_upload.location_name], - cas_path=blob_record.cas_path - ) - - def lookup_blobs_by_digest(self, repository, digests): - def _blob_view(blob_record): - return Blob( - id=blob_record.id, - uuid=blob_record.uuid, - digest=blob_record.content_checksum, - size=blob_record.image_size, - cas_path=blob_record.cas_path, - locations=None, # Note: Locations is None in this case. - ) - - query = model.storage.lookup_repo_storages_by_content_checksum(repository.id, digests) - return {storage.content_checksum: _blob_view(storage) for storage in query} - - def get_blob_by_digest(self, namespace_name, repo_name, digest): - try: - blob_record = model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest) - return Blob( - id=blob_record.id, - uuid=blob_record.uuid, - digest=digest, - size=blob_record.image_size, - locations=list(blob_record.locations), - cas_path=blob_record.cas_path - ) - except model.BlobDoesNotExist: - return None - - def save_bittorrent_pieces(self, blob, piece_size, piece_bytes): - blob_record = model.storage.get_storage_by_uuid(blob.uuid) - model.storage.save_torrent_info(blob_record, piece_size, piece_bytes) - - def create_manifest_labels(self, namespace_name, repo_name, manifest_digest, labels): - if not labels: - # No point in doing anything more. 
- return - - tag_manifest = model.tag.load_manifest_by_digest(namespace_name, repo_name, manifest_digest) - for label in labels: - model.label.create_manifest_label(tag_manifest, label.key, label.value, label.source_type, - label.media_type) - - def get_blob_path(self, blob): - return model.storage.get_layer_path_for_storage(blob.uuid, blob.cas_path, blob.digest) - - def set_manifest_expires_after(self, namespace_name, repo_name, digest, expires_after_sec): - try: - manifest = model.tag.load_manifest_by_digest(namespace_name, repo_name, digest) - manifest.tag.lifetime_end_ts = manifest.tag.lifetime_start_ts + expires_after_sec - manifest.tag.save() - except model.InvalidManifestException: - return - - def is_namespace_enabled(self, namespace_name): - namespace = model.user.get_namespace_user(namespace_name) - return namespace is None or namespace.enabled - - def is_repository_public(self, namespace_name, repo_name): - return model.repository.repository_is_public(namespace_name, repo_name) - - -def _docker_v1_metadata(namespace_name, repo_name, repo_image): - """ - Returns a DockerV1Metadata object for the given Pre-OCI repo_image under the - repository with the given namespace and name. Note that the namespace and - name are passed here as an optimization, and are *not checked* against the - image. - """ - return DockerV1Metadata( - namespace_name=namespace_name, - repo_name=repo_name, - image_id=repo_image.docker_image_id, - checksum=repo_image.v1_checksum, - content_checksum=repo_image.storage.content_checksum, - compat_json=repo_image.v1_json_metadata, - created=repo_image.created, - comment=repo_image.comment, - command=repo_image.command, - # TODO: make sure this isn't needed anywhere, as it is expensive to lookup - parent_image_id=None, - ) - - -def _repository_for_repo(repo): - """ Returns a Repository object representing the Pre-OCI data model repo instance given. """ - return Repository( - id=repo.id or repo.rid, - name=repo.name, - namespace_name=repo.namespace_user.username, - description=repo.description, - is_public=model.repository.is_repository_public(repo), - kind=model.repository.get_repo_kind_name(repo), - trust_enabled=repo.trust_enabled,) - - -data_model = PreOCIModel() diff --git a/endpoints/v2/test/test_models_pre_oci.py b/endpoints/v2/test/test_models_pre_oci.py deleted file mode 100644 index c0e232939..000000000 --- a/endpoints/v2/test/test_models_pre_oci.py +++ /dev/null @@ -1,25 +0,0 @@ -import hashlib - -from playhouse.test_utils import assert_query_count - -from data import model -from data.database import ImageStorageLocation -from endpoints.v2.models_pre_oci import data_model -from test.fixtures import * - -def test_get_blob_path(initialized_db): - # Add a blob. - digest = 'sha256:' + hashlib.sha256("a").hexdigest() - location = ImageStorageLocation.get(name='local_us') - db_blob = model.blob.store_blob_record_and_temp_link('devtable', 'simple', digest, location, 1, - 10000000) - - with assert_query_count(1): - blob = data_model.get_blob_by_digest('devtable', 'simple', digest) - assert blob.uuid == db_blob.uuid - - # The blob tuple should have everything get_blob_path needs, so there should be no queries. 
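# (Editor's aside: the zero-query assertion below holds because the Blob tuple
# fetched above already carries uuid, cas_path and digest, and get_blob_path,
# as deleted earlier in this patch, is a pure computation over those fields:
#
#   def get_blob_path(blob):
#       return model.storage.get_layer_path_for_storage(
#           blob.uuid, blob.cas_path, blob.digest)
#
# Any future change that makes path resolution hit the database again would
# trip assert_query_count(0).)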
- with assert_query_count(0): - assert data_model.get_blob_path(blob) - - assert data_model.get_blob_path(blob) == model.storage.get_layer_path(db_blob) From cbf0edb164eefb4ace3e880dc18116de9c2bdccb Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 8 Oct 2018 15:28:43 +0100 Subject: [PATCH 10/13] Address remaining usage of the old data interface --- data/registry_model/datatypes.py | 9 +++++++++ endpoints/v2/__init__.py | 12 ++++++------ endpoints/v2/v2auth.py | 31 ++++++++++++++++--------------- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/data/registry_model/datatypes.py b/data/registry_model/datatypes.py index 78f038479..93c81a448 100644 --- a/data/registry_model/datatypes.py +++ b/data/registry_model/datatypes.py @@ -47,6 +47,15 @@ class RepositoryReference(datatype('Repository', [])): return model.repository.is_repository_public(self._repository_obj) + @property + def trust_enabled(self): + """ Returns whether trust is enabled in this repository. """ + repository = self._repository_obj + if repository is None: + return None + + return repository.trust_enabled + @property def id(self): """ Returns the database ID of the repository. """ diff --git a/endpoints/v2/__init__.py b/endpoints/v2/__init__.py index 520dd2e07..d20ad76df 100644 --- a/endpoints/v2/__init__.py +++ b/endpoints/v2/__init__.py @@ -15,9 +15,9 @@ from auth.auth_context import get_authenticated_context from auth.permissions import ( ReadRepositoryPermission, ModifyRepositoryPermission, AdministerRepositoryPermission) from auth.registry_jwt_auth import process_registry_jwt_auth, get_auth_headers +from data.registry_model import registry_model from endpoints.decorators import anon_protect, anon_allowed, route_show_if from endpoints.v2.errors import V2RegistryException, Unauthorized, Unsupported, NameUnknown -from endpoints.v2.models_pre_oci import data_model as model from util.http import abort from util.metrics.metricqueue import time_blueprint from util.registry.dockerver import docker_version @@ -98,15 +98,15 @@ def _require_repo_permission(permission_class, scopes=None, allow_public=False): repository = namespace_name + '/' + repo_name if allow_public: - repo = model.get_repository(namespace_name, repo_name) - if repo is None or not repo.is_public: + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None or not repository_ref.is_public: raise Unauthorized(repository=repository, scopes=scopes) - if repo.kind != 'image': - msg = 'This repository is for managing %s resources and not container images.' % repo.kind + if repository_ref.kind != 'image': + msg = 'This repository is for managing %s and not container images.' 
% repository_ref.kind raise Unsupported(detail=msg) - if repo.is_public: + if repository_ref.is_public: return func(namespace_name, repo_name, *args, **kwargs) raise Unauthorized(repository=repository, scopes=scopes) diff --git a/endpoints/v2/v2auth.py b/endpoints/v2/v2auth.py index bceb14e55..9cb80371c 100644 --- a/endpoints/v2/v2auth.py +++ b/endpoints/v2/v2auth.py @@ -11,11 +11,12 @@ from auth.auth_context import get_authenticated_context, get_authenticated_user from auth.decorators import process_basic_auth from auth.permissions import (ModifyRepositoryPermission, ReadRepositoryPermission, CreateRepositoryPermission, AdministerRepositoryPermission) +from data import model +from data.registry_model import registry_model from endpoints.decorators import anon_protect from endpoints.v2 import v2_bp from endpoints.v2.errors import (InvalidLogin, NameInvalid, InvalidRequest, Unsupported, Unauthorized, NamespaceDisabled) -from endpoints.v2.models_pre_oci import data_model as model from util.cache import no_cache from util.names import parse_namespace_repository, REPOSITORY_NAME_REGEX from util.security.registry_jwt import (generate_bearer_token, build_context_and_subject, @@ -117,11 +118,11 @@ def _get_scope_regex(): return re.compile(scope_regex_string) -def _get_tuf_root(repo, namespace, reponame): - if not features.SIGNING or repo is None or not repo.trust_enabled: +def _get_tuf_root(repository_ref, namespace, reponame): + if not features.SIGNING or repository_ref is None or not repository_ref.trust_enabled: return DISABLED_TUF_ROOT - # Users with write access to a repo will see signer-rooted TUF metadata + # Users with write access to a repository will see signer-rooted TUF metadata if ModifyRepositoryPermission(namespace, reponame).can(): return SIGNER_TUF_ROOT return QUAY_TUF_ROOT @@ -162,18 +163,18 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): raise NameInvalid(message='Invalid repository name: %s' % namespace_and_repo) # Ensure the namespace is enabled. - if not model.is_namespace_enabled(namespace): + if not registry_model.is_namespace_enabled(namespace): msg = 'Namespace %s has been disabled. Please contact a system administrator.' % namespace raise NamespaceDisabled(message=msg) final_actions = [] - repo = model.get_repository(namespace, reponame) - repo_is_public = repo is not None and repo.is_public + repository_ref = registry_model.lookup_repository(namespace, reponame) + repo_is_public = repository_ref is not None and repository_ref.is_public invalid_repo_message = '' - if repo is not None and repo.kind != 'image': + if repository_ref is not None and repository_ref.kind != 'image': invalid_repo_message = (( - 'This repository is for managing %s resources ' + 'and not container images.') % repo.kind) + 'This repository is for managing %s ' + 'and not container images.') % repository_ref.kind) if 'push' in actions: # Check if there is a valid user or token, as otherwise the repository cannot be @@ -181,9 +182,9 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): if has_valid_auth_context: # Lookup the repository. If it exists, make sure the entity has modify # permission. Otherwise, make sure the entity has create permission. 
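# (Editor's aside: for review convenience, the push branch below reduces to
# this decision table; it is a restatement of the hunk, not new behavior:
#
#   if the repository exists:
#       grant 'push' iff ModifyRepositoryPermission(namespace, reponame).can(),
#       raising Unsupported when its kind is not 'image'
#   else:
#       create the repository and grant 'push' iff
#       CreateRepositoryPermission(namespace).can() and a real user, not just
#       a token, is authenticated
#
# Absent either permission, 'push' is silently dropped from the downscoped
# result rather than rejected outright.)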
- if repo: + if repository_ref: if ModifyRepositoryPermission(namespace, reponame).can(): - if repo.kind != 'image': + if repository_ref.kind != 'image': raise Unsupported(message=invalid_repo_message) final_actions.append('push') @@ -193,7 +194,7 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): user = get_authenticated_user() if CreateRepositoryPermission(namespace).can() and user is not None: logger.debug('Creating repository: %s/%s', namespace, reponame) - model.create_repository(namespace, reponame, user) + model.repository.create_repository(namespace, reponame, user) final_actions.append('push') else: logger.debug('No permission to create repository %s/%s', namespace, reponame) @@ -201,7 +202,7 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): if 'pull' in actions: # Grant pull if the user can read the repo or it is public. if ReadRepositoryPermission(namespace, reponame).can() or repo_is_public: - if repo is not None and repo.kind != 'image': + if repository_ref is not None and repository_ref.kind != 'image': raise Unsupported(message=invalid_repo_message) final_actions.append('pull') @@ -211,7 +212,7 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): if '*' in actions: # Grant * user is admin if AdministerRepositoryPermission(namespace, reponame).can(): - if repo is not None and repo.kind != 'image': + if repository_ref is not None and repository_ref.kind != 'image': raise Unsupported(message=invalid_repo_message) final_actions.append('*') @@ -220,4 +221,4 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): return scopeResult(actions=final_actions, namespace=namespace, repository=reponame, registry_and_repo=registry_and_repo, - tuf_root=_get_tuf_root(repo, namespace, reponame)) + tuf_root=_get_tuf_root(repository_ref, namespace, reponame)) From 4a7b4ad06ad6566c14849aa1d1394f8e47b8f9c1 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 8 Oct 2018 16:32:09 +0100 Subject: [PATCH 11/13] Fix disabled namespace check --- data/cache/cache_key.py | 4 ++-- data/model/tag.py | 9 +++++---- data/registry_model/datatype.py | 9 ++++----- data/registry_model/interface.py | 8 +++++++- data/registry_model/label_handlers.py | 4 ++-- data/registry_model/registry_pre_oci_model.py | 15 +++++++++------ endpoints/v2/blob.py | 2 +- endpoints/v2/manifest.py | 2 +- endpoints/v2/v2auth.py | 2 +- 9 files changed, 32 insertions(+), 23 deletions(-) diff --git a/data/cache/cache_key.py b/data/cache/cache_key.py index 94d6f62a4..924d72b12 100644 --- a/data/cache/cache_key.py +++ b/data/cache/cache_key.py @@ -5,9 +5,9 @@ class CacheKey(namedtuple('CacheKey', ['key', 'expiration'])): pass -def for_repository_blob(namespace_name, repo_name, digest): +def for_repository_blob(namespace_name, repo_name, digest, version): """ Returns a cache key for a blob in a repository. 
""" - return CacheKey('repo_blob__%s_%s_%s' % (namespace_name, repo_name, digest), '60s') + return CacheKey('repo_blob__%s_%s_%s_%s' % (namespace_name, repo_name, digest, version), '60s') def for_catalog_page(auth_context_key, start_id, limit): diff --git a/data/model/tag.py b/data/model/tag.py index 705ebe6dc..7a92fda6e 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -214,7 +214,8 @@ def list_active_repo_tags(repo, start_id=None, limit=None): .join(ImageStorage) .where(RepositoryTag.repository == repo, RepositoryTag.hidden == False) .switch(RepositoryTag) - .join(TagManifest, JOIN.LEFT_OUTER)) + .join(TagManifest, JOIN.LEFT_OUTER) + .order_by(RepositoryTag.id)) if start_id is not None: query = query.where(RepositoryTag.id >= start_id) @@ -775,11 +776,11 @@ def change_repository_tag_expiration(namespace_name, repo_name, tag_name, expira def set_tag_expiration_for_manifest(tag_manifest, expiration_sec): """ - Changes the expiration of the tag that point to the given manifest to be its lifetime start + + Changes the expiration of the tag that points to the given manifest to be its lifetime start + the expiration seconds. """ - expiration_time_in_seconds = tag_manifest.tag.lifetime_start_ts + expiration_sec - expiration_date = datetime.utcfromtimestamp(expiration_time_in_seconds) + expiration_time_ts = tag_manifest.tag.lifetime_start_ts + expiration_sec + expiration_date = datetime.utcfromtimestamp(expiration_time_ts) return change_tag_expiration(tag_manifest.tag, expiration_date) diff --git a/data/registry_model/datatype.py b/data/registry_model/datatype.py index 1b2768e83..091776bb1 100644 --- a/data/registry_model/datatype.py +++ b/data/registry_model/datatype.py @@ -4,7 +4,7 @@ from functools import wraps, total_ordering class FromDictionaryException(Exception): """ Exception raised if constructing a data type from a dictionary fails due to - a version mismatch or missing data. + missing data. """ def datatype(name, static_fields): @@ -40,9 +40,6 @@ def datatype(name, static_fields): @classmethod def from_dict(cls, dict_data): - if dict_data.get('version') != 1: - raise FromDictionaryException() - try: return cls(**dict_data) except: @@ -50,9 +47,11 @@ def datatype(name, static_fields): def asdict(self): dictionary_rep = dict(self._fields) + assert ('db_id' not in dictionary_rep and + 'inputs' not in dictionary_rep) + dictionary_rep['db_id'] = self._db_id dictionary_rep['inputs'] = self._inputs - dictionary_rep['version'] = 1 return dictionary_rep return DataType diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py index f7a1dadc5..544462c84 100644 --- a/data/registry_model/interface.py +++ b/data/registry_model/interface.py @@ -171,6 +171,10 @@ class RegistryDataInterface(object): it should be removed. """ + @abstractmethod + def is_existing_disabled_namespace(self, namespace_name): + """ Returns whether the given namespace exists and is disabled. """ + @abstractmethod def is_namespace_enabled(self, namespace_name): """ Returns whether the given namespace exists and is enabled. """ @@ -274,7 +278,9 @@ class RegistryDataInterface(object): Mounts the blob from another repository into the specified target repository, and adds an expiration before that blob is automatically GCed. This function is useful during push operations if an existing blob from another repositroy is being pushed. Returns False if - the mounting fails. Note that this function does *not* check security for mounting the blob. + the mounting fails. 
Note that this function does *not* check security for mounting the blob + and the caller is responsible for doing this check (an example can be found in + endpoints/v2/blob.py). """ @abstractmethod diff --git a/data/registry_model/label_handlers.py b/data/registry_model/label_handlers.py index 07635537a..96afe0d94 100644 --- a/data/registry_model/label_handlers.py +++ b/data/registry_model/label_handlers.py @@ -17,12 +17,12 @@ def _expires_after(label_dict, manifest, model): model.set_tags_expiration_for_manifest(manifest, total_seconds) -_LABEL_HANDLES = { +_LABEL_HANDLERS = { 'quay.expires-after': _expires_after, } def apply_label_to_manifest(label_dict, manifest, model): """ Runs the handler defined, if any, for the given label. """ - handler = _LABEL_HANDLES.get(label_dict['key']) + handler = _LABEL_HANDLERS.get(label_dict['key']) if handler is not None: handler(label_dict, manifest, model) diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py index e119852b6..f3bb14953 100644 --- a/data/registry_model/registry_pre_oci_model.py +++ b/data/registry_model/registry_pre_oci_model.py @@ -257,8 +257,6 @@ class PreOCIModel(RegistryDataInterface): media_type_name=media_type_name)) yield add_label - if labels_to_add: - pass # TODO: make this truly batch once we've fully transitioned to V2_2 and no longer need # the mapping tables. @@ -537,6 +535,11 @@ class PreOCIModel(RegistryDataInterface): return Manifest.for_tag_manifest(tag_manifest) + def is_existing_disabled_namespace(self, namespace_name): + """ Returns whether the given namespace exists and is disabled. """ + namespace = model.user.get_namespace_user(namespace_name) + return namespace is not None and not namespace.enabled + def is_namespace_enabled(self, namespace_name): """ Returns whether the given namespace exists and is enabled. """ namespace = model.user.get_namespace_user(namespace_name) @@ -732,9 +735,9 @@ class PreOCIModel(RegistryDataInterface): return blob_found.asdict() - blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, blob_digest) + blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, blob_digest, 2) blob_dict = model_cache.retrieve(blob_cache_key, load_blob) - + try: return Blob.from_dict(blob_dict) if blob_dict is not None else None except FromDictionaryException: @@ -841,7 +844,7 @@ class PreOCIModel(RegistryDataInterface): """ Mounts the blob from another repository into the specified target repository, and adds an expiration before that blob is automatically GCed. This function is useful during push - operations if an existing blob from another repositroy is being pushed. Returns False if + operations if an existing blob from another repository is being pushed. Returns False if the mounting fails. """ repo = model.repository.lookup_repository(target_repository_ref._db_id) @@ -862,7 +865,7 @@ class PreOCIModel(RegistryDataInterface): try: tag_manifest = database.TagManifest.get(id=manifest._db_id) except database.TagManifest.DoesNotExist: - return None + return model.tag.set_tag_expiration_for_manifest(tag_manifest, expiration_sec) diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 0aa34680a..73ffc553f 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -190,7 +190,7 @@ def start_blob_upload(namespace_name, repo_name): raise InvalidRequest() # Check if the blob will be uploaded now or in followup calls. If the `digest` is given, then - # the upload will occur as a monolithic chunk in this call. 
Ohterwise, we return a redirect + # the upload will occur as a monolithic chunk in this call. Otherwise, we return a redirect # for the client to upload the chunks as distinct operations. digest = request.args.get('digest', None) if digest is None: diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py index 888a68b9a..f1c897868 100644 --- a/endpoints/v2/manifest.py +++ b/endpoints/v2/manifest.py @@ -44,7 +44,7 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref): if tag is None: if registry_model.has_expired_tag(repository_ref, manifest_ref): logger.debug('Found expired tag %s for repository %s/%s', manifest_ref, namespace_name, - repo_name) + repo_name) msg = 'Tag %s was deleted or has expired. To pull, revive via time machine' % manifest_ref raise TagExpired(msg) diff --git a/endpoints/v2/v2auth.py b/endpoints/v2/v2auth.py index 9cb80371c..e21e61e57 100644 --- a/endpoints/v2/v2auth.py +++ b/endpoints/v2/v2auth.py @@ -163,7 +163,7 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): raise NameInvalid(message='Invalid repository name: %s' % namespace_and_repo) # Ensure the namespace is enabled. - if not registry_model.is_namespace_enabled(namespace): + if registry_model.is_existing_disabled_namespace(namespace): msg = 'Namespace %s has been disabled. Please contact a system administrator.' % namespace raise NamespaceDisabled(message=msg) From d18a7935e172c4fbc912a47e43c407da190cbadb Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 18 Oct 2018 14:42:54 -0400 Subject: [PATCH 12/13] Fix pytest fixture import issue with hashability of tmpdir_factory --- test/fixtures.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/test/fixtures.py b/test/fixtures.py index e442234b8..3c0c1cde5 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -29,13 +29,22 @@ from initdb import initialize_database, populate_database from path_converters import APIRepositoryPathConverter, RegexConverter, RepositoryPathConverter from test.testconfig import FakeTransaction +INIT_DB_PATH = 0 + @pytest.fixture(scope="session") -@lru_cache(maxsize=1) # Important! pytest is calling this multiple times (despite it being session) def init_db_path(tmpdir_factory): """ Creates a new database and appropriate configuration. Note that the initial database is created *once* per session. In the non-full-db-test case, the database_uri fixture makes a copy of the SQLite database file on disk and passes a new copy to each test. """ + # NOTE: We use a global here because pytest runs this code multiple times, due to the fixture + # being imported instead of being in a conftest. Moving to conftest has its own issues, and this + # call is quite slow, so we simply cache it here. 
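# (Editor's aside: the `x = x or compute()` memoization below works because
# the INIT_DB_PATH = 0 sentinel is falsy and a real path is truthy. The same
# pattern in isolation, with names invented for illustration:
#
#   _CACHED_RESULT = None
#
#   def compute_once(factory):
#       global _CACHED_RESULT
#       _CACHED_RESULT = _CACHED_RESULT or _compute(factory)
#       return _CACHED_RESULT
#
# Unlike the lru_cache decorator this replaces, the argument is ignored on
# later calls, which is safe only because tmpdir_factory is session-scoped,
# and it avoids requiring the fixture argument to be hashable.)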
+ global INIT_DB_PATH + INIT_DB_PATH = INIT_DB_PATH or _init_db_path(tmpdir_factory) + return INIT_DB_PATH + +def _init_db_path(tmpdir_factory): if os.environ.get('TEST_DATABASE_URI'): return _init_db_path_real_db(os.environ.get('TEST_DATABASE_URI')) From 88f19ee0b9465266c8b264812177612a87c87d8a Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 29 Oct 2018 12:21:18 -0400 Subject: [PATCH 13/13] Make blob upload errors more specific --- data/registry_model/blobuploader.py | 5 ++++- endpoints/v2/blob.py | 8 ++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/data/registry_model/blobuploader.py b/data/registry_model/blobuploader.py index a356f78e6..478a3d9af 100644 --- a/data/registry_model/blobuploader.py +++ b/data/registry_model/blobuploader.py @@ -24,6 +24,9 @@ BLOB_CONTENT_TYPE = 'application/octet-stream' class BlobUploadException(Exception): """ Base for all exceptions raised when uploading blobs. """ +class BlobRangeMismatchException(BlobUploadException): + """ Exception raised if the range to be uploaded does not match. """ + class BlobDigestMismatchException(BlobUploadException): """ Exception raised if the digest requested does not match that of the contents uploaded. """ @@ -134,7 +137,7 @@ class _BlobUploadManager(object): if start_offset > 0 and start_offset > self.blob_upload.byte_count: logger.error('start_offset provided greater than blob_upload.byte_count') - raise BlobUploadException() + raise BlobRangeMismatchException() # Ensure that we won't go over the allowed maximum size for blobs. max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size) diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 73ffc553f..0bfbe7b52 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -10,7 +10,8 @@ from data import database from data.registry_model import registry_model from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_upload_manager, complete_when_uploaded, BlobUploadSettings, - BlobUploadException, BlobTooLargeException) + BlobUploadException, BlobTooLargeException, + BlobRangeMismatchException) from digest import digest_tools from endpoints.decorators import anon_protect, parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream @@ -428,7 +429,10 @@ def _upload_chunk(blob_uploader, commit_digest=None): return blob_uploader.commit_to_blob(app.config, commit_digest) except BlobTooLargeException as ble: raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed) - except BlobUploadException: + except BlobRangeMismatchException: logger.exception('Exception when uploading blob to %s', blob_uploader.blob_upload_id) _abort_range_not_satisfiable(blob_uploader.blob_upload.byte_count, blob_uploader.blob_upload_id) + except BlobUploadException: + logger.exception('Exception when uploading blob to %s', blob_uploader.blob_upload_id) + raise BlobUploadInvalid()
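# (Editor's closing aside: after this final patch the upload error handling in
# endpoints/v2/blob.py distinguishes three failure modes. The final shape of
# _upload_chunk's error ladder, restated as a sketch:
#
#   try:
#       return blob_uploader.commit_to_blob(app.config, commit_digest)
#   except BlobTooLargeException as ble:
#       raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed)
#   except BlobRangeMismatchException:
#       # 416: the client re-queries upload status and resumes at the offset
#       _abort_range_not_satisfiable(blob_uploader.blob_upload.byte_count,
#                                    blob_uploader.blob_upload_id)
#   except BlobUploadException:
#       raise BlobUploadInvalid()  # generic 400 for anything else
#
# Ordering matters: the specific subclasses must precede the
# BlobUploadException catch-all, exactly as the hunk above arranges them.)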