diff --git a/data/registry_model/blobuploader.py b/data/registry_model/blobuploader.py new file mode 100644 index 000000000..3d53bcfe3 --- /dev/null +++ b/data/registry_model/blobuploader.py @@ -0,0 +1,310 @@ +import logging +import time + +from contextlib import contextmanager +from collections import namedtuple + +import bitmath +import resumablehashlib + +from data.registry_model import registry_model +from data.database import CloseForLongOperation, db_transaction +from digest import digest_tools +from util.registry.filelike import wrap_with_handler, StreamSlice +from util.registry.gzipstream import calculate_size_handler +from util.registry.torrent import PieceHasher + + +logger = logging.getLogger(__name__) + + +BLOB_CONTENT_TYPE = 'application/octet-stream' + + +class BlobUploadException(Exception): + """ Base for all exceptions raised when uploading blobs. """ + +class BlobDigestMismatchException(BlobUploadException): + """ Exception raised if the digest requested does not match that of the contents uploaded. """ + +class BlobTooLargeException(BlobUploadException): + """ Exception raised if the data uploaded exceeds the maximum_blob_size. """ + def __init__(self, uploaded, max_allowed): + super(BlobTooLargeException, self).__init__() + self.uploaded = uploaded + self.max_allowed = max_allowed + + +BlobUploadSettings = namedtuple('BlobUploadSettings', ['maximum_blob_size', 'bittorrent_piece_size', + 'committed_blob_expiration']) + + +def create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers=None): + """ Creates a new blob upload in the specified repository and returns a manager for interacting + with that upload. Returns None if a new blob upload could not be started. + """ + location_name = storage.preferred_locations[0] + new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name) + blob_upload = registry_model.create_blob_upload(repository_ref, new_upload_uuid, location_name, + upload_metadata) + if blob_upload is None: + return None + + return _BlobUploadManager(repository_ref, blob_upload, settings, storage, + extra_blob_stream_handlers) + + +def retrieve_blob_upload_manager(repository_ref, blob_upload_id, storage, settings): + """ Retrieves the manager for an in-progress blob upload with the specified ID under the given + repository or None if none. + """ + blob_upload = registry_model.lookup_blob_upload(repository_ref, blob_upload_id) + if blob_upload is None: + return None + + return _BlobUploadManager(repository_ref, blob_upload, settings, storage) + + +@contextmanager +def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None): + """ Starts a new blob upload in the specified repository and yields a manager for interacting + with that upload. When the context manager completes, the blob upload is deleted, whether + committed to a blob or not. Yields None if a blob upload could not be started. + """ + created = create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers) + if not created: + yield None + return + + try: + yield created + except Exception as ex: + logger.exception('Exception when uploading blob `%s`', created.blob_upload_id) + raise ex + finally: + # Cancel the upload if something went wrong or it was not commit to a blob. + if created.committed_blob is None: + created.cancel_upload() + + +class _BlobUploadManager(object): + """ Defines a helper class for easily interacting with blob uploads in progress, including + handling of database and storage calls. + """ + def __init__(self, repository_ref, blob_upload, settings, storage, + extra_blob_stream_handlers=None): + self.repository_ref = repository_ref + self.blob_upload = blob_upload + self.settings = settings + self.storage = storage + self.extra_blob_stream_handlers = extra_blob_stream_handlers + self.committed_blob = None + + @property + def blob_upload_id(self): + """ Returns the unique ID for the blob upload. """ + return self.blob_upload.upload_id + + def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1, metric_queue=None): + """ Uploads a chunk of data found in the given input file-like interface. start_offset and + length are optional and should match a range header if any was given. + + If metric_queue is given, the upload time and chunk size are written into the metrics in + the queue. + + Returns the total number of bytes uploaded after this upload has completed. Raises + a BlobUploadException if the upload failed. + """ + assert start_offset is not None + assert length is not None + + if start_offset > 0 and start_offset > self.blob_upload.byte_count: + logger.error('start_offset provided greater than blob_upload.byte_count') + return None + + # Ensure that we won't go over the allowed maximum size for blobs. + max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size) + uploaded = bitmath.Byte(length + start_offset) + if length > -1 and uploaded > max_blob_size: + raise BlobTooLargeException(uploaded=uploaded.bytes, max_allowed=max_blob_size.bytes) + + location_set = {self.blob_upload.location_name} + upload_error = None + with CloseForLongOperation(app_config): + if start_offset > 0 and start_offset < self.blob_upload.byte_count: + # Skip the bytes which were received on a previous push, which are already stored and + # included in the sha calculation + overlap_size = self.blob_upload.byte_count - start_offset + input_fp = StreamSlice(input_fp, overlap_size) + + # Update our upload bounds to reflect the skipped portion of the overlap + start_offset = self.blob_upload.byte_count + length = max(length - overlap_size, 0) + + # We use this to escape early in case we have already processed all of the bytes the user + # wants to upload. + if length == 0: + return self.blob_upload.byte_count + + input_fp = wrap_with_handler(input_fp, self.blob_upload.sha_state.update) + + if self.extra_blob_stream_handlers: + for handler in self.extra_blob_stream_handlers: + input_fp = wrap_with_handler(input_fp, handler) + + # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have + # already calculated hash data for the previous chunk(s). + piece_hasher = None + if self.blob_upload.chunk_count == 0 or self.blob_upload.piece_sha_state: + initial_sha1_value = self.blob_upload.piece_sha_state or resumablehashlib.sha1() + initial_sha1_pieces_value = self.blob_upload.piece_hashes or '' + + piece_hasher = PieceHasher(self.settings.bittorrent_piece_size, start_offset, + initial_sha1_pieces_value, initial_sha1_value) + input_fp = wrap_with_handler(input_fp, piece_hasher.update) + + # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the + # stream so we can determine the uncompressed size. We'll throw out this data if another chunk + # comes in, but in the common case the docker client only sends one chunk. + size_info = None + if start_offset == 0 and self.blob_upload.chunk_count == 0: + size_info, fn = calculate_size_handler() + input_fp = wrap_with_handler(input_fp, fn) + + start_time = time.time() + length_written, new_metadata, upload_error = self.storage.stream_upload_chunk( + location_set, + self.blob_upload.upload_id, + start_offset, + length, + input_fp, + self.blob_upload.storage_metadata, + content_type=BLOB_CONTENT_TYPE, + ) + + if upload_error is not None: + logger.error('storage.stream_upload_chunk returned error %s', upload_error) + raise BlobUploadException(upload_error) + + # Update the chunk upload time metric. + if metric_queue is not None: + metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[ + length_written, list(location_set)[0]]) + + # Ensure we have not gone beyond the max layer size. + new_blob_bytes = self.blob_upload.byte_count + length_written + new_blob_size = bitmath.Byte(new_blob_bytes) + if new_blob_size > max_blob_size: + raise BlobTooLargeException(uploaded=new_blob_size, max_allowed=max_blob_size.bytes) + + # If we determined an uncompressed size and this is the first chunk, add it to the blob. + # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks. + uncompressed_byte_count = self.blob_upload.uncompressed_byte_count + if size_info is not None and self.blob_upload.chunk_count == 0 and size_info.is_valid: + uncompressed_byte_count = size_info.uncompressed_size + elif length_written > 0: + # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't + # know the uncompressed size. + uncompressed_byte_count = None + + piece_hashes = None + piece_sha_state = None + if piece_hasher is not None: + piece_hashes = piece_hasher.piece_hashes + piece_sha_state = piece_hasher.hash_fragment + + self.blob_upload = registry_model.update_blob_upload(self.blob_upload, + uncompressed_byte_count, + piece_hashes, + piece_sha_state, + new_metadata, + new_blob_bytes, + self.blob_upload.chunk_count + 1, + self.blob_upload.sha_state) + if self.blob_upload is None: + raise BlobUploadException('Could not complete upload of chunk') + + return new_blob_bytes + + def cancel_upload(self): + """ Cancels the blob upload, deleting any data uploaded and removing the upload itself. """ + # Tell storage to cancel the chunked upload, deleting its contents. + self.storage.cancel_chunked_upload({self.blob_upload.location_name}, + self.blob_upload.upload_id, + self.blob_upload.storage_metadata) + + # Remove the blob upload record itself. + registry_model.delete_blob_upload(self.blob_upload) + + def commit_to_blob(self, app_config, expected_digest=None): + """ Commits the blob upload to a blob under the repository. The resulting blob will be marked + to not be GCed for some period of time (as configured by `committed_blob_expiration`). + + If expected_digest is specified, the content digest of the data uploaded for the blob is + compared to that given and, if it does not match, a BlobDigestMismatchException is + raised. The digest given must be of type `Digest` and not a string. + """ + # Compare the content digest. + if expected_digest is not None: + self._validate_digest(expected_digest) + + # Finalize the storage. + storage_already_existed = self._finalize_blob_storage(app_config) + + # Convert the upload to a blob. + computed_digest_str = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) + + with db_transaction(): + blob = registry_model.commit_blob_upload(self.blob_upload, computed_digest_str, + self.settings.committed_blob_expiration) + if blob is None: + return None + + # Save torrent hash information (if available). + if self.blob_upload.piece_sha_state is not None and not storage_already_existed: + piece_bytes = self.blob_upload.piece_hashes + self.blob_upload.piece_sha_state.digest() + registry_model.set_torrent_info(blob, self.settings.bittorrent_piece_size, piece_bytes) + + self.committed_blob = blob + return blob + + def _validate_digest(self, expected_digest): + """ + Verifies that the digest's SHA matches that of the uploaded data. + """ + computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) + if not digest_tools.digests_equal(computed_digest, expected_digest): + logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s', + self.blob_upload.upload_id, expected_digest, computed_digest) + raise BlobDigestMismatchException() + + def _finalize_blob_storage(self, app_config): + """ + When an upload is successful, this ends the uploading process from the + storage's perspective. + + Returns True if the blob already existed. + """ + computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state) + final_blob_location = digest_tools.content_path(computed_digest) + + # Close the database connection before we perform this operation, as it can take a while + # and we shouldn't hold the connection during that time. + with CloseForLongOperation(app_config): + # Move the storage into place, or if this was a re-upload, cancel it + already_existed = self.storage.exists({self.blob_upload.location_name}, final_blob_location) + if already_existed: + # It already existed, clean up our upload which served as proof that the + # uploader had the blob. + self.storage.cancel_chunked_upload({self.blob_upload.location_name}, + self.blob_upload.upload_id, + self.blob_upload.storage_metadata) + else: + # We were the first ones to upload this image (at least to this location) + # Let's copy it into place + self.storage.complete_chunked_upload({self.blob_upload.location_name}, + self.blob_upload.upload_id, + final_blob_location, + self.blob_upload.storage_metadata) + + return already_existed diff --git a/data/registry_model/datatypes.py b/data/registry_model/datatypes.py index 550baed22..3ec0c6f22 100644 --- a/data/registry_model/datatypes.py +++ b/data/registry_model/datatypes.py @@ -122,13 +122,14 @@ class LegacyImage(datatype('LegacyImage', ['docker_image_id', 'created', 'commen 'image_size', 'aggregate_size', 'uploading'])): """ LegacyImage represents a Docker V1-style image found in a repository. """ @classmethod - def for_image(cls, image, images_map=None, tags_map=None): + def for_image(cls, image, images_map=None, tags_map=None, blob=None): if image is None: return None return LegacyImage(db_id=image.id, inputs=dict(images_map=images_map, tags_map=tags_map, - ancestor_id_list=image.ancestor_id_list()), + ancestor_id_list=image.ancestor_id_list(), + blob=blob), docker_image_id=image.docker_image_id, created=image.created, comment=image.comment, @@ -148,6 +149,14 @@ class LegacyImage(datatype('LegacyImage', ['docker_image_id', 'created', 'commen for ancestor_id in reversed(ancestor_id_list) if images_map.get(ancestor_id)] + @property + @requiresinput('blob') + def blob(self, blob): + """ Returns the blob for this image. Raises an exception if the blob has + not been loaded before this property is invoked. + """ + return blob + @property @requiresinput('tags_map') def tags(self, tags_map): @@ -240,3 +249,21 @@ class TorrentInfo(datatype('TorrentInfo', ['pieces', 'piece_length'])): return TorrentInfo(db_id=torrent_info.id, pieces=torrent_info.pieces, piece_length=torrent_info.piece_length) + + +class BlobUpload(datatype('BlobUpload', ['upload_id', 'byte_count', 'uncompressed_byte_count', + 'chunk_count', 'sha_state', 'location_name', + 'storage_metadata', 'piece_sha_state', 'piece_hashes'])): + """ BlobUpload represents information about an in-progress upload to create a blob. """ + @classmethod + def for_upload(cls, blob_upload): + return BlobUpload(db_id=blob_upload.id, + upload_id=blob_upload.uuid, + byte_count=blob_upload.byte_count, + uncompressed_byte_count=blob_upload.uncompressed_byte_count, + chunk_count=blob_upload.chunk_count, + sha_state=blob_upload.sha_state, + location_name=blob_upload.location.name, + storage_metadata=blob_upload.storage_metadata, + piece_sha_state=blob_upload.piece_sha_state, + piece_hashes=blob_upload.piece_hashes) diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py index a565504d6..748a8bcc0 100644 --- a/data/registry_model/interface.py +++ b/data/registry_model/interface.py @@ -42,7 +42,8 @@ class RegistryDataInterface(object): """ @abstractmethod - def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False): + def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False, + include_blob=False): """ Returns the matching LegacyImages under the matching repository, if any. If none, returns None. @@ -196,9 +197,36 @@ class RegistryDataInterface(object): """ @abstractmethod - def get_repo_blob_by_digest(self, repo_ref, blob_digest, include_placements=False): + def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placements=False): """ Returns the blob in the repository with the given digest, if any or None if none. Note that there may be multiple records in the same repository for the same blob digest, so the return value of this function may change. """ + + @abstractmethod + def create_blob_upload(self, repository_ref, upload_id, location_name, storage_metadata): + """ Creates a new blob upload and returns a reference. If the blob upload could not be + created, returns None. """ + + @abstractmethod + def lookup_blob_upload(self, repository_ref, blob_upload_id): + """ Looks up the blob upload withn the given ID under the specified repository and returns it + or None if none. + """ + + @abstractmethod + def update_blob_upload(self, blob_upload, uncompressed_byte_count, piece_hashes, piece_sha_state, + storage_metadata, byte_count, chunk_count, sha_state): + """ Updates the fields of the blob upload to match those given. Returns the updated blob upload + or None if the record does not exists. + """ + + @abstractmethod + def delete_blob_upload(self, blob_upload): + """ Deletes a blob upload record. """ + + @abstractmethod + def commit_blob_upload(self, blob_upload, blob_digest_str, blob_expiration_seconds): + """ Commits the blob upload into a blob and sets an expiration before that blob will be GCed. + """ diff --git a/data/registry_model/registry_pre_oci_model.py b/data/registry_model/registry_pre_oci_model.py index cdb038533..bba644df3 100644 --- a/data/registry_model/registry_pre_oci_model.py +++ b/data/registry_model/registry_pre_oci_model.py @@ -10,7 +10,7 @@ from data import model from data.registry_model.interface import RegistryDataInterface from data.registry_model.datatypes import (Tag, RepositoryReference, Manifest, LegacyImage, Label, SecurityScanStatus, ManifestLayer, Blob, DerivedImage, - TorrentInfo) + TorrentInfo, BlobUpload) from image.docker.schema1 import DockerSchema1ManifestBuilder, ManifestException @@ -99,7 +99,8 @@ class PreOCIModel(RegistryDataInterface): return [LegacyImage.for_image(image, images_map=all_images_map, tags_map=tags_by_image_id) for image in all_images] - def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False): + def get_legacy_image(self, repository_ref, docker_image_id, include_parents=False, + include_blob=False): """ Returns the matching LegacyImages under the matching repository, if any. If none, returns None. @@ -117,7 +118,14 @@ class PreOCIModel(RegistryDataInterface): parent_images = model.image.get_parent_images(repo.namespace_user.username, repo.name, image) parent_images_map = {image.id: image for image in parent_images} - return LegacyImage.for_image(image, images_map=parent_images_map) + blob = None + if include_blob: + placements = list(model.storage.get_storage_locations(image.storage.uuid)) + blob = Blob.for_image_storage(image.storage, + storage_path=model.storage.get_layer_path(image.storage), + placements=placements) + + return LegacyImage.for_image(image, images_map=parent_images_map, blob=blob) def create_manifest_label(self, manifest, key, value, source_type_name, media_type_name=None): """ Creates a label on the manifest with the given key and value. """ @@ -547,14 +555,14 @@ class PreOCIModel(RegistryDataInterface): torrent_info = model.storage.save_torrent_info(image_storage, piece_length, pieces) return TorrentInfo.for_torrent_info(torrent_info) - def get_repo_blob_by_digest(self, repo_ref, blob_digest, include_placements=False): + def get_repo_blob_by_digest(self, repository_ref, blob_digest, include_placements=False): """ Returns the blob in the repository with the given digest, if any or None if none. Note that there may be multiple records in the same repository for the same blob digest, so the return value of this function may change. """ try: - image_storage = model.blob.get_repository_blob_by_digest(repo_ref._db_id, blob_digest) + image_storage = model.blob.get_repository_blob_by_digest(repository_ref._db_id, blob_digest) except model.BlobDoesNotExist: return None @@ -568,5 +576,76 @@ class PreOCIModel(RegistryDataInterface): storage_path=model.storage.get_layer_path(image_storage), placements=placements) + def create_blob_upload(self, repository_ref, new_upload_id, location_name, storage_metadata): + """ Creates a new blob upload and returns a reference. If the blob upload could not be + created, returns None. """ + repo = model.repository.lookup_repository(repository_ref._db_id) + if repo is None: + return None + + try: + upload_record = model.blob.initiate_upload(repo.namespace_user.username, repo.name, + new_upload_id, location_name, storage_metadata) + return BlobUpload.for_upload(upload_record) + except database.Repository.DoesNotExist: + return None + + def lookup_blob_upload(self, repository_ref, blob_upload_id): + """ Looks up the blob upload withn the given ID under the specified repository and returns it + or None if none. + """ + upload_record = model.blob.get_blob_upload_by_uuid(blob_upload_id) + if upload_record is None: + return None + + return BlobUpload.for_upload(upload_record) + + def update_blob_upload(self, blob_upload, uncompressed_byte_count, piece_hashes, piece_sha_state, + storage_metadata, byte_count, chunk_count, sha_state): + """ Updates the fields of the blob upload to match those given. Returns the updated blob upload + or None if the record does not exists. + """ + upload_record = model.blob.get_blob_upload_by_uuid(blob_upload.upload_id) + if upload_record is None: + return None + + upload_record.uncompressed_byte_count = uncompressed_byte_count + upload_record.piece_hashes = piece_hashes + upload_record.piece_sha_state = piece_sha_state + upload_record.storage_metadata = storage_metadata + upload_record.byte_count = byte_count + upload_record.chunk_count = chunk_count + upload_record.sha_state = sha_state + upload_record.save() + return BlobUpload.for_upload(upload_record) + + def delete_blob_upload(self, blob_upload): + """ Deletes a blob upload record. """ + upload_record = model.blob.get_blob_upload_by_uuid(blob_upload.upload_id) + if upload_record is not None: + upload_record.delete_instance() + + def commit_blob_upload(self, blob_upload, blob_digest_str, blob_expiration_seconds): + """ Commits the blob upload into a blob and sets an expiration before that blob will be GCed. + """ + upload_record = model.blob.get_blob_upload_by_uuid(blob_upload.upload_id) + if upload_record is None: + return None + + repository = upload_record.repository + namespace_name = repository.namespace_user.username + repo_name = repository.name + + # Create the blob and temporarily tag it. + location_obj = model.storage.get_image_location_for_name(blob_upload.location_name) + blob_record = model.blob.store_blob_record_and_temp_link( + namespace_name, repo_name, blob_digest_str, location_obj.id, blob_upload.byte_count, + blob_expiration_seconds, blob_upload.uncompressed_byte_count) + + # Delete the blob upload. + upload_record.delete_instance() + return Blob.for_image_storage(blob_record, + storage_path=model.storage.get_layer_path(blob_record)) + pre_oci_model = PreOCIModel() diff --git a/data/registry_model/test/test_blobuploader.py b/data/registry_model/test/test_blobuploader.py new file mode 100644 index 000000000..72dccb5f9 --- /dev/null +++ b/data/registry_model/test/test_blobuploader.py @@ -0,0 +1,115 @@ +import hashlib +import os + +from io import BytesIO + +import pytest + +from data.registry_model.datatypes import RepositoryReference +from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_upload_manager, + upload_blob, BlobUploadException, + BlobDigestMismatchException, BlobTooLargeException, + BlobUploadSettings) +from data.registry_model.registry_pre_oci_model import PreOCIModel + +from storage.distributedstorage import DistributedStorage +from storage.fakestorage import FakeStorage +from test.fixtures import * + +@pytest.fixture() +def pre_oci_model(initialized_db): + return PreOCIModel() + +@pytest.mark.parametrize('chunk_count', [ + 0, + 1, + 2, + 10, +]) +@pytest.mark.parametrize('subchunk', [ + True, + False, +]) +def test_basic_upload_blob(chunk_count, subchunk, pre_oci_model): + repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') + storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us']) + settings = BlobUploadSettings('2M', 512 * 1024, 3600) + app_config = {'TESTING': True} + + data = '' + with upload_blob(repository_ref, storage, settings) as manager: + assert manager + assert manager.blob_upload_id + + for index in range(0, chunk_count): + chunk_data = os.urandom(100) + data += chunk_data + + if subchunk: + manager.upload_chunk(app_config, BytesIO(chunk_data)) + manager.upload_chunk(app_config, BytesIO(chunk_data), (index * 100) + 50) + else: + manager.upload_chunk(app_config, BytesIO(chunk_data)) + + blob = manager.commit_to_blob(app_config) + + # Check the blob. + assert blob.compressed_size == len(data) + assert not blob.uploading + assert blob.digest == 'sha256:' + hashlib.sha256(data).hexdigest() + + # Ensure the blob exists in storage and has the expected data. + assert storage.get_content(['local_us'], blob.storage_path) == data + + +def test_cancel_upload(pre_oci_model): + repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') + storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us']) + settings = BlobUploadSettings('2M', 512 * 1024, 3600) + app_config = {'TESTING': True} + + blob_upload_id = None + with upload_blob(repository_ref, storage, settings) as manager: + blob_upload_id = manager.blob_upload_id + assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload_id) is not None + + manager.upload_chunk(app_config, BytesIO('hello world')) + + # Since the blob was not comitted, the upload should be deleted. + assert blob_upload_id + assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload_id) is None + + +def test_too_large(pre_oci_model): + repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') + storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us']) + settings = BlobUploadSettings('1K', 512 * 1024, 3600) + app_config = {'TESTING': True} + + with upload_blob(repository_ref, storage, settings) as manager: + with pytest.raises(BlobTooLargeException): + manager.upload_chunk(app_config, BytesIO(os.urandom(1024 * 1024 * 2))) + + +def test_extra_blob_stream_handlers(pre_oci_model): + handler1_result = [] + handler2_result = [] + + def handler1(bytes): + handler1_result.append(bytes) + + def handler2(bytes): + handler2_result.append(bytes) + + repository_ref = pre_oci_model.lookup_repository('devtable', 'complex') + storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us']) + settings = BlobUploadSettings('1K', 512 * 1024, 3600) + app_config = {'TESTING': True} + + with upload_blob(repository_ref, storage, settings, + extra_blob_stream_handlers=[handler1, handler2]) as manager: + manager.upload_chunk(app_config, BytesIO('hello ')) + manager.upload_chunk(app_config, BytesIO('world')) + + assert ''.join(handler1_result) == 'hello world' + assert ''.join(handler2_result) == 'hello world' diff --git a/data/registry_model/test/test_pre_oci_model.py b/data/registry_model/test/test_pre_oci_model.py index e568dd923..cdeec9f45 100644 --- a/data/registry_model/test/test_pre_oci_model.py +++ b/data/registry_model/test/test_pre_oci_model.py @@ -1,3 +1,6 @@ +import hashlib +import uuid + from datetime import datetime, timedelta import pytest @@ -105,11 +108,13 @@ def test_legacy_images(repo_namespace, repo_name, pre_oci_model): found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id, include_parents=True) - with assert_query_count(4 if found_image.parents else 3): + with assert_query_count(5 if found_image.parents else 4): found_image = pre_oci_model.get_legacy_image(repository_ref, image.docker_image_id, - include_parents=True) + include_parents=True, include_blob=True) assert found_image.docker_image_id == image.docker_image_id assert found_image.parents == image.parents + assert found_image.blob + assert found_image.blob.placements # Check that the tags list can be retrieved. assert image.tags is not None @@ -523,3 +528,50 @@ def test_torrent_info(pre_oci_model): assert torrent_info is not None assert torrent_info.piece_length == 2 assert torrent_info.pieces == 'foo' + + +def test_blob_uploads(pre_oci_model): + repository_ref = pre_oci_model.lookup_repository('devtable', 'simple') + + blob_upload = pre_oci_model.create_blob_upload(repository_ref, str(uuid.uuid4()), + 'local_us', {'some': 'metadata'}) + assert blob_upload + assert blob_upload.storage_metadata == {'some': 'metadata'} + assert blob_upload.location_name == 'local_us' + + # Ensure we can find the blob upload. + assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) == blob_upload + + # Update and ensure the changes are saved. + assert pre_oci_model.update_blob_upload(blob_upload, 1, 'the-pieces_hash', + blob_upload.piece_sha_state, + {'new': 'metadata'}, 2, 3, + blob_upload.sha_state) + + updated = pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) + assert updated + assert updated.uncompressed_byte_count == 1 + assert updated.piece_hashes == 'the-pieces_hash' + assert updated.storage_metadata == {'new': 'metadata'} + assert updated.byte_count == 2 + assert updated.chunk_count == 3 + + # Delete the upload. + pre_oci_model.delete_blob_upload(blob_upload) + + # Ensure it can no longer be found. + assert not pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) + + +def test_commit_blob_upload(pre_oci_model): + repository_ref = pre_oci_model.lookup_repository('devtable', 'simple') + blob_upload = pre_oci_model.create_blob_upload(repository_ref, str(uuid.uuid4()), + 'local_us', {'some': 'metadata'}) + + # Commit the blob upload and make sure it is written as a blob. + digest = 'sha256:' + hashlib.sha256('hello').hexdigest() + blob = pre_oci_model.commit_blob_upload(blob_upload, digest, 60) + assert blob.digest == digest + + # Ensure the upload can no longer be found. + assert not pre_oci_model.lookup_blob_upload(repository_ref, blob_upload.upload_id) diff --git a/endpoints/v1/models_interface.py b/endpoints/v1/models_interface.py index a4d4455c9..d32a00931 100644 --- a/endpoints/v1/models_interface.py +++ b/endpoints/v1/models_interface.py @@ -79,9 +79,9 @@ class DockerRegistryV1DataInterface(object): pass @abstractmethod - def update_image_sizes(self, namespace_name, repo_name, image_id, size, uncompressed_size): + def update_image_blob(self, namespace_name, repo_name, image_id, blob): """ - Updates the sizing information for the image with the given V1 Docker ID. + Updates the blob for the image with the given V1 Docker ID. """ pass diff --git a/endpoints/v1/models_pre_oci.py b/endpoints/v1/models_pre_oci.py index 0b11eccdb..badaffce0 100644 --- a/endpoints/v1/models_pre_oci.py +++ b/endpoints/v1/models_pre_oci.py @@ -1,5 +1,6 @@ from app import app, storage as store from data import model +from data.database import db_transaction from endpoints.v1.models_interface import DockerRegistryV1DataInterface, Repository from util.morecollections import AttrDict @@ -56,8 +57,8 @@ class PreOCIModel(DockerRegistryV1DataInterface): if repo_image is None or repo_image.storage is None: return + assert repo_image.storage.content_checksum == content_checksum with model.db_transaction(): - repo_image.storage.content_checksum = content_checksum repo_image.v1_checksum = checksum repo_image.storage.save() repo_image.save() @@ -77,9 +78,19 @@ class PreOCIModel(DockerRegistryV1DataInterface): repo_image.storage.save() return repo_image.storage - def update_image_sizes(self, namespace_name, repo_name, image_id, size, uncompressed_size): - model.storage.set_image_storage_metadata(image_id, namespace_name, repo_name, size, - uncompressed_size) + def update_image_blob(self, namespace_name, repo_name, image_id, blob): + # Retrieve the existing image storage record and replace it with that given by the blob. + repo_image = model.image.get_repo_image_and_storage(namespace_name, repo_name, image_id) + if repo_image is None or repo_image.storage is None or not repo_image.storage.uploading: + return False + + with db_transaction(): + existing_storage = repo_image.storage + + repo_image.storage = blob._db_id + repo_image.save() + + existing_storage.delete_instance(recursive=True) def get_image_size(self, namespace_name, repo_name, image_id): repo_image = model.image.get_repo_image_and_storage(namespace_name, repo_name, image_id) diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py index 330774df6..749babd51 100644 --- a/endpoints/v1/registry.py +++ b/endpoints/v1/registry.py @@ -12,6 +12,8 @@ from auth.auth_context import get_authenticated_user from auth.decorators import extract_namespace_repo_from_session, process_auth from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission) from data import model, database +from data.registry_model import registry_model +from data.registry_model.blobuploader import upload_blob, BlobUploadSettings, BlobUploadException from digest import checksums from endpoints.v1 import v1_bp from endpoints.v1.models_pre_oci import pre_oci_model as model @@ -26,14 +28,6 @@ from util.registry.torrent import PieceHasher logger = logging.getLogger(__name__) -def _finish_image(namespace, repository, image_id): - # Checksum is ok, we remove the marker - blob_ref = model.update_image_uploading(namespace, repository, image_id, False) - - # Send a job to the work queue to replicate the image layer. - queue_storage_replication(namespace, blob_ref) - - def require_completion(f): """This make sure that the image push correctly finished.""" @@ -183,51 +177,44 @@ def put_image_layer(namespace, repository, image_id): # encoding (Gunicorn) input_stream = request.environ['wsgi.input'] - # Create a socket reader to read the input stream containing the layer data. - sr = SocketReader(input_stream) + repository_ref = registry_model.lookup_repository(namespace, repository) + if repository_ref is None: + abort(404) + + expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'] + settings = BlobUploadSettings(maximum_blob_size=app.config['MAXIMUM_LAYER_SIZE'], + bittorrent_piece_size=app.config['BITTORRENT_PIECE_SIZE'], + committed_blob_expiration=expiration_sec) + + extra_handlers = [] # Add a handler that copies the data into a temp file. This is used to calculate the tarsum, # which is only needed for older versions of Docker. requires_tarsum = session.get('checksum_format') == 'tarsum' if requires_tarsum: tmp, tmp_hndlr = store.temp_store_handler() - sr.add_handler(tmp_hndlr) + extra_handlers.append(tmp_hndlr) - # Add a handler to compute the compressed and uncompressed sizes of the layer. - size_info, size_hndlr = gzipstream.calculate_size_handler() - sr.add_handler(size_hndlr) - - # Add a handler to hash the chunks of the upload for torrenting - piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE']) - sr.add_handler(piece_hasher.update) - - # Add a handler which computes the checksum. + # Add a handler which computes the simple Docker V1 checksum. h, sum_hndlr = checksums.simple_checksum_handler(v1_metadata.compat_json) - sr.add_handler(sum_hndlr) + extra_handlers.append(sum_hndlr) - # Add a handler which computes the content checksum only - ch, content_sum_hndlr = checksums.content_checksum_handler() - sr.add_handler(content_sum_hndlr) + uploaded_blob = None + try: + with upload_blob(repository_ref, store, settings, + extra_blob_stream_handlers=extra_handlers) as manager: + manager.upload_chunk(app.config, input_stream) + uploaded_blob = manager.commit_to_blob(app.config) + except BlobUploadException: + logger.exception('Exception when writing image data') + abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id) - # Stream write the data to storage. - locations, path = model.placement_locations_and_path_docker_v1(namespace, repository, image_id) - with database.CloseForLongOperation(app.config): - try: - start_time = time() - store.stream_write(locations, path, sr) - metric_queue.chunk_size.Observe(size_info.compressed_size, labelvalues=[list(locations)[0]]) - metric_queue.chunk_upload_time.Observe(time() - start_time, labelvalues=[list(locations)[0]]) - except IOError: - logger.exception('Exception when writing image data') - abort(520, 'Image %(image_id)s could not be written. Please try again.', image_id=image_id) + # Save the blob for the image. + model.update_image_blob(namespace, repository, image_id, uploaded_blob) - # Save the size of the image. - model.update_image_sizes(namespace, repository, image_id, size_info.compressed_size, - size_info.uncompressed_size) - - # Save the BitTorrent pieces. - model.create_bittorrent_pieces(namespace, repository, image_id, - piece_hasher.final_piece_hashes()) + # Send a job to the work queue to replicate the image layer. + # TODO: move this into a better place. + queue_storage_replication(namespace, uploaded_blob) # Append the computed checksum. csums = [] @@ -245,7 +232,7 @@ def put_image_layer(namespace, repository, image_id): # We don't have a checksum stored yet, that's fine skipping the check. # Not removing the mark though, image is not downloadable yet. session['checksum'] = csums - session['content_checksum'] = 'sha256:{0}'.format(ch.hexdigest()) + session['content_checksum'] = uploaded_blob.digest return make_response('true', 200) # We check if the checksums provided matches one the one we computed @@ -254,9 +241,6 @@ def put_image_layer(namespace, repository, image_id): abort(400, 'Checksum mismatch; ignoring the layer for image %(image_id)s', issue='checksum-mismatch', image_id=image_id) - # Mark the image as uploaded. - _finish_image(namespace, repository, image_id) - return make_response('true', 200) @@ -306,20 +290,12 @@ def put_image_checksum(namespace, repository, image_id): if not v1_metadata.compat_json: abort(404, 'Image not found: %(image_id)s', issue='unknown-image', image_id=image_id) - logger.debug('Marking image path') - if not model.is_image_uploading(namespace, repository, image_id): - abort(409, 'Cannot set checksum for image %(image_id)s', issue='image-write-error', - image_id=image_id) - - logger.debug('Storing image and content checksums') - + logger.debug('Storing image and checksum') content_checksum = session.get('content_checksum', None) checksum_parts = checksum.split(':') if len(checksum_parts) != 2: abort(400, 'Invalid checksum format') - model.store_docker_v1_checksums(namespace, repository, image_id, checksum, content_checksum) - if checksum not in session.get('checksum', []): logger.debug('session checksums: %s', session.get('checksum', [])) logger.debug('client supplied checksum: %s', checksum) @@ -327,9 +303,6 @@ def put_image_checksum(namespace, repository, image_id): abort(400, 'Checksum mismatch for image: %(image_id)s', issue='checksum-mismatch', image_id=image_id) - # Mark the image as uploaded. - _finish_image(namespace, repository, image_id) - return make_response('true', 200) diff --git a/endpoints/v1/tag.py b/endpoints/v1/tag.py index acad5e303..33e02a367 100644 --- a/endpoints/v1/tag.py +++ b/endpoints/v1/tag.py @@ -98,6 +98,9 @@ def delete_tag(namespace_name, repo_name, tag): if permission.can(): repo = model.get_repository(namespace_name, repo_name) + if repo is None: + abort(403) + if repo.kind != 'image': msg = 'This repository is for managing %s resources and not container images.' % repo.kind abort(405, message=msg, namespace=namespace_name) diff --git a/storage/fakestorage.py b/storage/fakestorage.py index dfe552f08..40a4e6f62 100644 --- a/storage/fakestorage.py +++ b/storage/fakestorage.py @@ -70,7 +70,6 @@ class FakeStorage(BaseStorageV2): def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None): upload_storage = _FAKE_STORAGE_MAP[uuid] - upload_storage.seek(offset) try: return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None except IOError as ex: diff --git a/test/fixtures.py b/test/fixtures.py index 3822d28f3..e442234b8 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -135,6 +135,7 @@ def appconfig(database_uri): @pytest.fixture() def initialized_db(appconfig): """ Configures the database for the database found in the appconfig. """ + under_test_real_database = bool(os.environ.get('TEST_DATABASE_URI')) # Configure the database. configure(appconfig) @@ -144,8 +145,12 @@ def initialized_db(appconfig): model._basequery.get_public_repo_visibility() model.log.get_log_entry_kinds() + if not under_test_real_database: + # Make absolutely sure foreign key constraints are on. + db.obj.execute_sql('PRAGMA foreign_keys = ON;') + assert db.obj.execute_sql('PRAGMA foreign_keys;').fetchone()[0] == 1 + # If under a test *real* database, setup a savepoint. - under_test_real_database = bool(os.environ.get('TEST_DATABASE_URI')) if under_test_real_database: with db.transaction(): test_savepoint = db.savepoint() diff --git a/test/registry/protocol_v1.py b/test/registry/protocol_v1.py index e1eb9a411..53532200b 100644 --- a/test/registry/protocol_v1.py +++ b/test/registry/protocol_v1.py @@ -14,6 +14,7 @@ class V1ProtocolSteps(Enum): GET_IMAGES = 'get-images' PUT_TAG = 'put-tag' PUT_IMAGE_JSON = 'put-image-json' + DELETE_TAG = 'delete-tag' class V1Protocol(RegistryProtocol): @@ -192,3 +193,18 @@ class V1Protocol(RegistryProtocol): expected_status=204, headers=headers) return PushResult(checksums=None, manifests=None, headers=headers) + + def delete(self, session, namespace, repo_name, tag_names, credentials=None, + expected_failure=None, options=None): + auth = self._auth_for_credentials(credentials) + tag_names = [tag_names] if isinstance(tag_names, str) else tag_names + + # Ping! + self.ping(session) + + for tag_name in tag_names: + # DELETE /v1/repositories/{namespace}/{repository}/tags/{tag} + self.conduct(session, 'DELETE', + '/v1/repositories/%s/tags/%s' % (self.repo_name(namespace, repo_name), tag_name), + auth=auth, + expected_status=(200, expected_failure, V1ProtocolSteps.DELETE_TAG)) diff --git a/test/registry/registry_tests.py b/test/registry/registry_tests.py index 270f1b44e..a7d178239 100644 --- a/test/registry/registry_tests.py +++ b/test/registry/registry_tests.py @@ -37,6 +37,21 @@ def test_basic_push_pull(pusher, puller, basic_images, liveserver_session, app_r credentials=credentials) +def test_multi_layer_images_push_pull(pusher, puller, multi_layer_images, liveserver_session, + app_reloader): + """ Test: Basic push and pull of a multi-layered image to a new repository. """ + credentials = ('devtable', 'password') + + # Push a new repository. + pusher.push(liveserver_session, 'devtable', 'newrepo', 'latest', multi_layer_images, + credentials=credentials) + + # Pull the repository to verify. + puller.pull(liveserver_session, 'devtable', 'newrepo', 'latest', multi_layer_images, + credentials=credentials) + + + def test_no_tag_manifests(pusher, puller, basic_images, liveserver_session, app_reloader, liveserver, registry_server_executor): """ Test: Basic pull without manifests. """ @@ -601,19 +616,19 @@ def test_invalid_blob_reference(manifest_protocol, basic_images, liveserver_sess expected_failure=Failures.INVALID_BLOB) -def test_delete_tag(manifest_protocol, puller, basic_images, liveserver_session, +def test_delete_tag(pusher, puller, basic_images, liveserver_session, app_reloader): """ Test: Push a repository, delete a tag, and attempt to pull. """ credentials = ('devtable', 'password') # Push the tags. - result = manifest_protocol.push(liveserver_session, 'devtable', 'newrepo', ['one', 'two'], - basic_images, credentials=credentials) + result = pusher.push(liveserver_session, 'devtable', 'newrepo', ['one', 'two'], + basic_images, credentials=credentials) - # Delete tag `one` by digest. - manifest_protocol.delete(liveserver_session, 'devtable', 'newrepo', - result.manifests['one'].digest, - credentials=credentials) + # Delete tag `one` by digest or tag. + pusher.delete(liveserver_session, 'devtable', 'newrepo', + result.manifests['one'].digest if result.manifests else 'one', + credentials=credentials) # Attempt to pull tag `one` and ensure it doesn't work. puller.pull(liveserver_session, 'devtable', 'newrepo', 'one', basic_images,