diff --git a/data/registry_model/blobuploader.py b/data/registry_model/blobuploader.py index 3d53bcfe3..a356f78e6 100644 --- a/data/registry_model/blobuploader.py +++ b/data/registry_model/blobuploader.py @@ -64,6 +64,20 @@ def retrieve_blob_upload_manager(repository_ref, blob_upload_id, storage, settin return _BlobUploadManager(repository_ref, blob_upload, settings, storage) +@contextmanager +def complete_when_uploaded(blob_upload): + """ Wraps the given blob upload in a context manager that completes the upload when the context + closes. + """ + try: + yield blob_upload + except Exception as ex: + logger.exception('Exception when uploading blob `%s`', blob_upload.blob_upload_id) + raise ex + finally: + # Cancel the upload if something went wrong or it was not commit to a blob. + if blob_upload.committed_blob is None: + blob_upload.cancel_upload() @contextmanager def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None): @@ -120,7 +134,7 @@ class _BlobUploadManager(object): if start_offset > 0 and start_offset > self.blob_upload.byte_count: logger.error('start_offset provided greater than blob_upload.byte_count') - return None + raise BlobUploadException() # Ensure that we won't go over the allowed maximum size for blobs. max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size) diff --git a/data/registry_model/interface.py b/data/registry_model/interface.py index 099851d67..c5a1d126d 100644 --- a/data/registry_model/interface.py +++ b/data/registry_model/interface.py @@ -266,7 +266,7 @@ class RegistryDataInterface(object): Mounts the blob from another repository into the specified target repository, and adds an expiration before that blob is automatically GCed. This function is useful during push operations if an existing blob from another repositroy is being pushed. Returns False if - the mounting fails. + the mounting fails. Note that this function does *not* check security for mounting the blob. """ @abstractmethod diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index ccff226d5..49eddbcab 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -1,30 +1,24 @@ import logging import re -import time from flask import url_for, request, redirect, Response, abort as flask_abort -import bitmath -import resumablehashlib - -from app import storage, app, get_app_url, metric_queue, model_cache +from app import storage, app, get_app_url, metric_queue from auth.registry_jwt_auth import process_registry_jwt_auth from auth.permissions import ReadRepositoryPermission from data import database -from data.cache import cache_key +from data.registry_model import registry_model +from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_upload_manager, + complete_when_uploaded, BlobUploadSettings, + BlobUploadException, BlobTooLargeException) from digest import digest_tools from endpoints.decorators import anon_protect, parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream from endpoints.v2.errors import ( BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge, InvalidRequest) -from endpoints.v2.models_interface import Blob -from endpoints.v2.models_pre_oci import data_model as model from util.cache import cache_control from util.names import parse_namespace_repository -from util.registry.filelike import wrap_with_handler, StreamSlice -from util.registry.gzipstream import calculate_size_handler -from util.registry.torrent import PieceHasher logger = logging.getLogger(__name__) @@ -38,23 +32,6 @@ class _InvalidRangeHeader(Exception): pass -def _get_repository_blob(namespace_name, repo_name, digest): - """ Returns the blob with the given digest under the repository with the given - name. If one does not exist (or it is still uploading), returns None. - Automatically handles caching. - """ - def load_blob(): - blob = model.get_blob_by_digest(namespace_name, repo_name, digest) - if blob is None: - return None - - return blob._asdict() - - blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, digest) - blob_dict = model_cache.retrieve(blob_cache_key, load_blob) - return Blob(**blob_dict) if blob_dict is not None else None - - @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD']) @parse_repository_name() @process_registry_jwt_auth(scopes=['pull']) @@ -62,20 +39,24 @@ def _get_repository_blob(namespace_name, repo_name, digest): @anon_protect @cache_control(max_age=31436000) def check_blob_exists(namespace_name, repo_name, digest): + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + # Find the blob. - blob = _get_repository_blob(namespace_name, repo_name, digest) + blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True) if blob is None: raise BlobUnknown() # Build the response headers. headers = { 'Docker-Content-Digest': digest, - 'Content-Length': blob.size, + 'Content-Length': blob.compressed_size, 'Content-Type': BLOB_CONTENT_TYPE, } # If our storage supports range requests, let the client know. - if storage.get_supports_resumable_downloads(blob.locations): + if storage.get_supports_resumable_downloads(blob.placements): headers['Accept-Ranges'] = 'bytes' # Write the response to the client. @@ -89,8 +70,12 @@ def check_blob_exists(namespace_name, repo_name, digest): @anon_protect @cache_control(max_age=31536000) def download_blob(namespace_name, repo_name, digest): + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + # Find the blob. - blob = _get_repository_blob(namespace_name, repo_name, digest) + blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True) if blob is None: raise BlobUnknown() @@ -98,15 +83,13 @@ def download_blob(namespace_name, repo_name, digest): headers = {'Docker-Content-Digest': digest} # If our storage supports range requests, let the client know. - if storage.get_supports_resumable_downloads(blob.locations): + if storage.get_supports_resumable_downloads(blob.placements): headers['Accept-Ranges'] = 'bytes' - # Find the storage path for the blob. - path = model.get_blob_path(blob) - # Short-circuit by redirecting if the storage supports it. + path = blob.storage_path logger.debug('Looking up the direct download URL for path: %s', path) - direct_download_url = storage.get_direct_download_url(blob.locations, path, request.remote_addr) + direct_download_url = storage.get_direct_download_url(blob.placements, path, request.remote_addr) if direct_download_url: logger.debug('Returning direct download URL') resp = redirect(direct_download_url) @@ -118,63 +101,77 @@ def download_blob(namespace_name, repo_name, digest): with database.CloseForLongOperation(app.config): # Stream the response to the client. return Response( - storage.stream_read(blob.locations, path), + storage.stream_read(blob.placements, path), headers=headers.update({ - 'Content-Length': blob.size, - 'Content-Type': BLOB_CONTENT_TYPE,}),) + 'Content-Length': blob.compressed_size, + 'Content-Type': BLOB_CONTENT_TYPE, + }), + ) -def _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest): +def _try_to_mount_blob(repository_ref, mount_blob_digest): """ Attempts to mount a blob requested by the user from another repository. """ - logger.debug('Got mount request for blob `%s` into `%s/%s`', mount_blob_digest, namespace_name, - repo_name) + logger.debug('Got mount request for blob `%s` into `%s`', mount_blob_digest, repository_ref) from_repo = request.args.get('from', None) if from_repo is None: raise InvalidRequest # Ensure the user has access to the repository. - logger.debug('Got mount request for blob `%s` under repository `%s` into `%s/%s`', - mount_blob_digest, from_repo, namespace_name, repo_name) + logger.debug('Got mount request for blob `%s` under repository `%s` into `%s`', + mount_blob_digest, from_repo, repository_ref) from_namespace, from_repo_name = parse_namespace_repository(from_repo, app.config['LIBRARY_NAMESPACE'], include_tag=False) - # First check permission. This does not hit the DB so we do it first. + from_repository_ref = registry_model.lookup_repository(from_namespace, from_repo_name) + if from_repository_ref is None: + logger.debug('Could not find from repo: `%s/%s`', from_namespace, from_repo_name) + return None + + # First check permission. read_permission = ReadRepositoryPermission(from_namespace, from_repo_name).can() if not read_permission: # If no direct permission, check if the repostory is public. - if not model.is_repository_public(from_namespace, from_repo_name): - logger.debug('No permission to mount blob `%s` under repository `%s` into `%s/%s`', - mount_blob_digest, from_repo, namespace_name, repo_name) + if not from_repository_ref.is_public: + logger.debug('No permission to mount blob `%s` under repository `%s` into `%s`', + mount_blob_digest, from_repo, repository_ref) return None # Lookup if the mount blob's digest exists in the repository. - mount_blob = model.get_blob_by_digest(from_namespace, from_repo_name, mount_blob_digest) + mount_blob = registry_model.get_repo_blob_by_digest(from_repository_ref, mount_blob_digest) if mount_blob is None: logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo) return None - logger.debug('Mounting blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest, - from_repo, namespace_name, repo_name) + logger.debug('Mounting blob `%s` under repository `%s` into `%s`', mount_blob_digest, + from_repo, repository_ref) # Mount the blob into the current repository and return that we've completed the operation. expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'] - if not model.mount_blob_and_temp_tag(namespace_name, repo_name, mount_blob, expiration_sec): + mounted = registry_model.mount_blob_into_repository(mount_blob, repository_ref, expiration_sec) + if not mounted: logger.debug('Could not mount blob `%s` under repository `%s` not found', mount_blob_digest, from_repo) return # Return the response for the blob indicating that it was mounted, and including its content # digest. - logger.debug('Mounted blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest, - from_repo, namespace_name, repo_name) + logger.debug('Mounted blob `%s` under repository `%s` into `%s`', mount_blob_digest, + from_repo, repository_ref) + + namespace_name = repository_ref.namespace_name + repo_name = repository_ref.name + return Response( status=201, headers={ 'Docker-Content-Digest': mount_blob_digest, 'Location': - get_app_url() + url_for('v2.download_blob', repository='%s/%s' % - (namespace_name, repo_name), digest=mount_blob_digest),},) + get_app_url() + url_for('v2.download_blob', + repository='%s/%s' % (namespace_name, repo_name), + digest=mount_blob_digest), + }, + ) @v2_bp.route('//blobs/uploads/', methods=['POST']) @@ -183,63 +180,56 @@ def _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest): @require_repo_write @anon_protect def start_blob_upload(namespace_name, repo_name): - # Begin the blob upload process in the database and storage. - location_name = storage.preferred_locations[0] - new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name) - repository_exists = model.create_blob_upload(namespace_name, repo_name, new_upload_uuid, - location_name, upload_metadata) - if not repository_exists: + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: raise NameUnknown() # Check for mounting of a blob from another repository. mount_blob_digest = request.args.get('mount', None) if mount_blob_digest is not None: - response = _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest) + response = _try_to_mount_blob(repository_ref, mount_blob_digest) if response is not None: return response - # Check for a normal blob upload. + # Begin the blob upload process. + blob_uploader = create_blob_upload(repository_ref, storage, _upload_settings()) + if blob_uploader is None: + logger.debug('Could not create a blob upload for `%s/%s`', namespace_name, repo_name) + raise InvalidRequest() + + # Check if the blob will be uploaded now or in followup calls. If the `digest` is given, then + # the upload will occur as a monolithic chunk in this call. Ohterwise, we return a redirect + # for the client to upload the chunks as distinct operations. digest = request.args.get('digest', None) if digest is None: # Short-circuit because the user will send the blob data in another request. return Response( status=202, headers={ - 'Docker-Upload-UUID': - new_upload_uuid, - 'Range': - _render_range(0), + 'Docker-Upload-UUID': blob_uploader.blob_upload_id, + 'Range': _render_range(0), 'Location': - get_app_url() + url_for('v2.upload_chunk', repository='%s/%s' % - (namespace_name, repo_name), upload_uuid=new_upload_uuid)},) + get_app_url() + url_for('v2.upload_chunk', + repository='%s/%s' % (namespace_name, repo_name), + upload_uuid=blob_uploader.blob_upload_id) + }, + ) - # The user plans to send us the entire body right now. - # Find the upload. - blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, new_upload_uuid) - if blob_upload is None: - raise BlobUploadUnknown() - - # Upload the chunk to storage while calculating some metadata and updating - # the upload state. - updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range')) - if updated_blob_upload is None: - _abort_range_not_satisfiable(blob_upload.byte_count, new_upload_uuid) - - # Save the upload state to the database. - model.update_blob_upload(updated_blob_upload) - - # Finalize the upload process in the database and storage. - _finish_upload(namespace_name, repo_name, updated_blob_upload, digest) + # Upload the data sent and commit it to a blob. + with complete_when_uploaded(blob_uploader): + _upload_chunk(blob_uploader, digest) # Write the response to the client. return Response( status=201, headers={ - 'Docker-Content-Digest': - digest, + 'Docker-Content-Digest': digest, 'Location': - get_app_url() + url_for('v2.download_blob', repository='%s/%s' % - (namespace_name, repo_name), digest=digest),},) + get_app_url() + url_for('v2.download_blob', + repository='%s/%s' % (namespace_name, repo_name), + digest=digest), + }, + ) @v2_bp.route('//blobs/uploads/', methods=['GET']) @@ -248,16 +238,21 @@ def start_blob_upload(namespace_name, repo_name): @require_repo_write @anon_protect def fetch_existing_upload(namespace_name, repo_name, upload_uuid): - blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid) - if blob_upload is None: + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings()) + if uploader is None: raise BlobUploadUnknown() return Response( status=204, headers={ 'Docker-Upload-UUID': upload_uuid, - 'Range': _render_range(blob_upload.byte_count + 1), # byte ranges are exclusive - },) + 'Range': _render_range(uploader.blob_upload.byte_count + 1), # byte ranges are exclusive + }, + ) @v2_bp.route('//blobs/uploads/', methods=['PATCH']) @@ -266,27 +261,26 @@ def fetch_existing_upload(namespace_name, repo_name, upload_uuid): @require_repo_write @anon_protect def upload_chunk(namespace_name, repo_name, upload_uuid): - # Find the upload. - blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid) - if blob_upload is None: + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings()) + if uploader is None: raise BlobUploadUnknown() - # Upload the chunk to storage while calculating some metadata and updating - # the upload state. - updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range')) - if updated_blob_upload is None: - _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid) - - # Save the upload state to the database. - model.update_blob_upload(updated_blob_upload) + # Upload the chunk for the blob. + _upload_chunk(uploader) # Write the response to the client. return Response( status=204, headers={ 'Location': _current_request_url(), - 'Range': _render_range(updated_blob_upload.byte_count, with_bytes_prefix=False), - 'Docker-Upload-UUID': upload_uuid,},) + 'Range': _render_range(uploader.blob_upload.byte_count, with_bytes_prefix=False), + 'Docker-Upload-UUID': upload_uuid, + }, + ) @v2_bp.route('//blobs/uploads/', methods=['PUT']) @@ -301,26 +295,27 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid): raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'}) # Find the upload. - blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid) - if blob_upload is None: + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings()) + if uploader is None: raise BlobUploadUnknown() - # Upload the chunk to storage while calculating some metadata and updating - # the upload state. - updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range')) - if updated_blob_upload is None: - _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid) - - # Finalize the upload process in the database and storage. - _finish_upload(namespace_name, repo_name, updated_blob_upload, digest) + # Upload the chunk for the blob and commit it once complete. + with complete_when_uploaded(uploader): + _upload_chunk(uploader, digest) # Write the response to the client. return Response(status=201, headers={ - 'Docker-Content-Digest': - digest, + 'Docker-Content-Digest': digest, 'Location': - get_app_url() + url_for('v2.download_blob', repository='%s/%s' % - (namespace_name, repo_name), digest=digest),}) + get_app_url() + url_for('v2.download_blob', + repository='%s/%s' % (namespace_name, repo_name), + digest=digest), + }, + ) @v2_bp.route('//blobs/uploads/', methods=['DELETE']) @@ -329,16 +324,15 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid): @require_repo_write @anon_protect def cancel_upload(namespace_name, repo_name, upload_uuid): - blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid) - if blob_upload is None: + repository_ref = registry_model.lookup_repository(namespace_name, repo_name) + if repository_ref is None: + raise NameUnknown() + + uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings()) + if uploader is None: raise BlobUploadUnknown() - # We delete the record for the upload first, since if the partial upload in - # storage fails to delete, it doesn't break anything. - model.delete_blob_upload(namespace_name, repo_name, upload_uuid) - storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid, - blob_upload.storage_metadata) - + uploader.cancel_upload() return Response(status=204) @@ -413,182 +407,36 @@ def _start_offset_and_length(range_header): return start_offset, length -def _upload_chunk(blob_upload, range_header): +def _upload_settings(): + """ Returns the settings for instantiating a blob upload manager. """ + expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'] + settings = BlobUploadSettings(maximum_blob_size=app.config['MAXIMUM_LAYER_SIZE'], + bittorrent_piece_size=app.config['BITTORRENT_PIECE_SIZE'], + committed_blob_expiration=expiration_sec) + return settings + + +def _upload_chunk(blob_uploader, commit_digest=None): + """ Performs uploading of a chunk of data in the current request's stream, via the blob uploader + given. If commit_digest is specified, the upload is committed to a blob once the stream's + data has been read and stored. """ - Calculates metadata while uploading a chunk to storage. + start_offset, length = _start_offset_and_length(request.headers.get('range')) + if None in {start_offset, length}: + raise InvalidRequest() - Returns a BlobUpload object or None if there was a failure. - """ - max_layer_size = bitmath.parse_string_unsafe(app.config['MAXIMUM_LAYER_SIZE']) + input_fp = get_input_stream(request) - # Get the offset and length of the current chunk. - start_offset, length = _start_offset_and_length(range_header) - if blob_upload is None or None in {start_offset, length}: - logger.error('Invalid arguments provided to _upload_chunk') - return None + try: + # Upload the data received. + blob_uploader.upload_chunk(app.config, input_fp, start_offset, length, metric_queue) - if start_offset > 0 and start_offset > blob_upload.byte_count: - logger.error('start_offset provided to _upload_chunk greater than blob.upload.byte_count') - return None - - # Check if we should raise 413 before accepting the data. - uploaded = bitmath.Byte(length + start_offset) - if length > -1 and uploaded > max_layer_size: - raise LayerTooLarge(uploaded=uploaded.bytes, max_allowed=max_layer_size.bytes) - - location_set = {blob_upload.location_name} - - upload_error = None - with database.CloseForLongOperation(app.config): - input_fp = get_input_stream(request) - - if start_offset > 0 and start_offset < blob_upload.byte_count: - # Skip the bytes which were received on a previous push, which are already stored and - # included in the sha calculation - overlap_size = blob_upload.byte_count - start_offset - input_fp = StreamSlice(input_fp, overlap_size) - - # Update our upload bounds to reflect the skipped portion of the overlap - start_offset = blob_upload.byte_count - length = max(length - overlap_size, 0) - - # We use this to escape early in case we have already processed all of the bytes the user - # wants to upload - if length == 0: - return blob_upload - - input_fp = wrap_with_handler(input_fp, blob_upload.sha_state.update) - - # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have - # already calculated hash data for the previous chunk(s). - piece_hasher = None - if blob_upload.chunk_count == 0 or blob_upload.piece_sha_state: - initial_sha1_value = blob_upload.piece_sha_state or resumablehashlib.sha1() - initial_sha1_pieces_value = blob_upload.piece_hashes or '' - - piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset, - initial_sha1_pieces_value, initial_sha1_value) - - input_fp = wrap_with_handler(input_fp, piece_hasher.update) - - # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the - # stream so we can determine the uncompressed size. We'll throw out this data if another chunk - # comes in, but in the common case the docker client only sends one chunk. - size_info = None - if start_offset == 0 and blob_upload.chunk_count == 0: - size_info, fn = calculate_size_handler() - input_fp = wrap_with_handler(input_fp, fn) - - start_time = time.time() - length_written, new_metadata, upload_error = storage.stream_upload_chunk( - location_set, - blob_upload.uuid, - start_offset, - length, - input_fp, - blob_upload.storage_metadata, - content_type=BLOB_CONTENT_TYPE,) - - if upload_error is not None: - logger.error('storage.stream_upload_chunk returned error %s', upload_error) - return None - - # Update the chunk upload time metric. - metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[ - length_written, list(location_set)[0]]) - - # If we determined an uncompressed size and this is the first chunk, add it to the blob. - # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks. - if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid: - blob_upload.uncompressed_byte_count = size_info.uncompressed_size - elif length_written > 0: - # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't - # know the uncompressed size. - blob_upload.uncompressed_byte_count = None - - if piece_hasher is not None: - blob_upload.piece_hashes = piece_hasher.piece_hashes - blob_upload.piece_sha_state = piece_hasher.hash_fragment - - blob_upload.storage_metadata = new_metadata - blob_upload.byte_count += length_written - blob_upload.chunk_count += 1 - - # Ensure we have not gone beyond the max layer size. - upload_size = bitmath.Byte(blob_upload.byte_count) - if upload_size > max_layer_size: - raise LayerTooLarge(uploaded=upload_size.bytes, max_allowed=max_layer_size.bytes) - - return blob_upload - - -def _validate_digest(blob_upload, expected_digest): - """ - Verifies that the digest's SHA matches that of the uploaded data. - """ - computed_digest = digest_tools.sha256_digest_from_hashlib(blob_upload.sha_state) - if not digest_tools.digests_equal(computed_digest, expected_digest): - logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s', - blob_upload.uuid, expected_digest, computed_digest) - raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'}) - - -def _finalize_blob_storage(blob_upload, expected_digest): - """ - When an upload is successful, this ends the uploading process from the - storage's perspective. - - Returns True if the blob already existed. - """ - final_blob_location = digest_tools.content_path(expected_digest) - - # Move the storage into place, or if this was a re-upload, cancel it - with database.CloseForLongOperation(app.config): - already_existed = storage.exists({blob_upload.location_name}, final_blob_location) - if already_existed: - # It already existed, clean up our upload which served as proof that the - # uploader had the blob. - storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid, - blob_upload.storage_metadata) - - else: - # We were the first ones to upload this image (at least to this location) - # Let's copy it into place - storage.complete_chunked_upload({blob_upload.location_name}, blob_upload.uuid, - final_blob_location, blob_upload.storage_metadata) - return already_existed - - -def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed): - """ - When an upload is successful, this ends the uploading process from the - database's perspective. - """ - # Create the blob and temporarily tag it. - blob_storage = model.create_blob_and_temp_tag( - namespace_name, - repo_name, - digest, - blob_upload, - app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],) - - # If it doesn't already exist, create the BitTorrent pieces for the blob. - if blob_upload.piece_sha_state is not None and not already_existed: - piece_bytes = blob_upload.piece_hashes + blob_upload.piece_sha_state.digest() - model.save_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes) - - # Delete the blob upload. - model.delete_blob_upload(namespace_name, repo_name, blob_upload.uuid) - - -def _finish_upload(namespace_name, repo_name, blob_upload, digest): - """ - When an upload is successful, this ends the uploading process. - """ - _validate_digest(blob_upload, digest) - _finalize_blob_database( - namespace_name, - repo_name, - blob_upload, - digest, - _finalize_blob_storage(blob_upload, digest),) + if commit_digest is not None: + # Commit the upload to a blob. + return blob_uploader.commit_to_blob(app.config, commit_digest) + except BlobTooLargeException as ble: + raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed) + except BlobUploadException: + logger.exception('Exception when uploading blob to %s', blob_uploader.blob_upload_id) + _abort_range_not_satisfiable(blob_uploader.blob_upload.byte_count, + blob_uploader.blob_upload_id)