Fix image replication for images with remote layers
This commit is contained in:
		
							parent
							
								
									0eb84f8077
								
							
						
					
					
						commit
						180d8847db
					
				
					 7 changed files with 103 additions and 40 deletions
				
			
		|  | @ -191,11 +191,8 @@ class RegistryDataInterface(object): | ||||||
|     """ Returns whether the given namespace exists and is enabled. """ |     """ Returns whether the given namespace exists and is enabled. """ | ||||||
| 
 | 
 | ||||||
|   @abstractmethod |   @abstractmethod | ||||||
|   def list_manifest_layers(self, manifest, include_placements=False): |   def get_manifest_local_blobs(self, manifest, include_placements=False): | ||||||
|     """ Returns an *ordered list* of the layers found in the manifest, starting at the base and |     """ Returns the set of local blobs for the given manifest or None if none. """ | ||||||
|         working towards the leaf, including the associated Blob and its placements (if specified). |  | ||||||
|         Returns None if the manifest could not be parsed and validated. |  | ||||||
|     """ |  | ||||||
| 
 | 
 | ||||||
|   @abstractmethod |   @abstractmethod | ||||||
|   def list_parsed_manifest_layers(self, repository_ref, parsed_manifest, include_placements=False): |   def list_parsed_manifest_layers(self, repository_ref, parsed_manifest, include_placements=False): | ||||||
|  |  | ||||||
|  | @ -531,5 +531,15 @@ class OCIModel(SharedModel, RegistryDataInterface): | ||||||
|     return self._list_manifest_layers(repository_ref._db_id, parsed_manifest, include_placements, |     return self._list_manifest_layers(repository_ref._db_id, parsed_manifest, include_placements, | ||||||
|                                       by_manifest=True) |                                       by_manifest=True) | ||||||
| 
 | 
 | ||||||
|  |   def get_manifest_local_blobs(self, manifest, include_placements=False): | ||||||
|  |     """ Returns the set of local blobs for the given manifest or None if none. """ | ||||||
|  |     try: | ||||||
|  |       manifest_row = database.Manifest.get(id=manifest._db_id) | ||||||
|  |     except database.Manifest.DoesNotExist: | ||||||
|  |       return None | ||||||
|  | 
 | ||||||
|  |     return self._get_manifest_local_blobs(manifest, manifest_row.repository_id, include_placements, | ||||||
|  |                                           by_manifest=True) | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| oci_model = OCIModel() | oci_model = OCIModel() | ||||||
|  |  | ||||||
|  | @ -584,5 +584,15 @@ class PreOCIModel(SharedModel, RegistryDataInterface): | ||||||
|     """ |     """ | ||||||
|     return self._list_manifest_layers(repository_ref._db_id, parsed_manifest, include_placements) |     return self._list_manifest_layers(repository_ref._db_id, parsed_manifest, include_placements) | ||||||
| 
 | 
 | ||||||
|  |   def get_manifest_local_blobs(self, manifest, include_placements=False): | ||||||
|  |     """ Returns the set of local blobs for the given manifest or None if none. """ | ||||||
|  |     try: | ||||||
|  |       tag_manifest = database.TagManifest.get(id=manifest._db_id) | ||||||
|  |     except database.TagManifest.DoesNotExist: | ||||||
|  |       return None | ||||||
|  | 
 | ||||||
|  |     return self._get_manifest_local_blobs(manifest, tag_manifest.tag.repository_id, | ||||||
|  |                                           include_placements) | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| pre_oci_model = PreOCIModel() | pre_oci_model = PreOCIModel() | ||||||
|  |  | ||||||
|  | @ -290,6 +290,32 @@ class SharedModel: | ||||||
| 
 | 
 | ||||||
|     return LegacyImage.for_image(image, images_map=parent_images_map, blob=blob) |     return LegacyImage.for_image(image, images_map=parent_images_map, blob=blob) | ||||||
| 
 | 
 | ||||||
|  |   def _get_manifest_local_blobs(self, manifest, repo_id, include_placements=False, | ||||||
|  |                                 by_manifest=False): | ||||||
|  |     parsed = manifest.get_parsed_manifest() | ||||||
|  |     if parsed is None: | ||||||
|  |       return None | ||||||
|  | 
 | ||||||
|  |     local_blob_digests = list(set(parsed.local_blob_digests)) | ||||||
|  |     if not len(local_blob_digests): | ||||||
|  |       return [] | ||||||
|  | 
 | ||||||
|  |     blob_query = model.storage.lookup_repo_storages_by_content_checksum(repo_id, | ||||||
|  |                                                                         local_blob_digests, | ||||||
|  |                                                                         by_manifest=by_manifest) | ||||||
|  |     blobs = [] | ||||||
|  |     for image_storage in blob_query: | ||||||
|  |       placements = None | ||||||
|  |       if include_placements: | ||||||
|  |         placements = list(model.storage.get_storage_locations(image_storage.uuid)) | ||||||
|  | 
 | ||||||
|  |       blob = Blob.for_image_storage(image_storage, | ||||||
|  |                                     storage_path=model.storage.get_layer_path(image_storage), | ||||||
|  |                                     placements=placements) | ||||||
|  |       blobs.append(blob) | ||||||
|  | 
 | ||||||
|  |     return blobs | ||||||
|  | 
 | ||||||
|   def _list_manifest_layers(self, repo_id, parsed, include_placements=False, by_manifest=False): |   def _list_manifest_layers(self, repo_id, parsed, include_placements=False, by_manifest=False): | ||||||
|     """ Returns an *ordered list* of the layers found in the manifest, starting at the base and |     """ Returns an *ordered list* of the layers found in the manifest, starting at the base and | ||||||
|         working towards the leaf, including the associated Blob and its placements (if specified). |         working towards the leaf, including the associated Blob and its placements (if specified). | ||||||
|  |  | ||||||
|  | @ -462,7 +462,7 @@ def test_is_namespace_enabled(namespace, expect_enabled, registry_model): | ||||||
|   ('devtable', 'history'), |   ('devtable', 'history'), | ||||||
|   ('buynlarge', 'orgrepo'), |   ('buynlarge', 'orgrepo'), | ||||||
| ]) | ]) | ||||||
| def test_list_manifest_layers(repo_namespace, repo_name, registry_model): | def test_layers_and_blobs(repo_namespace, repo_name, registry_model): | ||||||
|   repository_ref = registry_model.lookup_repository(repo_namespace, repo_name) |   repository_ref = registry_model.lookup_repository(repo_namespace, repo_name) | ||||||
|   tags = registry_model.list_repository_tags(repository_ref) |   tags = registry_model.list_repository_tags(repository_ref) | ||||||
|   assert tags |   assert tags | ||||||
|  | @ -471,10 +471,14 @@ def test_list_manifest_layers(repo_namespace, repo_name, registry_model): | ||||||
|     manifest = registry_model.get_manifest_for_tag(tag) |     manifest = registry_model.get_manifest_for_tag(tag) | ||||||
|     assert manifest |     assert manifest | ||||||
| 
 | 
 | ||||||
|     layers = registry_model.list_manifest_layers(manifest) |     parsed = manifest.get_parsed_manifest() | ||||||
|  |     assert parsed | ||||||
|  | 
 | ||||||
|  |     layers = registry_model.list_parsed_manifest_layers(repository_ref, parsed) | ||||||
|     assert layers |     assert layers | ||||||
| 
 | 
 | ||||||
|     layers = registry_model.list_manifest_layers(manifest, include_placements=True) |     layers = registry_model.list_parsed_manifest_layers(repository_ref, parsed, | ||||||
|  |                                                         include_placements=True) | ||||||
|     assert layers |     assert layers | ||||||
| 
 | 
 | ||||||
|     parsed_layers = list(manifest.get_parsed_manifest().layers) |     parsed_layers = list(manifest.get_parsed_manifest().layers) | ||||||
|  | @ -491,6 +495,9 @@ def test_list_manifest_layers(repo_namespace, repo_name, registry_model): | ||||||
| 
 | 
 | ||||||
|       assert manifest_layer.estimated_size(1) is not None |       assert manifest_layer.estimated_size(1) is not None | ||||||
| 
 | 
 | ||||||
|  |     blobs = registry_model.get_manifest_local_blobs(manifest, include_placements=True) | ||||||
|  |     assert {b.digest for b in blobs} == set(parsed.local_blob_digests) | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| def test_manifest_remote_layers(oci_model): | def test_manifest_remote_layers(oci_model): | ||||||
|   # Create a config blob for testing. |   # Create a config blob for testing. | ||||||
|  | @ -524,7 +531,8 @@ def test_manifest_remote_layers(oci_model): | ||||||
|                                                                    'sometag', storage) |                                                                    'sometag', storage) | ||||||
|   assert created_manifest |   assert created_manifest | ||||||
| 
 | 
 | ||||||
|   layers = oci_model.list_manifest_layers(created_manifest) |   layers = oci_model.list_parsed_manifest_layers(repository_ref, | ||||||
|  |                                                  created_manifest.get_parsed_manifest()) | ||||||
|   assert len(layers) == 1 |   assert len(layers) == 1 | ||||||
|   assert layers[0].layer_info.is_remote |   assert layers[0].layer_info.is_remote | ||||||
|   assert layers[0].layer_info.urls == ['http://hello/world'] |   assert layers[0].layer_info.urls == ['http://hello/world'] | ||||||
|  | @ -601,25 +609,25 @@ def test_torrent_info(registry_model): | ||||||
|   tag = registry_model.get_repo_tag(repository_ref, 'latest') |   tag = registry_model.get_repo_tag(repository_ref, 'latest') | ||||||
|   manifest = registry_model.get_manifest_for_tag(tag) |   manifest = registry_model.get_manifest_for_tag(tag) | ||||||
| 
 | 
 | ||||||
|   layers = registry_model.list_manifest_layers(manifest) |   blobs = registry_model.get_manifest_local_blobs(manifest) | ||||||
|   assert layers |   assert blobs | ||||||
| 
 | 
 | ||||||
|   assert registry_model.get_torrent_info(layers[0].blob) is None |   assert registry_model.get_torrent_info(blobs[0]) is None | ||||||
|   registry_model.set_torrent_info(layers[0].blob, 2, 'foo') |   registry_model.set_torrent_info(blobs[0], 2, 'foo') | ||||||
| 
 | 
 | ||||||
|   # Set it again exactly, which should be a no-op. |   # Set it again exactly, which should be a no-op. | ||||||
|   registry_model.set_torrent_info(layers[0].blob, 2, 'foo') |   registry_model.set_torrent_info(blobs[0], 2, 'foo') | ||||||
| 
 | 
 | ||||||
|   # Check the information we've set. |   # Check the information we've set. | ||||||
|   torrent_info = registry_model.get_torrent_info(layers[0].blob) |   torrent_info = registry_model.get_torrent_info(blobs[0]) | ||||||
|   assert torrent_info is not None |   assert torrent_info is not None | ||||||
|   assert torrent_info.piece_length == 2 |   assert torrent_info.piece_length == 2 | ||||||
|   assert torrent_info.pieces == 'foo' |   assert torrent_info.pieces == 'foo' | ||||||
| 
 | 
 | ||||||
|   # Try setting it again. Nothing should happen. |   # Try setting it again. Nothing should happen. | ||||||
|   registry_model.set_torrent_info(layers[0].blob, 3, 'bar') |   registry_model.set_torrent_info(blobs[0], 3, 'bar') | ||||||
| 
 | 
 | ||||||
|   torrent_info = registry_model.get_torrent_info(layers[0].blob) |   torrent_info = registry_model.get_torrent_info(blobs[0]) | ||||||
|   assert torrent_info is not None |   assert torrent_info is not None | ||||||
|   assert torrent_info.piece_length == 2 |   assert torrent_info.piece_length == 2 | ||||||
|   assert torrent_info.pieces == 'foo' |   assert torrent_info.pieces == 'foo' | ||||||
|  | @ -680,19 +688,19 @@ def test_mount_blob_into_repository(pre_oci_model): | ||||||
| 
 | 
 | ||||||
|   target_repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') |   target_repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') | ||||||
| 
 | 
 | ||||||
|   layers = pre_oci_model.list_manifest_layers(manifest, include_placements=True) |   blobs = pre_oci_model.get_manifest_local_blobs(manifest, include_placements=True) | ||||||
|   assert layers |   assert blobs | ||||||
| 
 | 
 | ||||||
|   for layer in layers: |   for blob in blobs: | ||||||
|     # Ensure the blob doesn't exist under the repository. |     # Ensure the blob doesn't exist under the repository. | ||||||
|     assert not pre_oci_model.get_repo_blob_by_digest(target_repository_ref, layer.blob.digest) |     assert not pre_oci_model.get_repo_blob_by_digest(target_repository_ref, blob.digest) | ||||||
| 
 | 
 | ||||||
|     # Mount the blob into the repository. |     # Mount the blob into the repository. | ||||||
|     assert pre_oci_model.mount_blob_into_repository(layer.blob, target_repository_ref, 60) |     assert pre_oci_model.mount_blob_into_repository(blob, target_repository_ref, 60) | ||||||
| 
 | 
 | ||||||
|     # Ensure it now exists. |     # Ensure it now exists. | ||||||
|     found = pre_oci_model.get_repo_blob_by_digest(target_repository_ref, layer.blob.digest) |     found = pre_oci_model.get_repo_blob_by_digest(target_repository_ref, blob.digest) | ||||||
|     assert found == layer.blob |     assert found == blob | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class SomeException(Exception): | class SomeException(Exception): | ||||||
|  | @ -706,10 +714,10 @@ def test_get_cached_repo_blob(registry_model): | ||||||
|   latest_tag = registry_model.get_repo_tag(repository_ref, 'latest') |   latest_tag = registry_model.get_repo_tag(repository_ref, 'latest') | ||||||
|   manifest = registry_model.get_manifest_for_tag(latest_tag) |   manifest = registry_model.get_manifest_for_tag(latest_tag) | ||||||
| 
 | 
 | ||||||
|   layers = registry_model.list_manifest_layers(manifest, include_placements=True) |   blobs = registry_model.get_manifest_local_blobs(manifest, include_placements=True) | ||||||
|   assert layers |   assert blobs | ||||||
| 
 | 
 | ||||||
|   blob = layers[0].blob |   blob = blobs[0] | ||||||
| 
 | 
 | ||||||
|   # Load a blob to add it to the cache. |   # Load a blob to add it to the cache. | ||||||
|   found = registry_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest) |   found = registry_model.get_cached_repo_blob(model_cache, 'devtable', 'simple', blob.digest) | ||||||
|  | @ -764,9 +772,6 @@ def test_create_manifest_and_retarget_tag(registry_model): | ||||||
|   assert tag.name == 'anothertag' |   assert tag.name == 'anothertag' | ||||||
|   assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict |   assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict | ||||||
| 
 | 
 | ||||||
|   layers = registry_model.list_manifest_layers(another_manifest) |  | ||||||
|   assert len(layers) == 1 |  | ||||||
| 
 |  | ||||||
| 
 | 
 | ||||||
| def test_get_schema1_parsed_manifest(registry_model): | def test_get_schema1_parsed_manifest(registry_model): | ||||||
|   repository_ref = registry_model.lookup_repository('devtable', 'simple') |   repository_ref = registry_model.lookup_repository('devtable', 'simple') | ||||||
|  | @ -804,8 +809,5 @@ def test_create_manifest_and_retarget_tag_with_labels(registry_model): | ||||||
|   assert tag.name == 'anothertag' |   assert tag.name == 'anothertag' | ||||||
|   assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict |   assert another_manifest.get_parsed_manifest().manifest_dict == sample_manifest.manifest_dict | ||||||
| 
 | 
 | ||||||
|   layers = registry_model.list_manifest_layers(another_manifest) |  | ||||||
|   assert len(layers) == 1 |  | ||||||
| 
 |  | ||||||
|   # Ensure the labels were applied. |   # Ensure the labels were applied. | ||||||
|   assert tag.lifetime_end_ms is not None |   assert tag.lifetime_end_ms is not None | ||||||
|  |  | ||||||
|  | @ -250,13 +250,13 @@ def _write_manifest_and_log(namespace_name, repo_name, tag_name, manifest_impl): | ||||||
| 
 | 
 | ||||||
|   # Queue all blob manifests for replication. |   # Queue all blob manifests for replication. | ||||||
|   if features.STORAGE_REPLICATION: |   if features.STORAGE_REPLICATION: | ||||||
|     layers = registry_model.list_manifest_layers(manifest) |     blobs = registry_model.get_manifest_local_blobs(manifest) | ||||||
|     if layers is None: |     if blobs is None: | ||||||
|       raise ManifestInvalid() |       logger.error('Could not lookup blobs for manifest `%s`', manifest.digest) | ||||||
| 
 |     else: | ||||||
|       with queue_replication_batch(namespace_name) as queue_storage_replication: |       with queue_replication_batch(namespace_name) as queue_storage_replication: | ||||||
|       for layer in layers: |         for blob_digest in blobs: | ||||||
|         queue_storage_replication(layer.blob) |           queue_storage_replication(blob_digest) | ||||||
| 
 | 
 | ||||||
|   track_and_log('push_repo', repository_ref, tag=tag_name) |   track_and_log('push_repo', repository_ref, tag=tag_name) | ||||||
|   spawn_notification(repository_ref, 'repo_push', {'updated_tags': [tag_name]}) |   spawn_notification(repository_ref, 'repo_push', {'updated_tags': [tag_name]}) | ||||||
|  |  | ||||||
|  | @ -459,6 +459,24 @@ def test_image_replication(pusher, puller, basic_images, liveserver_session, app | ||||||
|       assert r.text == 'OK' |       assert r.text == 'OK' | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def test_image_replication_empty_layers(pusher, puller, images_with_empty_layer, liveserver_session, | ||||||
|  |                                         app_reloader, liveserver, registry_server_executor): | ||||||
|  |   """ Test: Ensure that entries are created for replication of the images pushed. """ | ||||||
|  |   credentials = ('devtable', 'password') | ||||||
|  | 
 | ||||||
|  |   with FeatureFlagValue('STORAGE_REPLICATION', True, registry_server_executor.on(liveserver)): | ||||||
|  |     pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', images_with_empty_layer, | ||||||
|  |                 credentials=credentials) | ||||||
|  | 
 | ||||||
|  |     result = puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', | ||||||
|  |                          images_with_empty_layer, credentials=credentials) | ||||||
|  | 
 | ||||||
|  |     # Ensure that entries were created for each image. | ||||||
|  |     for image_id in result.image_ids.values(): | ||||||
|  |       r = registry_server_executor.on(liveserver).get_storage_replication_entry(image_id) | ||||||
|  |       assert r.text == 'OK' | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| @pytest.mark.parametrize('repo_name, expected_failure', [ | @pytest.mark.parametrize('repo_name, expected_failure', [ | ||||||
|   ('something', None), |   ('something', None), | ||||||
|   ('some/slash', Failures.SLASH_REPOSITORY), |   ('some/slash', Failures.SLASH_REPOSITORY), | ||||||
|  |  | ||||||
		Reference in a new issue