Merge pull request #3250 from quay/joseph.schorr/QUAY-1030/interfacing-part-9
Implement blob uploader and change V1 to use it
This commit is contained in: commit 468e5a8fc2

14 changed files with 717 additions and 84 deletions
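The core of this change is the new `upload_blob` context manager in `data/registry_model/blobuploader.py`, which the V1 code paths now call instead of driving storage and the blob-upload tables directly. The new tests later in this diff exercise it end to end; below is a minimal sketch of the call pattern they establish, reusing the `devtable/complex` fixture repository, `FakeStorage`, and the `TESTING` app config from those tests rather than any production configuration.

# Minimal sketch of the new blob upload flow, mirroring test_basic_upload_blob below.
# The repository name, storage engine, settings, and app config are the test fixture values.
import hashlib

from io import BytesIO

from data.registry_model.blobuploader import upload_blob, BlobUploadSettings
from data.registry_model.registry_pre_oci_model import PreOCIModel
from storage.distributedstorage import DistributedStorage
from storage.fakestorage import FakeStorage

model = PreOCIModel()
repository_ref = model.lookup_repository('devtable', 'complex')
storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
settings = BlobUploadSettings('2M', 512 * 1024, 3600)  # max blob size, torrent piece size, expiration (seconds)
app_config = {'TESTING': True}

with upload_blob(repository_ref, storage, settings) as manager:
  manager.upload_chunk(app_config, BytesIO('hello world'))
  blob = manager.commit_to_blob(app_config)  # uploads left uncommitted are cancelled on exit

assert blob.digest == 'sha256:' + hashlib.sha256('hello world').hexdigest()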
data/registry_model/blobuploader.py (new file, 310 additions)
@@ -0,0 +1,310 @@
| import logging | ||||
| import time | ||||
| 
 | ||||
| from contextlib import contextmanager | ||||
| from collections import namedtuple | ||||
| 
 | ||||
| import bitmath | ||||
| import resumablehashlib | ||||
| 
 | ||||
| from data.registry_model import registry_model | ||||
| from data.database import CloseForLongOperation, db_transaction | ||||
| from digest import digest_tools | ||||
| from util.registry.filelike import wrap_with_handler, StreamSlice | ||||
| from util.registry.gzipstream import calculate_size_handler | ||||
| from util.registry.torrent import PieceHasher | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| BLOB_CONTENT_TYPE = 'application/octet-stream' | ||||
| 
 | ||||
| 
 | ||||
| class BlobUploadException(Exception): | ||||
|   """ Base for all exceptions raised when uploading blobs. """ | ||||
| 
 | ||||
| class BlobDigestMismatchException(BlobUploadException): | ||||
|   """ Exception raised if the digest requested does not match that of the contents uploaded. """ | ||||
| 
 | ||||
| class BlobTooLargeException(BlobUploadException): | ||||
|   """ Exception raised if the data uploaded exceeds the maximum_blob_size. """ | ||||
|   def __init__(self, uploaded, max_allowed): | ||||
|     super(BlobTooLargeException, self).__init__() | ||||
|     self.uploaded = uploaded | ||||
|     self.max_allowed = max_allowed | ||||
| 
 | ||||
| 
 | ||||
| BlobUploadSettings = namedtuple('BlobUploadSettings', ['maximum_blob_size', 'bittorrent_piece_size', | ||||
|                                                        'committed_blob_expiration']) | ||||
| 
 | ||||
| 
 | ||||
| def create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers=None): | ||||
|   """ Creates a new blob upload in the specified repository and returns a manager for interacting | ||||
|       with that upload. Returns None if a new blob upload could not be started. | ||||
|   """ | ||||
|   location_name = storage.preferred_locations[0] | ||||
|   new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name) | ||||
|   blob_upload = registry_model.create_blob_upload(repository_ref, new_upload_uuid, location_name, | ||||
|                                                   upload_metadata) | ||||
|   if blob_upload is None: | ||||
|     return None | ||||
| 
 | ||||
|   return _BlobUploadManager(repository_ref, blob_upload, settings, storage, | ||||
|                             extra_blob_stream_handlers) | ||||
| 
 | ||||
| 
 | ||||
| def retrieve_blob_upload_manager(repository_ref, blob_upload_id, storage, settings): | ||||
|   """ Retrieves the manager for an in-progress blob upload with the specified ID under the given | ||||
|       repository or None if none. | ||||
|   """ | ||||
|   blob_upload = registry_model.lookup_blob_upload(repository_ref, blob_upload_id) | ||||
|   if blob_upload is None: | ||||
|     return None | ||||
| 
 | ||||
|   return _BlobUploadManager(repository_ref, blob_upload, settings, storage) | ||||
| 
 | ||||
| 
 | ||||
| @contextmanager | ||||
| def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None): | ||||
|   """ Starts a new blob upload in the specified repository and yields a manager for interacting | ||||
|       with that upload. When the context manager completes, the blob upload is deleted, whether | ||||
|       committed to a blob or not. Yields None if a blob upload could not be started. | ||||
|   """ | ||||
|   created = create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers) | ||||
|   if not created: | ||||
|     yield None | ||||
|     return | ||||
| 
 | ||||
|   try: | ||||
|     yield created | ||||
|   except Exception: | ||||
|     logger.exception('Exception when uploading blob `%s`', created.blob_upload_id) | ||||
|     raise | ||||
|   finally: | ||||
|     # Cancel the upload if something went wrong or it was not committed to a blob. | ||||
|     if created.committed_blob is None: | ||||
|       created.cancel_upload() | ||||
| 
 | ||||
| 
 | ||||
| class _BlobUploadManager(object): | ||||
|   """ Defines a helper class for easily interacting with blob uploads in progress, including | ||||
|       handling of database and storage calls. | ||||
|   """ | ||||
|   def __init__(self, repository_ref, blob_upload, settings, storage, | ||||
|                extra_blob_stream_handlers=None): | ||||
|     self.repository_ref = repository_ref | ||||
|     self.blob_upload = blob_upload | ||||
|     self.settings = settings | ||||
|     self.storage = storage | ||||
|     self.extra_blob_stream_handlers = extra_blob_stream_handlers | ||||
|     self.committed_blob = None | ||||
| 
 | ||||
|   @property | ||||
|   def blob_upload_id(self): | ||||
|     """ Returns the unique ID for the blob upload. """ | ||||
|     return self.blob_upload.upload_id | ||||
| 
 | ||||
|   def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1, metric_queue=None): | ||||
|     """ Uploads a chunk of data found in the given input file-like interface. start_offset and | ||||
|         length are optional and should match a range header if any was given. | ||||
| 
 | ||||
|         If metric_queue is given, the upload time and chunk size are written into the metrics in | ||||
|         the queue. | ||||
| 
 | ||||
|         Returns the total number of bytes uploaded after this upload has completed. Raises | ||||
|         a BlobUploadException if the upload failed. | ||||
|     """ | ||||
|     assert start_offset is not None | ||||
|     assert length is not None | ||||
| 
 | ||||
|     if start_offset > 0 and start_offset > self.blob_upload.byte_count: | ||||
|       logger.error('start_offset provided greater than blob_upload.byte_count') | ||||
|       return None | ||||
| 
 | ||||
|     # Ensure that we won't go over the allowed maximum size for blobs. | ||||
|     max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size) | ||||
|     uploaded = bitmath.Byte(length + start_offset) | ||||
|     if length > -1 and uploaded > max_blob_size: | ||||
|       raise BlobTooLargeException(uploaded=uploaded.bytes, max_allowed=max_blob_size.bytes) | ||||
| 
 | ||||
|     location_set = {self.blob_upload.location_name} | ||||
|     upload_error = None | ||||
|     with CloseForLongOperation(app_config): | ||||
|       if start_offset > 0 and start_offset < self.blob_upload.byte_count: | ||||
|         # Skip the bytes which were received on a previous push, which are already stored and | ||||
|         # included in the sha calculation | ||||
|         overlap_size = self.blob_upload.byte_count - start_offset | ||||
|         input_fp = StreamSlice(input_fp, overlap_size) | ||||
| 
 | ||||
|         # Update our upload bounds to reflect the skipped portion of the overlap | ||||
|         start_offset = self.blob_upload.byte_count | ||||
|         length = max(length - overlap_size, 0) | ||||
| 
 | ||||
|       # We use this to escape early in case we have already processed all of the bytes the user | ||||
|       # wants to upload. | ||||
|       if length == 0: | ||||
|         return self.blob_upload.byte_count | ||||
| 
 | ||||
|       input_fp = wrap_with_handler(input_fp, self.blob_upload.sha_state.update) | ||||
| 
 | ||||
|       if self.extra_blob_stream_handlers: | ||||
|         for handler in self.extra_blob_stream_handlers: | ||||
|           input_fp = wrap_with_handler(input_fp, handler) | ||||
| 
 | ||||
|       # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have | ||||
|       # already calculated hash data for the previous chunk(s). | ||||
|       piece_hasher = None | ||||
|       if self.blob_upload.chunk_count == 0 or self.blob_upload.piece_sha_state: | ||||
|         initial_sha1_value = self.blob_upload.piece_sha_state or resumablehashlib.sha1() | ||||
|         initial_sha1_pieces_value = self.blob_upload.piece_hashes or '' | ||||
| 
 | ||||
|         piece_hasher = PieceHasher(self.settings.bittorrent_piece_size, start_offset, | ||||
|                                    initial_sha1_pieces_value, initial_sha1_value) | ||||
|         input_fp = wrap_with_handler(input_fp, piece_hasher.update) | ||||
| 
 | ||||
|       # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the | ||||
|       # stream so we can determine the uncompressed size. We'll throw out this data if another chunk | ||||
|       # comes in, but in the common case the docker client only sends one chunk. | ||||
|       size_info = None | ||||
|       if start_offset == 0 and self.blob_upload.chunk_count == 0: | ||||
|         size_info, fn = calculate_size_handler() | ||||
|         input_fp = wrap_with_handler(input_fp, fn) | ||||
| 
 | ||||
|       start_time = time.time() | ||||
|       length_written, new_metadata, upload_error = self.storage.stream_upload_chunk( | ||||
|         location_set, | ||||
|         self.blob_upload.upload_id, | ||||
|         start_offset, | ||||
|         length, | ||||
|         input_fp, | ||||
|         self.blob_upload.storage_metadata, | ||||
|         content_type=BLOB_CONTENT_TYPE, | ||||
|       ) | ||||
| 
 | ||||
|       if upload_error is not None: | ||||
|         logger.error('storage.stream_upload_chunk returned error %s', upload_error) | ||||
|         raise BlobUploadException(upload_error) | ||||
| 
 | ||||
|       # Update the chunk upload time metric. | ||||
|       if metric_queue is not None: | ||||
|         metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[ | ||||
|           length_written, list(location_set)[0]]) | ||||
| 
 | ||||
|     # Ensure we have not gone beyond the max layer size. | ||||
|     new_blob_bytes = self.blob_upload.byte_count + length_written | ||||
|     new_blob_size = bitmath.Byte(new_blob_bytes) | ||||
|     if new_blob_size > max_blob_size: | ||||
|       raise BlobTooLargeException(uploaded=new_blob_size.bytes, max_allowed=max_blob_size.bytes) | ||||
| 
 | ||||
|     # If we determined an uncompressed size and this is the first chunk, add it to the blob. | ||||
|     # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks. | ||||
|     uncompressed_byte_count = self.blob_upload.uncompressed_byte_count | ||||
|     if size_info is not None and self.blob_upload.chunk_count == 0 and size_info.is_valid: | ||||
|       uncompressed_byte_count = size_info.uncompressed_size | ||||
|     elif length_written > 0: | ||||
|       # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't | ||||
|       # know the uncompressed size. | ||||
|       uncompressed_byte_count = None | ||||
| 
 | ||||
|     piece_hashes = None | ||||
|     piece_sha_state = None | ||||
|     if piece_hasher is not None: | ||||
|       piece_hashes = piece_hasher.piece_hashes | ||||
|       piece_sha_state = piece_hasher.hash_fragment | ||||
| 
 | ||||
|     self.blob_upload = registry_model.update_blob_upload(self.blob_upload, | ||||
|                                                          uncompressed_byte_count, | ||||
|                                                          piece_hashes, | ||||
|                                                          piece_sha_state, | ||||
|                                                          new_metadata, | ||||
|                                                          new_blob_bytes, | ||||
|                                                          self.blob_upload.chunk_count + 1, | ||||
|                                                          self.blob_upload.sha_state) | ||||
|     if self.blob_upload is None: | ||||
|       raise BlobUploadException('Could not complete upload of chunk') | ||||
| 
 | ||||
|     return new_blob_bytes | ||||
| 
 | ||||
|   def cancel_upload(self): | ||||
|     """ Cancels the blob upload, deleting any data uploaded and removing the upload itself. """ | ||||
|     # Tell storage to cancel the chunked upload, deleting its contents. | ||||
|     self.storage.cancel_chunked_upload({self.blob_upload.location_name}, | ||||
|                                        self.blob_upload.upload_id, | ||||
|                                        self.blob_upload.storage_metadata) | ||||
| 
 | ||||
|     # Remove the blob upload record itself. | ||||
|     registry_model.delete_blob_upload(self.blob_upload) | ||||
| 
 | ||||
|   def commit_to_blob(self, app_config, expected_digest=None): | ||||
|     """ Commits the blob upload to a blob under the repository. The resulting blob will be marked | ||||
|         to not be GCed for some period of time (as configured by `committed_blob_expiration`). | ||||
| 
 | ||||
|         If expected_digest is specified, the content digest of the data uploaded for the blob is | ||||
|         compared to that given and, if it does not match, a BlobDigestMismatchException is | ||||
|         raised. The digest given must be of type `Digest` and not a string. | ||||
|     """ | ||||
|     # Compare the content digest. | ||||
|     if expected_digest is not None: | ||||
|       self._validate_digest(expected_digest) | ||||
| 
 | ||||
|     # Finalize the storage. | ||||
|     storage_already_existed = self._finalize_blob_storage(app_config) | ||||
| 
 | ||||
|     # Convert the upload to a blob. | ||||
|     computed_digest_str = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) | ||||
| 
 | ||||
|     with db_transaction(): | ||||
|       blob = registry_model.commit_blob_upload(self.blob_upload, computed_digest_str, | ||||
|                                                self.settings.committed_blob_expiration) | ||||
|       if blob is None: | ||||
|         return None | ||||
| 
 | ||||
|       # Save torrent hash information (if available). | ||||
|       if self.blob_upload.piece_sha_state is not None and not storage_already_existed: | ||||
|         piece_bytes = self.blob_upload.piece_hashes + self.blob_upload.piece_sha_state.digest() | ||||
|         registry_model.set_torrent_info(blob, self.settings.bittorrent_piece_size, piece_bytes) | ||||
| 
 | ||||
|     self.committed_blob = blob | ||||
|     return blob | ||||
| 
 | ||||
|   def _validate_digest(self, expected_digest): | ||||
|     """ | ||||
|     Verifies that the digest's SHA matches that of the uploaded data. | ||||
|     """ | ||||
|     computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) | ||||
|     if not digest_tools.digests_equal(computed_digest, expected_digest): | ||||
|       logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s', | ||||
|                    self.blob_upload.upload_id, expected_digest, computed_digest) | ||||
|       raise BlobDigestMismatchException() | ||||
| 
 | ||||
|   def _finalize_blob_storage(self, app_config): | ||||
|     """ | ||||
|     When an upload is successful, this ends the uploading process from the | ||||
|     storage's perspective. | ||||
| 
 | ||||
|     Returns True if the blob already existed. | ||||
|     """ | ||||
|     computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) | ||||
|     final_blob_location = digest_tools.content_path(computed_digest) | ||||
| 
 | ||||
|     # Close the database connection before we perform this operation, as it can take a while | ||||
|     # and we shouldn't hold the connection during that time. | ||||
|     with CloseForLongOperation(app_config): | ||||
|       # Move the storage into place, or if this was a re-upload, cancel it | ||||
|       already_existed = self.storage.exists({self.blob_upload.location_name}, final_blob_location) | ||||
|       if already_existed: | ||||
|         # It already existed, clean up our upload which served as proof that the | ||||
|         # uploader had the blob. | ||||
|         self.storage.cancel_chunked_upload({self.blob_upload.location_name}, | ||||
|                                            self.blob_upload.upload_id, | ||||
|                                            self.blob_upload.storage_metadata) | ||||
|       else: | ||||
|         # We were the first ones to upload this image (at least to this location) | ||||
|         # Let's copy it into place | ||||
|         self.storage.complete_chunked_upload({self.blob_upload.location_name}, | ||||
|                                              self.blob_upload.upload_id, | ||||
|                                              final_blob_location, | ||||
|                                              self.blob_upload.storage_metadata) | ||||
| 
 | ||||
|     return already_existed | ||||
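One subtlety in `commit_to_blob` above: when `expected_digest` is supplied it must already be a parsed `Digest` object, not the raw `sha256:<hex>` string a client sends. A rough sketch of the verifying path, reusing the imports and setup from the earlier sketch; it assumes `digest_tools.Digest.parse_digest` accepts the canonical string form, and the content is illustrative.

# Sketch: commit an upload while verifying a client-supplied digest.
# Assumption: digest_tools.Digest.parse_digest accepts the canonical 'sha256:<hex>' string.
import hashlib

from digest import digest_tools

content = 'hello world'
expected = digest_tools.Digest.parse_digest('sha256:' + hashlib.sha256(content).hexdigest())

with upload_blob(repository_ref, storage, settings) as manager:
  manager.upload_chunk(app_config, BytesIO(content))
  # Raises BlobDigestMismatchException if the uploaded bytes do not hash to `expected`.
  blob = manager.commit_to_blob(app_config, expected_digest=expected)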
|  | @ -122,13 +122,14 @@ class LegacyImage(datatype('LegacyImage', ['docker_image_id', 'created', 'commen | |||
|                                            'image_size', 'aggregate_size', 'uploading'])): | ||||
|   """ LegacyImage represents a Docker V1-style image found in a repository. """ | ||||
|   @classmethod | ||||
|   def for_image(cls, image, images_map=None, tags_map=None): | ||||
|   def for_image(cls, image, images_map=None, tags_map=None, blob=None): | ||||
|     if image is None: | ||||
|       return None | ||||
| 
 | ||||
|     return LegacyImage(db_id=image.id, | ||||
|                        inputs=dict(images_map=images_map, tags_map=tags_map, | ||||
|                                    ancestor_id_list=image.ancestor_id_list()), | ||||
|                                    ancestor_id_list=image.ancestor_id_list(), | ||||
|                                    blob=blob), | ||||
|                        docker_image_id=image.docker_image_id, | ||||
|                        created=image.created, | ||||
|                        comment=image.comment, | ||||
|  | @ -148,6 +149,14 @@ class LegacyImage(datatype('LegacyImage', ['docker_image_id', 'created', 'commen | |||
|             for ancestor_id in reversed(ancestor_id_list) | ||||
|             if images_map.get(ancestor_id)] | ||||
| 
 | ||||
|   @property | ||||
|   @requiresinput('blob') | ||||
|   def blob(self, blob): | ||||
|     """ Returns the blob for this image. Raises an exception if the blob has | ||||
|         not been loaded before this property is invoked. | ||||
|     """ | ||||
|     return blob | ||||
| 
 | ||||
|   @property | ||||
|   @requiresinput('tags_map') | ||||
|   def tags(self, tags_map): | ||||
|  | @ -240,3 +249,21 @@ class TorrentInfo(datatype('TorrentInfo', ['pieces', 'piece_length'])): | |||
|     return TorrentInfo(db_id=torrent_info.id, | ||||
|                        pieces=torrent_info.pieces, | ||||
|                        piece_length=torrent_info.piece_length) | ||||
| 
 | ||||
| 
 | ||||
| class BlobUpload(datatype('BlobUpload', ['upload_id', 'byte_count', 'uncompressed_byte_count', | ||||
|                                          'chunk_count', 'sha_state', 'location_name', | ||||
|                                          'storage_metadata', 'piece_sha_state', 'piece_hashes'])): | ||||
|   """ BlobUpload represents information about an in-progress upload to create a blob. """ | ||||
|   @classmethod | ||||
|   def for_upload(cls, blob_upload): | ||||
|     return BlobUpload(db_id=blob_upload.id, | ||||
|                       upload_id=blob_upload.uuid, | ||||
|                       byte_count=blob_upload.byte_count, | ||||
|                       uncompressed_byte_count=blob_upload.uncompressed_byte_count, | ||||
|                       chunk_count=blob_upload.chunk_count, | ||||
|                       sha_state=blob_upload.sha_state, | ||||
|                       location_name=blob_upload.location.name, | ||||
|                       storage_metadata=blob_upload.storage_metadata, | ||||
|                       piece_sha_state=blob_upload.piece_sha_state, | ||||
|                       piece_hashes=blob_upload.piece_hashes) | ||||
|  |  | |||
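The `include_blob` flag added to `get_legacy_image`, together with the new `blob` property on `LegacyImage` above, is what lets V1 callers reach a layer's storage path and placements without a separate blob lookup. A short sketch of the intended access pattern, reusing `model` and `repository_ref` from the first sketch; the image ID is a placeholder.

# Sketch: load a V1 image together with its backing blob.
docker_image_id = 'deadbeef'  # placeholder; any existing V1 image ID in the repository
image = model.get_legacy_image(repository_ref, docker_image_id, include_blob=True)
if image is not None:
  blob = image.blob            # raises unless the lookup passed include_blob=True
  path = blob.storage_path     # path of the layer data in blob storage
  locations = blob.placements  # e.g. ['local_us']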
|  | @ -42,7 +42,8 @@ class RegistryDataInterface(object): | |||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False): | ||||
|   def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False, | ||||
|                        include_blob=False): | ||||
|     """ | ||||
|     Returns the matching LegacyImage under the matching repository, if any. If none, | ||||
|     returns None. | ||||
|  | @ -196,9 +197,36 @@ class RegistryDataInterface(object): | |||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def get_repo_blob_by_digest(self, repo_ref, blob_digest, include_placements=False): | ||||
|   def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placements=False): | ||||
|     """ | ||||
|     Returns the blob in the repository with the given digest, or None if none exists. Note that | ||||
|     there may be multiple records in the same repository for the same blob digest, so the return | ||||
|     value of this function may change. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def create_blob_upload(self, repository_ref, upload_id, location_name, storage_metadata): | ||||
|     """ Creates a new blob upload and returns a reference. If the blob upload could not be | ||||
|         created, returns None. """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def lookup_blob_upload(self, repository_ref, blob_upload_id): | ||||
|     """ Looks up the blob upload with the given ID under the specified repository and returns it | ||||
|         or None if none. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def update_blob_upload(self, blob_upload, uncompressed_byte_count, piece_hashes, piece_sha_state, | ||||
|                          storage_metadata, byte_count, chunk_count, sha_state): | ||||
|     """ Updates the fields of the blob upload to match those given. Returns the updated blob upload | ||||
|         or None if the record does not exist. | ||||
|     """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def delete_blob_upload(self, blob_upload): | ||||
|     """ Deletes a blob upload record. """ | ||||
| 
 | ||||
|   @abstractmethod | ||||
|   def commit_blob_upload(self, blob_upload, blob_digest_str, blob_expiration_seconds): | ||||
|     """ Commits the blob upload into a blob and sets an expiration before that blob will be GCed. | ||||
|     """ | ||||
|  |  | |||
|  | @ -10,7 +10,7 @@ from data import model | |||
| from data.registry_model.interface import RegistryDataInterface | ||||
| from data.registry_model.datatypes import (Tag, RepositoryReference, Manifest, LegacyImage, Label, | ||||
|                                            SecurityScanStatus, ManifestLayer, Blob, DerivedImage, | ||||
|                                            TorrentInfo) | ||||
|                                            TorrentInfo, BlobUpload) | ||||
| from image.docker.schema1 import DockerSchema1ManifestBuilder, ManifestException | ||||
| 
 | ||||
| 
 | ||||
|  | @ -99,7 +99,8 @@ class PreOCIModel(RegistryDataInterface): | |||
|     return [LegacyImage.for_image(image, images_map=all_images_map, tags_map=tags_by_image_id) | ||||
|             for image in all_images] | ||||
| 
 | ||||
|   def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False): | ||||
|   def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False, | ||||
|                        include_blob=False): | ||||
|     """ | ||||
|     Returns the matching LegacyImage under the matching repository, if any. If none, | ||||
|     returns None. | ||||
|  | @ -117,7 +118,14 @@ class PreOCIModel(RegistryDataInterface): | |||
|       parent_images = model.image.get_parent_images(repo.namespace_user.username, repo.name, image) | ||||
|       parent_images_map = {image.id: image for image in parent_images} | ||||
| 
 | ||||
|     return LegacyImage.for_image(image, images_map=parent_images_map) | ||||
|     blob = None | ||||
|     if include_blob: | ||||
|       placements = list(model.storage.get_storage_locations(image.storage.uuid)) | ||||
|       blob = Blob.for_image_storage(image.storage, | ||||
|                                     storage_path=model.storage.get_layer_path(image.storage), | ||||
|                                     placements=placements) | ||||
| 
 | ||||
|     return LegacyImage.for_image(image, images_map=parent_images_map, blob=blob) | ||||
| 
 | ||||
|   def create_manifest_label(self, manifest, key, value, source_type_name, media_type_name=None): | ||||
|     """ Creates a label on the manifest with the given key and value. """ | ||||
|  | @ -547,14 +555,14 @@ class PreOCIModel(RegistryDataInterface): | |||
|     torrent_info = model.storage.save_torrent_info(image_storage, piece_length, pieces) | ||||
|     return TorrentInfo.for_torrent_info(torrent_info) | ||||
| 
 | ||||
|   def get_repo_blob_by_digest(self, repo_ref, blob_digest, include_placements=False): | ||||
|   def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placements=False): | ||||
|     """ | ||||
|     Returns the blob in the repository with the given digest, or None if none exists. Note that | ||||
|     there may be multiple records in the same repository for the same blob digest, so the return | ||||
|     value of this function may change. | ||||
|     """ | ||||
|     try: | ||||
|       image_storage = model.blob.get_repository_blob_by_digest(repo_ref._db_id, blob_digest) | ||||
|       image_storage = model.blob.get_repository_blob_by_digest(repository_ref._db_id, blob_digest) | ||||
|     except model.BlobDoesNotExist: | ||||
|       return None | ||||
| 
 | ||||
|  | @ -568,5 +576,76 @@ class PreOCIModel(RegistryDataInterface): | |||
|                                   storage_path=model.storage.get_layer_path(image_storage), | ||||
|                                   placements=placements) | ||||
| 
 | ||||
|   def create_blob_upload(self, repository_ref, new_upload_id, location_name, storage_metadata): | ||||
|     """ Creates a new blob upload and returns a reference. If the blob upload could not be | ||||
|         created, returns None. """ | ||||
|     repo = model.repository.lookup_repository(repository_ref._db_id) | ||||
|     if repo is None: | ||||
|       return None | ||||
| 
 | ||||
|     try: | ||||
|       upload_record = model.blob.initiate_upload(repo.namespace_user.username, repo.name, | ||||
|                                                  new_upload_id, location_name, storage_metadata) | ||||
|       return BlobUpload.for_upload(upload_record) | ||||
|     except database.Repository.DoesNotExist: | ||||
|       return None | ||||
| 
 | ||||
|   def lookup_blob_upload(self, repository_ref, blob_upload_id): | ||||
|     """ Looks up the blob upload with the given ID under the specified repository and returns it | ||||
|         or None if none. | ||||
|     """ | ||||
|     upload_record = model.blob.get_blob_upload_by_uuid(blob_upload_id) | ||||
|     if upload_record is None: | ||||
|       return None | ||||
| 
 | ||||
|     return BlobUpload.for_upload(upload_record) | ||||
| 
 | ||||
|   def update_blob_upload(self, blob_upload, uncompressed_byte_count, piece_hashes, piece_sha_state, | ||||
|                          storage_metadata, byte_count, chunk_count, sha_state): | ||||
|     """ Updates the fields of the blob upload to match those given. Returns the updated blob upload | ||||
|         or None if the record does not exist. | ||||
|     """ | ||||
|     upload_record = model.blob.get_blob_upload_by_uuid(blob_upload.upload_id) | ||||
|     if upload_record is None: | ||||
|       return None | ||||
| 
 | ||||
|     upload_record.uncompressed_byte_count = uncompressed_byte_count | ||||
|     upload_record.piece_hashes = piece_hashes | ||||
|     upload_record.piece_sha_state = piece_sha_state | ||||
|     upload_record.storage_metadata = storage_metadata | ||||
|     upload_record.byte_count = byte_count | ||||
|     upload_record.chunk_count = chunk_count | ||||
|     upload_record.sha_state = sha_state | ||||
|     upload_record.save() | ||||
|     return BlobUpload.for_upload(upload_record) | ||||
| 
 | ||||
|   def delete_blob_upload(self, blob_upload): | ||||
|     """ Deletes a blob upload record. """ | ||||
|     upload_record = model.blob.get_blob_upload_by_uuid(blob_upload.upload_id) | ||||
|     if upload_record is not None: | ||||
|       upload_record.delete_instance() | ||||
| 
 | ||||
|   def commit_blob_upload(self, blob_upload, blob_digest_str, blob_expiration_seconds): | ||||
|     """ Commits the blob upload into a blob and sets an expiration before that blob will be GCed. | ||||
|     """ | ||||
|     upload_record = model.blob.get_blob_upload_by_uuid(blob_upload.upload_id) | ||||
|     if upload_record is None: | ||||
|       return None | ||||
| 
 | ||||
|     repository = upload_record.repository | ||||
|     namespace_name = repository.namespace_user.username | ||||
|     repo_name = repository.name | ||||
| 
 | ||||
|     # Create the blob and temporarily tag it. | ||||
|     location_obj = model.storage.get_image_location_for_name(blob_upload.location_name) | ||||
|     blob_record = model.blob.store_blob_record_and_temp_link( | ||||
|       namespace_name, repo_name, blob_digest_str, location_obj.id, blob_upload.byte_count, | ||||
|       blob_expiration_seconds, blob_upload.uncompressed_byte_count) | ||||
| 
 | ||||
|     # Delete the blob upload. | ||||
|     upload_record.delete_instance() | ||||
|     return Blob.for_image_storage(blob_record, | ||||
|                                   storage_path=model.storage.get_layer_path(blob_record)) | ||||
| 
 | ||||
| 
 | ||||
| pre_oci_model = PreOCIModel() | ||||
|  |  | |||
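The blob-upload methods added to `PreOCIModel` above are thin wrappers over the existing `data.model.blob` helpers, and `_BlobUploadManager` drives them in order; `test_blob_uploads` later in this diff walks the same lifecycle directly. Roughly, again reusing `model` and `repository_ref` from the first sketch, with illustrative counts and metadata:

# Sketch of the blob-upload record lifecycle through the data model (counts and metadata illustrative).
import hashlib
import uuid

upload = model.create_blob_upload(repository_ref, str(uuid.uuid4()),
                                  'local_us', {'some': 'metadata'})

# Persist progress after a chunk has been written to storage.
upload = model.update_blob_upload(upload, None, upload.piece_hashes, upload.piece_sha_state,
                                  upload.storage_metadata, 11, 1, upload.sha_state)

# Either convert the record into a blob that is safe from GC for 60 seconds...
blob = model.commit_blob_upload(upload, 'sha256:' + hashlib.sha256('hello world').hexdigest(), 60)
# ...or abandon it instead: model.delete_blob_upload(upload)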
data/registry_model/test/test_blobuploader.py (new file, 115 additions)
@@ -0,0 +1,115 @@
| import hashlib | ||||
| import os | ||||
| 
 | ||||
| from io import BytesIO | ||||
| 
 | ||||
| import pytest | ||||
| 
 | ||||
| from data.registry_model.datatypes import RepositoryReference | ||||
| from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_upload_manager, | ||||
|                                               upload_blob, BlobUploadException, | ||||
|                                               BlobDigestMismatchException, BlobTooLargeException, | ||||
|                                               BlobUploadSettings) | ||||
| from data.registry_model.registry_pre_oci_model import PreOCIModel | ||||
| 
 | ||||
| from storage.distributedstorage import DistributedStorage | ||||
| from storage.fakestorage import FakeStorage | ||||
| from test.fixtures import * | ||||
| 
 | ||||
| @pytest.fixture() | ||||
| def pre_oci_model(initialized_db): | ||||
|   return PreOCIModel() | ||||
| 
 | ||||
| @pytest.mark.parametrize('chunk_count', [ | ||||
|   0, | ||||
|   1, | ||||
|   2, | ||||
|   10, | ||||
| ]) | ||||
| @pytest.mark.parametrize('subchunk', [ | ||||
|   True, | ||||
|   False, | ||||
| ]) | ||||
| def test_basic_upload_blob(chunk_count, subchunk, pre_oci_model): | ||||
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') | ||||
|   storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us']) | ||||
|   settings = BlobUploadSettings('2M', 512 * 1024, 3600) | ||||
|   app_config = {'TESTING': True} | ||||
| 
 | ||||
|   data = '' | ||||
|   with upload_blob(repository_ref, storage, settings) as manager: | ||||
|     assert manager | ||||
|     assert manager.blob_upload_id | ||||
| 
 | ||||
|     for index in range(0, chunk_count): | ||||
|       chunk_data = os.urandom(100) | ||||
|       data += chunk_data | ||||
| 
 | ||||
|       if subchunk: | ||||
|         manager.upload_chunk(app_config, BytesIO(chunk_data)) | ||||
|         manager.upload_chunk(app_config, BytesIO(chunk_data), (index * 100) + 50) | ||||
|       else: | ||||
|         manager.upload_chunk(app_config, BytesIO(chunk_data)) | ||||
| 
 | ||||
|     blob = manager.commit_to_blob(app_config) | ||||
| 
 | ||||
|   # Check the blob. | ||||
|   assert blob.compressed_size == len(data) | ||||
|   assert not blob.uploading | ||||
|   assert blob.digest == 'sha256:' + hashlib.sha256(data).hexdigest() | ||||
| 
 | ||||
|   # Ensure the blob exists in storage and has the expected data. | ||||
|   assert storage.get_content(['local_us'], blob.storage_path) == data | ||||
| 
 | ||||
| 
 | ||||
| def test_cancel_upload(pre_oci_model): | ||||
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') | ||||
|   storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us']) | ||||
|   settings = BlobUploadSettings('2M', 512 * 1024, 3600) | ||||
|   app_config = {'TESTING': True} | ||||
| 
 | ||||
|   blob_upload_id = None | ||||
|   with upload_blob(repository_ref, storage, settings) as manager: | ||||
|     blob_upload_id = manager.blob_upload_id | ||||
|     assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload_id) is not None | ||||
| 
 | ||||
|     manager.upload_chunk(app_config, BytesIO('hello world')) | ||||
| 
 | ||||
|   # Since the blob was not committed, the upload should be deleted. | ||||
|   assert blob_upload_id | ||||
|   assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload_id) is None | ||||
| 
 | ||||
| 
 | ||||
| def test_too_large(pre_oci_model): | ||||
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') | ||||
|   storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us']) | ||||
|   settings = BlobUploadSettings('1K', 512 * 1024, 3600) | ||||
|   app_config = {'TESTING': True} | ||||
| 
 | ||||
|   with upload_blob(repository_ref, storage, settings) as manager: | ||||
|     with pytest.raises(BlobTooLargeException): | ||||
|       manager.upload_chunk(app_config, BytesIO(os.urandom(1024 * 1024 * 2))) | ||||
| 
 | ||||
| 
 | ||||
| def test_extra_blob_stream_handlers(pre_oci_model): | ||||
|   handler1_result = [] | ||||
|   handler2_result = [] | ||||
| 
 | ||||
|   def handler1(bytes): | ||||
|     handler1_result.append(bytes) | ||||
| 
 | ||||
|   def handler2(bytes): | ||||
|     handler2_result.append(bytes) | ||||
| 
 | ||||
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') | ||||
|   storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us']) | ||||
|   settings = BlobUploadSettings('1K', 512 * 1024, 3600) | ||||
|   app_config = {'TESTING': True} | ||||
| 
 | ||||
|   with upload_blob(repository_ref, storage, settings, | ||||
|                    extra_blob_stream_handlers=[handler1, handler2]) as manager: | ||||
|     manager.upload_chunk(app_config, BytesIO('hello ')) | ||||
|     manager.upload_chunk(app_config, BytesIO('world')) | ||||
| 
 | ||||
|   assert ''.join(handler1_result) == 'hello world' | ||||
|   assert ''.join(handler2_result) == 'hello world' | ||||
|  | @ -1,3 +1,6 @@ | |||
| import hashlib | ||||
| import uuid | ||||
| 
 | ||||
| from datetime import datetime, timedelta | ||||
| 
 | ||||
| import pytest | ||||
|  | @ -105,11 +108,13 @@ def test_legacy_images(repo_namespace, repo_name, pre_oci_model): | |||
|     found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id, | ||||
|                                                  include_parents=True) | ||||
| 
 | ||||
|     with assert_query_count(4 if found_image.parents else 3): | ||||
|     with assert_query_count(5 if found_image.parents else 4): | ||||
|       found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id, | ||||
|                                                    include_parents=True) | ||||
|                                                    include_parents=True, include_blob=True) | ||||
|       assert found_image.docker_image_id == image.docker_image_id | ||||
|       assert found_image.parents == image.parents | ||||
|       assert found_image.blob | ||||
|       assert found_image.blob.placements | ||||
| 
 | ||||
|     # Check that the tags list can be retrieved. | ||||
|     assert image.tags is not None | ||||
|  | @ -523,3 +528,50 @@ def test_torrent_info(pre_oci_model): | |||
|   assert torrent_info is not None | ||||
|   assert torrent_info.piece_length == 2 | ||||
|   assert torrent_info.pieces == 'foo' | ||||
| 
 | ||||
| 
 | ||||
| def test_blob_uploads(pre_oci_model): | ||||
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'simple') | ||||
| 
 | ||||
|   blob_upload = pre_oci_model.create_blob_upload(repository_ref, str(uuid.uuid4()), | ||||
|                                                  'local_us', {'some': 'metadata'}) | ||||
|   assert blob_upload | ||||
|   assert blob_upload.storage_metadata == {'some': 'metadata'} | ||||
|   assert blob_upload.location_name == 'local_us' | ||||
| 
 | ||||
|   # Ensure we can find the blob upload. | ||||
|   assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) == blob_upload | ||||
| 
 | ||||
|   # Update and ensure the changes are saved. | ||||
|   assert pre_oci_model.update_blob_upload(blob_upload, 1, 'the-pieces_hash', | ||||
|                                           blob_upload.piece_sha_state, | ||||
|                                           {'new': 'metadata'}, 2, 3, | ||||
|                                           blob_upload.sha_state) | ||||
| 
 | ||||
|   updated = pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) | ||||
|   assert updated | ||||
|   assert updated.uncompressed_byte_count == 1 | ||||
|   assert updated.piece_hashes == 'the-pieces_hash' | ||||
|   assert updated.storage_metadata == {'new': 'metadata'} | ||||
|   assert updated.byte_count == 2 | ||||
|   assert updated.chunk_count == 3 | ||||
| 
 | ||||
|   # Delete the upload. | ||||
|   pre_oci_model.delete_blob_upload(blob_upload) | ||||
| 
 | ||||
|   # Ensure it can no longer be found. | ||||
|   assert not pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) | ||||
| 
 | ||||
| 
 | ||||
| def test_commit_blob_upload(pre_oci_model): | ||||
|   repository_ref = pre_oci_model.lookup_repository('devtable', 'simple') | ||||
|   blob_upload = pre_oci_model.create_blob_upload(repository_ref, str(uuid.uuid4()), | ||||
|                                                  'local_us', {'some': 'metadata'}) | ||||
| 
 | ||||
|   # Commit the blob upload and make sure it is written as a blob. | ||||
|   digest = 'sha256:' + hashlib.sha256('hello').hexdigest() | ||||
|   blob = pre_oci_model.commit_blob_upload(blob_upload, digest, 60) | ||||
|   assert blob.digest == digest | ||||
| 
 | ||||
|   # Ensure the upload can no longer be found. | ||||
|   assert not pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) | ||||