Merge pull request #3362 from quay/shared-layer-fixes
Shared blob fixes and optimizations
This commit is contained in:
		
						commit
						39db907172
					
				
					 9 changed files with 121 additions and 26 deletions
				
			
		|  | @ -164,6 +164,18 @@ def initiate_upload(namespace, repo_name, uuid, location_name, storage_metadata) | ||||||
|                            storage_metadata=storage_metadata) |                            storage_metadata=storage_metadata) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def get_shared_blob(digest): | ||||||
|  |   """ Returns the ImageStorage blob with the given digest or, if not present, | ||||||
|  |       returns None. This method is *only* to be used for shared blobs that are | ||||||
|  |       globally accessible, such as the special empty gzipped tar layer that Docker | ||||||
|  |       no longer pushes to us. | ||||||
|  |   """ | ||||||
|  |   try: | ||||||
|  |     return ImageStorage.get(content_checksum=digest, uploading=False) | ||||||
|  |   except ImageStorage.DoesNotExist: | ||||||
|  |     return None | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def get_or_create_shared_blob(digest, byte_data, storage): | def get_or_create_shared_blob(digest, byte_data, storage): | ||||||
|   """ Returns the ImageStorage blob with the given digest or, if not present, |   """ Returns the ImageStorage blob with the given digest or, if not present, | ||||||
|       adds a row and writes the given byte data to the storage engine. |       adds a row and writes the given byte data to the storage engine. | ||||||
|  |  | ||||||
|  | @ -7,7 +7,7 @@ from peewee import IntegrityError, JOIN | ||||||
| from data.database import (Tag, Manifest, ManifestBlob, ManifestLegacyImage, ManifestChild, | from data.database import (Tag, Manifest, ManifestBlob, ManifestLegacyImage, ManifestChild, | ||||||
|                            db_transaction) |                            db_transaction) | ||||||
| from data.model import BlobDoesNotExist | from data.model import BlobDoesNotExist | ||||||
| from data.model.blob import get_or_create_shared_blob | from data.model.blob import get_or_create_shared_blob, get_shared_blob | ||||||
| from data.model.oci.tag import filter_to_alive_tags | from data.model.oci.tag import filter_to_alive_tags | ||||||
| from data.model.oci.label import create_manifest_label | from data.model.oci.label import create_manifest_label | ||||||
| from data.model.oci.retriever import RepositoryContentRetriever | from data.model.oci.retriever import RepositoryContentRetriever | ||||||
|  | @ -108,9 +108,20 @@ def _create_manifest(repository_id, manifest_interface_instance, storage): | ||||||
|   # Ensure all the blobs in the manifest exist. |   # Ensure all the blobs in the manifest exist. | ||||||
|   digests = set(manifest_interface_instance.local_blob_digests) |   digests = set(manifest_interface_instance.local_blob_digests) | ||||||
|   blob_map = {} |   blob_map = {} | ||||||
|  | 
 | ||||||
|  |   # If the special empty layer is required, simply load it directly. This is much faster | ||||||
|  |   # than trying to load it on a per repository basis, and that is unnecessary anyway since | ||||||
|  |   # this layer is predefined. | ||||||
|  |   if EMPTY_LAYER_BLOB_DIGEST in digests: | ||||||
|  |     digests.remove(EMPTY_LAYER_BLOB_DIGEST) | ||||||
|  |     blob_map[EMPTY_LAYER_BLOB_DIGEST] = get_shared_blob(EMPTY_LAYER_BLOB_DIGEST) | ||||||
|  |     if not blob_map[EMPTY_LAYER_BLOB_DIGEST]: | ||||||
|  |       logger.warning('Could not find the special empty blob in storage') | ||||||
|  |       return None | ||||||
|  | 
 | ||||||
|   if digests: |   if digests: | ||||||
|     query = lookup_repo_storages_by_content_checksum(repository_id, digests) |     query = lookup_repo_storages_by_content_checksum(repository_id, digests) | ||||||
|     blob_map = {s.content_checksum: s for s in query} |     blob_map.update({s.content_checksum: s for s in query}) | ||||||
|     for digest_str in digests: |     for digest_str in digests: | ||||||
|       if digest_str not in blob_map: |       if digest_str not in blob_map: | ||||||
|         logger.warning('Unknown blob `%s` under manifest `%s` for repository `%s`', digest_str, |         logger.warning('Unknown blob `%s` under manifest `%s` for repository `%s`', digest_str, | ||||||
|  | @ -120,11 +131,12 @@ def _create_manifest(repository_id, manifest_interface_instance, storage): | ||||||
|   # Special check: If the empty layer blob is needed for this manifest, add it to the |   # Special check: If the empty layer blob is needed for this manifest, add it to the | ||||||
|   # blob map. This is necessary because Docker decided to elide sending of this special |   # blob map. This is necessary because Docker decided to elide sending of this special | ||||||
|   # empty layer in schema version 2, but we need to have it referenced for GC and schema version 1. |   # empty layer in schema version 2, but we need to have it referenced for GC and schema version 1. | ||||||
|   if manifest_interface_instance.get_requires_empty_layer_blob(retriever): |   if EMPTY_LAYER_BLOB_DIGEST not in blob_map: | ||||||
|     shared_blob = get_or_create_shared_blob(EMPTY_LAYER_BLOB_DIGEST, EMPTY_LAYER_BYTES, storage) |     if manifest_interface_instance.get_requires_empty_layer_blob(retriever): | ||||||
|     assert not shared_blob.uploading |       shared_blob = get_or_create_shared_blob(EMPTY_LAYER_BLOB_DIGEST, EMPTY_LAYER_BYTES, storage) | ||||||
|     assert shared_blob.content_checksum == EMPTY_LAYER_BLOB_DIGEST |       assert not shared_blob.uploading | ||||||
|     blob_map[EMPTY_LAYER_BLOB_DIGEST] = shared_blob |       assert shared_blob.content_checksum == EMPTY_LAYER_BLOB_DIGEST | ||||||
|  |       blob_map[EMPTY_LAYER_BLOB_DIGEST] = shared_blob | ||||||
| 
 | 
 | ||||||
|   # Determine and populate the legacy image if necessary. Manifest lists will not have a legacy |   # Determine and populate the legacy image if necessary. Manifest lists will not have a legacy | ||||||
|   # image. |   # image. | ||||||
|  |  | ||||||
|  | @ -264,6 +264,9 @@ def get_layer_path_for_storage(storage_uuid, cas_path, content_checksum): | ||||||
| def lookup_repo_storages_by_content_checksum(repo, checksums, by_manifest=False): | def lookup_repo_storages_by_content_checksum(repo, checksums, by_manifest=False): | ||||||
|   """ Looks up repository storages (without placements) matching the given repository |   """ Looks up repository storages (without placements) matching the given repository | ||||||
|       and checksum. """ |       and checksum. """ | ||||||
|  |   if not checksums: | ||||||
|  |     return [] | ||||||
|  | 
 | ||||||
|   # There may be many duplicates of the checksums, so for performance reasons we are going |   # There may be many duplicates of the checksums, so for performance reasons we are going | ||||||
|   # to use a union to select just one storage with each checksum |   # to use a union to select just one storage with each checksum | ||||||
|   queries = [] |   queries = [] | ||||||
|  |  | ||||||
|  | @ -1,3 +1,4 @@ | ||||||
|  | from app import storage | ||||||
| from data import model, database | from data import model, database | ||||||
| 
 | 
 | ||||||
| from test.fixtures import * | from test.fixtures import * | ||||||
|  | @ -30,3 +31,19 @@ def test_store_blob(initialized_db): | ||||||
|   assert blob_storage3.id != blob_storage.id |   assert blob_storage3.id != blob_storage.id | ||||||
|   assert blob_storage3.image_size == 1234 |   assert blob_storage3.image_size == 1234 | ||||||
|   assert blob_storage3.uncompressed_size == 5678 |   assert blob_storage3.uncompressed_size == 5678 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_get_or_create_shared_blob(initialized_db): | ||||||
|  |   shared = model.blob.get_or_create_shared_blob('sha256:abcdef', 'somecontent', storage) | ||||||
|  |   assert shared.content_checksum == 'sha256:abcdef' | ||||||
|  | 
 | ||||||
|  |   again = model.blob.get_or_create_shared_blob('sha256:abcdef', 'somecontent', storage) | ||||||
|  |   assert shared == again | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def test_lookup_repo_storages_by_content_checksum(initialized_db): | ||||||
|  |   for image in database.Image.select(): | ||||||
|  |     found = model.storage.lookup_repo_storages_by_content_checksum(image.repository, | ||||||
|  |                                                                    [image.storage.content_checksum]) | ||||||
|  |     assert len(found) == 1 | ||||||
|  |     assert found[0].content_checksum == image.storage.content_checksum | ||||||
|  |  | ||||||
|  | @ -565,11 +565,13 @@ class OCIModel(SharedModel, RegistryDataInterface): | ||||||
|     there may be multiple records in the same repository for the same blob digest, so the return |     there may be multiple records in the same repository for the same blob digest, so the return | ||||||
|     value of this function may change. |     value of this function may change. | ||||||
|     """ |     """ | ||||||
|     image_storage = oci.blob.get_repository_blob_by_digest(repository_ref._db_id, blob_digest) |     image_storage = self._get_shared_storage(blob_digest) | ||||||
|     if image_storage is None: |     if image_storage is None: | ||||||
|       return None |       image_storage = oci.blob.get_repository_blob_by_digest(repository_ref._db_id, blob_digest) | ||||||
|  |       if image_storage is None: | ||||||
|  |         return None | ||||||
| 
 | 
 | ||||||
|     assert image_storage.cas_path is not None |       assert image_storage.cas_path is not None | ||||||
| 
 | 
 | ||||||
|     placements = None |     placements = None | ||||||
|     if include_placements: |     if include_placements: | ||||||
|  |  | ||||||
|  | @ -120,7 +120,7 @@ class PreOCIModel(SharedModel, RegistryDataInterface): | ||||||
| 
 | 
 | ||||||
|     # Ensure all the blobs in the manifest exist. |     # Ensure all the blobs in the manifest exist. | ||||||
|     digests = manifest_interface_instance.checksums |     digests = manifest_interface_instance.checksums | ||||||
|     query = model.storage.lookup_repo_storages_by_content_checksum(repository_ref._db_id, digests) |     query = self._lookup_repo_storages_by_content_checksum(repository_ref._db_id, digests) | ||||||
|     blob_map = {s.content_checksum: s for s in query} |     blob_map = {s.content_checksum: s for s in query} | ||||||
|     for layer in manifest_interface_instance.layers: |     for layer in manifest_interface_instance.layers: | ||||||
|       digest_str = str(layer.digest) |       digest_str = str(layer.digest) | ||||||
|  | @ -481,9 +481,7 @@ class PreOCIModel(SharedModel, RegistryDataInterface): | ||||||
|     if manifest is None: |     if manifest is None: | ||||||
|       return None |       return None | ||||||
| 
 | 
 | ||||||
|     blob_query = model.storage.lookup_repo_storages_by_content_checksum(repo, |     blob_query = self._lookup_repo_storages_by_content_checksum(repo, manifest.checksums) | ||||||
|                                                                         manifest.checksums) |  | ||||||
| 
 |  | ||||||
|     storage_map = {blob.content_checksum: blob.id for blob in blob_query} |     storage_map = {blob.content_checksum: blob.id for blob in blob_query} | ||||||
|     try: |     try: | ||||||
|       tag_manifest, _ = model.tag.associate_generated_tag_manifest_with_tag(tag_obj, manifest, |       tag_manifest, _ = model.tag.associate_generated_tag_manifest_with_tag(tag_obj, manifest, | ||||||
|  | @ -585,10 +583,12 @@ class PreOCIModel(SharedModel, RegistryDataInterface): | ||||||
|     there may be multiple records in the same repository for the same blob digest, so the return |     there may be multiple records in the same repository for the same blob digest, so the return | ||||||
|     value of this function may change. |     value of this function may change. | ||||||
|     """ |     """ | ||||||
|     try: |     image_storage = self._get_shared_storage(blob_digest) | ||||||
|       image_storage = model.blob.get_repository_blob_by_digest(repository_ref._db_id, blob_digest) |     if image_storage is None: | ||||||
|     except model.BlobDoesNotExist: |       try: | ||||||
|       return None |         image_storage = model.blob.get_repository_blob_by_digest(repository_ref._db_id, blob_digest) | ||||||
|  |       except model.BlobDoesNotExist: | ||||||
|  |         return None | ||||||
| 
 | 
 | ||||||
|     assert image_storage.cas_path is not None |     assert image_storage.cas_path is not None | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -8,6 +8,7 @@ from data import database | ||||||
| from data import model | from data import model | ||||||
| from data.cache import cache_key | from data.cache import cache_key | ||||||
| from data.model.oci.retriever import RepositoryContentRetriever | from data.model.oci.retriever import RepositoryContentRetriever | ||||||
|  | from data.model.blob import get_shared_blob | ||||||
| from data.registry_model.datatype import FromDictionaryException | from data.registry_model.datatype import FromDictionaryException | ||||||
| from data.registry_model.datatypes import (RepositoryReference, Blob, TorrentInfo, BlobUpload, | from data.registry_model.datatypes import (RepositoryReference, Blob, TorrentInfo, BlobUpload, | ||||||
|                                            LegacyImage, ManifestLayer, DerivedImage) |                                            LegacyImage, ManifestLayer, DerivedImage) | ||||||
|  | @ -323,9 +324,8 @@ class SharedModel: | ||||||
|     if not len(local_blob_digests): |     if not len(local_blob_digests): | ||||||
|       return [] |       return [] | ||||||
| 
 | 
 | ||||||
|     blob_query = model.storage.lookup_repo_storages_by_content_checksum(repo_id, |     blob_query = self._lookup_repo_storages_by_content_checksum(repo_id, local_blob_digests, | ||||||
|                                                                         local_blob_digests, |                                                                 by_manifest=by_manifest) | ||||||
|                                                                         by_manifest=by_manifest) |  | ||||||
|     blobs = [] |     blobs = [] | ||||||
|     for image_storage in blob_query: |     for image_storage in blob_query: | ||||||
|       placements = None |       placements = None | ||||||
|  | @ -356,9 +356,8 @@ class SharedModel: | ||||||
|       blob_digests.append(EMPTY_LAYER_BLOB_DIGEST) |       blob_digests.append(EMPTY_LAYER_BLOB_DIGEST) | ||||||
| 
 | 
 | ||||||
|     if blob_digests: |     if blob_digests: | ||||||
|       blob_query = model.storage.lookup_repo_storages_by_content_checksum(repo_id, |       blob_query = self._lookup_repo_storages_by_content_checksum(repo_id, blob_digests, | ||||||
|                                                                           blob_digests, |                                                                   by_manifest=by_manifest) | ||||||
|                                                                           by_manifest=by_manifest) |  | ||||||
|       storage_map = {blob.content_checksum: blob for blob in blob_query} |       storage_map = {blob.content_checksum: blob for blob in blob_query} | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -441,3 +440,31 @@ class SharedModel: | ||||||
| 
 | 
 | ||||||
|     # Sign the manifest with our signing key. |     # Sign the manifest with our signing key. | ||||||
|     return builder.build(docker_v2_signing_key) |     return builder.build(docker_v2_signing_key) | ||||||
|  | 
 | ||||||
|  |   def _get_shared_storage(self, blob_digest): | ||||||
|  |     """ Returns an ImageStorage row for the blob digest if it is a globally shared storage. """ | ||||||
|  |     # If the EMPTY_LAYER_BLOB_DIGEST is in the checksums, look it up directly. Since we have | ||||||
|  |     # so many duplicate copies in the database currently, looking it up bound to a repository | ||||||
|  |     # can be incredibly slow, and, since it is defined as a globally shared layer, this is extra | ||||||
|  |     # work we don't need to do. | ||||||
|  |     if blob_digest == EMPTY_LAYER_BLOB_DIGEST: | ||||||
|  |       return get_shared_blob(EMPTY_LAYER_BLOB_DIGEST) | ||||||
|  | 
 | ||||||
|  |     return None | ||||||
|  | 
 | ||||||
|  |   def _lookup_repo_storages_by_content_checksum(self, repo, checksums, by_manifest=False): | ||||||
|  |     checksums = set(checksums) | ||||||
|  | 
 | ||||||
|  |     # Load any shared storages first. | ||||||
|  |     extra_storages = [] | ||||||
|  |     for checksum in list(checksums): | ||||||
|  |       shared_storage = self._get_shared_storage(checksum) | ||||||
|  |       if shared_storage is not None: | ||||||
|  |         extra_storages.append(shared_storage) | ||||||
|  |         checksums.remove(checksum) | ||||||
|  | 
 | ||||||
|  |     found = [] | ||||||
|  |     if checksums: | ||||||
|  |       found = list(model.storage.lookup_repo_storages_by_content_checksum(repo, checksums, | ||||||
|  |                                                                           by_manifest=by_manifest)) | ||||||
|  |     return found + extra_storages | ||||||
|  |  | ||||||
|  | @ -132,7 +132,8 @@ def _try_to_mount_blob(repository_ref, mount_blob_digest): | ||||||
|       return None |       return None | ||||||
| 
 | 
 | ||||||
|   # Lookup if the mount blob's digest exists in the repository. |   # Lookup if the mount blob's digest exists in the repository. | ||||||
|   mount_blob = registry_model.get_repo_blob_by_digest(from_repository_ref, mount_blob_digest) |   mount_blob = registry_model.get_cached_repo_blob(model_cache, from_namespace, from_repo_name, | ||||||
|  |                                                    mount_blob_digest) | ||||||
|   if mount_blob is None: |   if mount_blob is None: | ||||||
|     logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo) |     logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo) | ||||||
|     return None |     return None | ||||||
|  |  | ||||||
|  | @ -1204,7 +1204,8 @@ def test_blob_mounting(push_user, push_namespace, push_repo, mount_repo_name, ex | ||||||
|   options.mount_blobs = {'sha256:' + hashlib.sha256(image.bytes).hexdigest(): mount_repo_name |   options.mount_blobs = {'sha256:' + hashlib.sha256(image.bytes).hexdigest(): mount_repo_name | ||||||
|                          for image in basic_images} |                          for image in basic_images} | ||||||
| 
 | 
 | ||||||
|   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', basic_images, |   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', | ||||||
|  |                          basic_images, | ||||||
|                          credentials=('devtable', 'password'), |                          credentials=('devtable', 'password'), | ||||||
|                          options=options, |                          options=options, | ||||||
|                          expected_failure=expected_failure) |                          expected_failure=expected_failure) | ||||||
|  | @ -1215,6 +1216,26 @@ def test_blob_mounting(push_user, push_namespace, push_repo, mount_repo_name, ex | ||||||
|                 credentials=('devtable', 'password')) |                 credentials=('devtable', 'password')) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | def test_blob_mounting_with_empty_layers(manifest_protocol, pusher, puller, images_with_empty_layer, | ||||||
|  |                                          liveserver_session, app_reloader): | ||||||
|  |   # Push an image so we can attempt to mount it. | ||||||
|  |   pusher.push(liveserver_session, 'devtable', 'simple', 'latest', images_with_empty_layer, | ||||||
|  |               credentials=('devtable', 'password')) | ||||||
|  | 
 | ||||||
|  |   # Push again, trying to mount the image layer(s) from the mount repo. | ||||||
|  |   options = ProtocolOptions() | ||||||
|  |   options.scopes = ['repository:devtable/newrepo:push,pull', | ||||||
|  |                     'repository:%s:pull' % ('devtable/simple')] | ||||||
|  |   options.mount_blobs = {'sha256:' + hashlib.sha256(image.bytes).hexdigest(): 'devtable/simple' | ||||||
|  |                          for image in images_with_empty_layer} | ||||||
|  |   options.skip_head_checks = True | ||||||
|  | 
 | ||||||
|  |   manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', 'latest', | ||||||
|  |                          images_with_empty_layer, | ||||||
|  |                          credentials=('devtable', 'password'), | ||||||
|  |                          options=options) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| def get_robot_password(api_caller): | def get_robot_password(api_caller): | ||||||
|   api_caller.conduct_auth('devtable', 'password') |   api_caller.conduct_auth('devtable', 'password') | ||||||
|   resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot') |   resp = api_caller.get('/api/v1/organization/buynlarge/robots/ownerbot') | ||||||
|  |  | ||||||
		Reference in a new issue