diff --git a/data/registry_model/blobuploader.py b/data/registry_model/blobuploader.py
new file mode 100644
index 000000000..3d53bcfe3
--- /dev/null
+++ b/data/registry_model/blobuploader.py
@@ -0,0 +1,310 @@
+import logging
+import time
+
+from contextlib import contextmanager
+from collections import namedtuple
+
+import bitmath
+import resumablehashlib
+
+from data.registry_model import registry_model
+from data.database import CloseForLongOperation, db_transaction
+from digest import digest_tools
+from util.registry.filelike import wrap_with_handler, StreamSlice
+from util.registry.gzipstream import calculate_size_handler
+from util.registry.torrent import PieceHasher
+
+
+logger = logging.getLogger(__name__)
+
+
+BLOB_CONTENT_TYPE = 'application/octet-stream'
+
+
+class BlobUploadException(Exception):
+  """ Base for all exceptions raised when uploading blobs. """
+
+class BlobDigestMismatchException(BlobUploadException):
+  """ Exception raised if the digest requested does not match that of the contents uploaded. """
+
+class BlobTooLargeException(BlobUploadException):
+  """ Exception raised if the data uploaded exceeds the maximum_blob_size. """
+  def __init__(self, uploaded, max_allowed):
+    super(BlobTooLargeException, self).__init__()
+    self.uploaded = uploaded
+    self.max_allowed = max_allowed
+
+
+BlobUploadSettings = namedtuple('BlobUploadSettings', ['maximum_blob_size', 'bittorrent_piece_size',
+                                                       'committed_blob_expiration'])
+
+
+def create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers=None):
+  """ Creates a new blob upload in the specified repository and returns a manager for interacting
+      with that upload. Returns None if a new blob upload could not be started.
+  """
+  location_name = storage.preferred_locations[0]
+  new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
+  blob_upload = registry_model.create_blob_upload(repository_ref, new_upload_uuid, location_name,
+                                                  upload_metadata)
+  if blob_upload is None:
+    return None
+
+  return _BlobUploadManager(repository_ref, blob_upload, settings, storage,
+                            extra_blob_stream_handlers)
+
+
+def retrieve_blob_upload_manager(repository_ref, blob_upload_id, storage, settings):
+  """ Retrieves the manager for an in-progress blob upload with the specified ID under the given
+      repository or None if none.
+  """
+  blob_upload = registry_model.lookup_blob_upload(repository_ref, blob_upload_id)
+  if blob_upload is None:
+    return None
+
+  return _BlobUploadManager(repository_ref, blob_upload, settings, storage)
+
+
+@contextmanager
+def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None):
+  """ Starts a new blob upload in the specified repository and yields a manager for interacting
+      with that upload. When the context manager completes, the blob upload is deleted, whether
+      committed to a blob or not. Yields None if a blob upload could not be started.
+  """
+  created = create_blob_upload(repository_ref, storage, settings, extra_blob_stream_handlers)
+  if not created:
+    yield None
+    return
+
+  try:
+    yield created
+  except Exception as ex:
+    logger.exception('Exception when uploading blob `%s`', created.blob_upload_id)
+    raise ex
+  finally:
+    # Cancel the upload if something went wrong or it was not committed to a blob.
+    if created.committed_blob is None:
+      created.cancel_upload()
+
+
+class _BlobUploadManager(object):
+  """ Defines a helper class for easily interacting with blob uploads in progress, including
+      handling of database and storage calls.
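+
+      Typical flow: call upload_chunk one or more times, then commit_to_blob to turn the upload
+      into a committed blob, or cancel_upload to discard it.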
+ """ + def __init__(self, repository_ref, blob_upload, settings, storage, + extra_blob_stream_handlers=None): + self.repository_ref = repository_ref + self.blob_upload = blob_upload + self.settings = settings + self.storage = storage + self.extra_blob_stream_handlers = extra_blob_stream_handlers + self.committed_blob = None + + @property + def blob_upload_id(self): + """ Returns the unique ID for the blob upload. """ + return self.blob_upload.upload_id + + def upload_chunk(self, app_config, input_fp, start_offset=0, length=-1, metric_queue=None): + """ Uploads a chunk of data found in the given input file-like interface. start_offset and + length are optional and should match a range header if any was given. + + If metric_queue is given, the upload time and chunk size are written into the metrics in + the queue. + + Returns the total number of bytes uploaded after this upload has completed. Raises + a BlobUploadException if the upload failed. + """ + assert start_offset is not None + assert length is not None + + if start_offset > 0 and start_offset > self.blob_upload.byte_count: + logger.error('start_offset provided greater than blob_upload.byte_count') + return None + + # Ensure that we won't go over the allowed maximum size for blobs. + max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size) + uploaded = bitmath.Byte(length + start_offset) + if length > -1 and uploaded > max_blob_size: + raise BlobTooLargeException(uploaded=uploaded.bytes, max_allowed=max_blob_size.bytes) + + location_set = {self.blob_upload.location_name} + upload_error = None + with CloseForLongOperation(app_config): + if start_offset > 0 and start_offset < self.blob_upload.byte_count: + # Skip the bytes which were received on a previous push, which are already stored and + # included in the sha calculation + overlap_size = self.blob_upload.byte_count - start_offset + input_fp = StreamSlice(input_fp, overlap_size) + + # Update our upload bounds to reflect the skipped portion of the overlap + start_offset = self.blob_upload.byte_count + length = max(length - overlap_size, 0) + + # We use this to escape early in case we have already processed all of the bytes the user + # wants to upload. + if length == 0: + return self.blob_upload.byte_count + + input_fp = wrap_with_handler(input_fp, self.blob_upload.sha_state.update) + + if self.extra_blob_stream_handlers: + for handler in self.extra_blob_stream_handlers: + input_fp = wrap_with_handler(input_fp, handler) + + # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have + # already calculated hash data for the previous chunk(s). + piece_hasher = None + if self.blob_upload.chunk_count == 0 or self.blob_upload.piece_sha_state: + initial_sha1_value = self.blob_upload.piece_sha_state or resumablehashlib.sha1() + initial_sha1_pieces_value = self.blob_upload.piece_hashes or '' + + piece_hasher = PieceHasher(self.settings.bittorrent_piece_size, start_offset, + initial_sha1_pieces_value, initial_sha1_value) + input_fp = wrap_with_handler(input_fp, piece_hasher.update) + + # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the + # stream so we can determine the uncompressed size. We'll throw out this data if another chunk + # comes in, but in the common case the docker client only sends one chunk. 
+      size_info = None
+      if start_offset == 0 and self.blob_upload.chunk_count == 0:
+        size_info, fn = calculate_size_handler()
+        input_fp = wrap_with_handler(input_fp, fn)
+
+      start_time = time.time()
+      length_written, new_metadata, upload_error = self.storage.stream_upload_chunk(
+        location_set,
+        self.blob_upload.upload_id,
+        start_offset,
+        length,
+        input_fp,
+        self.blob_upload.storage_metadata,
+        content_type=BLOB_CONTENT_TYPE,
+      )
+
+    if upload_error is not None:
+      logger.error('storage.stream_upload_chunk returned error %s', upload_error)
+      raise BlobUploadException(upload_error)
+
+    # Update the chunk upload time metric.
+    if metric_queue is not None:
+      metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[
+        length_written, list(location_set)[0]])
+
+    # Ensure we have not gone beyond the max layer size.
+    new_blob_bytes = self.blob_upload.byte_count + length_written
+    new_blob_size = bitmath.Byte(new_blob_bytes)
+    if new_blob_size > max_blob_size:
+      raise BlobTooLargeException(uploaded=new_blob_size, max_allowed=max_blob_size.bytes)
+
+    # If we determined an uncompressed size and this is the first chunk, add it to the blob.
+    # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
+    uncompressed_byte_count = self.blob_upload.uncompressed_byte_count
+    if size_info is not None and self.blob_upload.chunk_count == 0 and size_info.is_valid:
+      uncompressed_byte_count = size_info.uncompressed_size
+    elif length_written > 0:
+      # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
+      # know the uncompressed size.
+      uncompressed_byte_count = None
+
+    piece_hashes = None
+    piece_sha_state = None
+    if piece_hasher is not None:
+      piece_hashes = piece_hasher.piece_hashes
+      piece_sha_state = piece_hasher.hash_fragment
+
+    self.blob_upload = registry_model.update_blob_upload(self.blob_upload,
+                                                         uncompressed_byte_count,
+                                                         piece_hashes,
+                                                         piece_sha_state,
+                                                         new_metadata,
+                                                         new_blob_bytes,
+                                                         self.blob_upload.chunk_count + 1,
+                                                         self.blob_upload.sha_state)
+    if self.blob_upload is None:
+      raise BlobUploadException('Could not complete upload of chunk')
+
+    return new_blob_bytes
+
+  def cancel_upload(self):
+    """ Cancels the blob upload, deleting any data uploaded and removing the upload itself. """
+    # Tell storage to cancel the chunked upload, deleting its contents.
+    self.storage.cancel_chunked_upload({self.blob_upload.location_name},
+                                       self.blob_upload.upload_id,
+                                       self.blob_upload.storage_metadata)
+
+    # Remove the blob upload record itself.
+    registry_model.delete_blob_upload(self.blob_upload)
+
+  def commit_to_blob(self, app_config, expected_digest=None):
+    """ Commits the blob upload to a blob under the repository. The resulting blob will be marked
+        to not be GCed for some period of time (as configured by `committed_blob_expiration`).
+
+        If expected_digest is specified, the content digest of the data uploaded for the blob is
+        compared to that given and, if it does not match, a BlobDigestMismatchException is
+        raised. The digest given must be of type `Digest` and not a string.
+    """
+    # Compare the content digest.
+    if expected_digest is not None:
+      self._validate_digest(expected_digest)
+
+    # Finalize the storage.
+    storage_already_existed = self._finalize_blob_storage(app_config)
+
+    # Convert the upload to a blob.
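+    # The digest is computed from the sha256 state that upload_chunk kept updated as each
+    # chunk was streamed, so no data needs to be re-read here.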
+    computed_digest_str = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state)
+
+    with db_transaction():
+      blob = registry_model.commit_blob_upload(self.blob_upload, computed_digest_str,
+                                               self.settings.committed_blob_expiration)
+      if blob is None:
+        return None
+
+      # Save torrent hash information (if available).
+      if self.blob_upload.piece_sha_state is not None and not storage_already_existed:
+        piece_bytes = self.blob_upload.piece_hashes + self.blob_upload.piece_sha_state.digest()
+        registry_model.set_torrent_info(blob, self.settings.bittorrent_piece_size, piece_bytes)
+
+    self.committed_blob = blob
+    return blob
+
+  def _validate_digest(self, expected_digest):
+    """
+    Verifies that the digest's SHA matches that of the uploaded data.
+    """
+    computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state)
+    if not digest_tools.digests_equal(computed_digest, expected_digest):
+      logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s',
+                   self.blob_upload.upload_id, expected_digest, computed_digest)
+      raise BlobDigestMismatchException()
+
+  def _finalize_blob_storage(self, app_config):
+    """
+    When an upload is successful, this ends the uploading process from the
+    storage's perspective.
+
+    Returns True if the blob already existed.
+    """
+    computed_digest = digest_tools.sha256_digest_from_hashlib(self.blob_upload.sha_state)
+    final_blob_location = digest_tools.content_path(computed_digest)
+
+    # Close the database connection before we perform this operation, as it can take a while
+    # and we shouldn't hold the connection during that time.
+    with CloseForLongOperation(app_config):
+      # Move the storage into place, or if this was a re-upload, cancel it
+      already_existed = self.storage.exists({self.blob_upload.location_name}, final_blob_location)
+      if already_existed:
+        # It already existed, clean up our upload which served as proof that the
+        # uploader had the blob.
+        self.storage.cancel_chunked_upload({self.blob_upload.location_name},
+                                           self.blob_upload.upload_id,
+                                           self.blob_upload.storage_metadata)
+      else:
+        # We were the first ones to upload this image (at least to this location)
+        # Let's copy it into place
+        self.storage.complete_chunked_upload({self.blob_upload.location_name},
+                                             self.blob_upload.upload_id,
+                                             final_blob_location,
+                                             self.blob_upload.storage_metadata)
+
+    return already_existed
diff --git a/data/registry_model/test/test_blobuploader.py b/data/registry_model/test/test_blobuploader.py
new file mode 100644
index 000000000..72dccb5f9
--- /dev/null
+++ b/data/registry_model/test/test_blobuploader.py
@@ -0,0 +1,115 @@
+import hashlib
+import os
+
+from io import BytesIO
+
+import pytest
+
+from data.registry_model.datatypes import RepositoryReference
+from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_upload_manager,
+                                              upload_blob, BlobUploadException,
+                                              BlobDigestMismatchException, BlobTooLargeException,
+                                              BlobUploadSettings)
+from data.registry_model.registry_pre_oci_model import PreOCIModel
+
+from storage.distributedstorage import DistributedStorage
+from storage.fakestorage import FakeStorage
+from test.fixtures import *
+
+@pytest.fixture()
+def pre_oci_model(initialized_db):
+  return PreOCIModel()
+
+@pytest.mark.parametrize('chunk_count', [
+  0,
+  1,
+  2,
+  10,
+])
+@pytest.mark.parametrize('subchunk', [
+  True,
+  False,
+])
+def test_basic_upload_blob(chunk_count, subchunk, pre_oci_model):
+  repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
+  storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
+  settings = BlobUploadSettings('2M', 512 * 1024, 3600)
+  app_config = {'TESTING': True}
+
+  data = ''
+  with upload_blob(repository_ref, storage, settings) as manager:
+    assert manager
+    assert manager.blob_upload_id
+
+    for index in range(0, chunk_count):
+      chunk_data = os.urandom(100)
+      data += chunk_data
+
+      if subchunk:
+        manager.upload_chunk(app_config, BytesIO(chunk_data))
+        manager.upload_chunk(app_config, BytesIO(chunk_data), (index * 100) + 50)
+      else:
+        manager.upload_chunk(app_config, BytesIO(chunk_data))
+
+    blob = manager.commit_to_blob(app_config)
+
+  # Check the blob.
+  assert blob.compressed_size == len(data)
+  assert not blob.uploading
+  assert blob.digest == 'sha256:' + hashlib.sha256(data).hexdigest()
+
+  # Ensure the blob exists in storage and has the expected data.
+  assert storage.get_content(['local_us'], blob.storage_path) == data
+
+
+def test_cancel_upload(pre_oci_model):
+  repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
+  storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
+  settings = BlobUploadSettings('2M', 512 * 1024, 3600)
+  app_config = {'TESTING': True}
+
+  blob_upload_id = None
+  with upload_blob(repository_ref, storage, settings) as manager:
+    blob_upload_id = manager.blob_upload_id
+    assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload_id) is not None
+
+    manager.upload_chunk(app_config, BytesIO('hello world'))
+
+  # Since the blob was not committed, the upload should be deleted.
+  assert blob_upload_id
+  assert pre_oci_model.lookup_blob_upload(repository_ref, blob_upload_id) is None
+
+
+def test_too_large(pre_oci_model):
+  repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
+  storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
+  settings = BlobUploadSettings('1K', 512 * 1024, 3600)
+  app_config = {'TESTING': True}
+
+  with upload_blob(repository_ref, storage, settings) as manager:
+    with pytest.raises(BlobTooLargeException):
+      manager.upload_chunk(app_config, BytesIO(os.urandom(1024 * 1024 * 2)))
+
+
+def test_extra_blob_stream_handlers(pre_oci_model):
+  handler1_result = []
+  handler2_result = []
+
+  def handler1(bytes):
+    handler1_result.append(bytes)
+
+  def handler2(bytes):
+    handler2_result.append(bytes)
+
+  repository_ref = pre_oci_model.lookup_repository('devtable', 'complex')
+  storage = DistributedStorage({'local_us': FakeStorage(None)}, ['local_us'])
+  settings = BlobUploadSettings('1K', 512 * 1024, 3600)
+  app_config = {'TESTING': True}
+
+  with upload_blob(repository_ref, storage, settings,
+                   extra_blob_stream_handlers=[handler1, handler2]) as manager:
+    manager.upload_chunk(app_config, BytesIO('hello '))
+    manager.upload_chunk(app_config, BytesIO('world'))
+
+  assert ''.join(handler1_result) == 'hello world'
+  assert ''.join(handler2_result) == 'hello world'
diff --git a/storage/fakestorage.py b/storage/fakestorage.py
index dfe552f08..40a4e6f62 100644
--- a/storage/fakestorage.py
+++ b/storage/fakestorage.py
@@ -70,7 +70,6 @@ class FakeStorage(BaseStorageV2):
 
   def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
     upload_storage = _FAKE_STORAGE_MAP[uuid]
-    upload_storage.seek(offset)
     try:
       return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
     except IOError as ex:
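
Reviewer note, not part of the patch: a minimal sketch of how a caller (for example a registry API
endpoint) would be expected to drive the new interface. repository_ref, storage, app_config and the
chunk_files iterable of file-like objects are placeholders standing in for request-specific values.

  from data.registry_model.blobuploader import upload_blob, BlobUploadSettings, BlobUploadException

  settings = BlobUploadSettings(maximum_blob_size='2M', bittorrent_piece_size=512 * 1024,
                                committed_blob_expiration=3600)

  with upload_blob(repository_ref, storage, settings) as manager:
    if manager is None:
      raise BlobUploadException('Could not start a new blob upload')

    # Stream each chunk into the upload; the running sha256 state and byte count are
    # persisted after every chunk.
    for chunk_fp in chunk_files:
      manager.upload_chunk(app_config, chunk_fp)

    # Turn the upload into a committed blob. An expected_digest (a parsed Digest, not a
    # string) may also be passed to verify the uploaded contents.
    blob = manager.commit_to_blob(app_config)

  # If commit_to_blob was never reached, the context manager cancels the upload and deletes
  # any uploaded data.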