diff --git a/data/model/v2.py b/data/model/v2.py index e10ab9054..6e9e570f8 100644 --- a/data/model/v2.py +++ b/data/model/v2.py @@ -1,8 +1,10 @@ from data.types import ( + Blob, + BlobUpload, + DockerV1Metadata, + ManifestJSON, Repository, Tag, - ManifestJSON, - DockerV1Metadata, ) def get_repository(namespace_name, repo_name): @@ -108,3 +110,73 @@ def repository_tags(namespace_name, repo_name, limit, offset): def get_visible_repositories(username, limit, offset): return [Repository()] + + +def create_blob_upload(namespace_name, repo_name, upload_uuid, location_name, storage_metadata): + """ + Creates a blob upload. + + Returns False if the upload's repository does not exist. + """ + + try: + model.blob.initiate_upload(namespace_name, repo_name, new_upload_uuid, location_name, + upload_metadata) + return True + except database.Repository.DoesNotExist: + return False + + +def blob_upload_by_uuid(uuid): + try: + found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid) + except model.InvalidBlobUpload: + raise BlobUploadUnknown() + + return BlobUpload( + uuid=uuid, + byte_count=found.byte_count, + uncompressed_byte_count=found.uncompressed_byte_count, + chunk_count=found.chunk_count, + location_name=found.location.name, + storage_metadata=found.storage_metadata, + ) + + +def update_blob_upload(blob_upload): + # old implementation: + # blob_upload.save() + pass + + +def delete_blob_upload(uuid): + try: + found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid) + except model.InvalidBlobUpload: + raise BlobUploadUnknown() + + found.delete_instance() + +def create_blob_and_temp_tag(namespace_name, repo_name, expected_digest, upload_obj): + return model.blob.store_blob_record_and_temp_link(namespace_name, repo_name, expected_digest, + upload_obj.location, + upload_obj.byte_count, + app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'], + upload_obj.uncompressed_byte_count) + + +def blob_by_digest(namespace_name, repo_name, digest): + try: + return model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest) + except model.BlobDoesNotExist: + return None + + +def create_bittorrent_pieces(blob_storage, piece_size, piece_bytes) + model.storage.save_torrent_info(blob_storage.id, piece_size, piece_bytes) + + +def get_blob_path(blob): + # Once everything is moved over, this could be in util.registry and not even + # touch the database. + model.storage.get_layer_path(blob) diff --git a/data/types.py b/data/types.py index f734f3506..e93c06539 100644 --- a/data/types.py +++ b/data/types.py @@ -344,3 +344,15 @@ DockerV1Metadata = namedtuple('DockerV1Metadata', ['namespace_name', 'command', 'parent_image_id', 'compat_json']) + +BlobUpload = namedtuple('BlobUpload', ['uuid', + 'byte_count', + 'uncompressed_byte_count', + 'chunk_count', + 'sha_state', + 'location_name', + 'storage_metadata', + 'piece_sha_state', + 'piece_hashes']) + +Blob = namedtuple('Blob', ['digest', 'size', 'locations']) diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 3136b4580..a5836bb61 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -7,7 +7,7 @@ import resumablehashlib from app import storage, app from auth.registry_jwt_auth import process_registry_jwt_auth -from data import model, database +from data import database from digest import digest_tools from endpoints.common import parse_repository_name from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream @@ -33,28 +33,6 @@ class _InvalidRangeHeader(Exception): pass -def _base_blob_fetch(namespace_name, repo_name, digest): - """ Some work that is common to both GET and HEAD requests. Callers MUST check for proper - authorization before calling this method. - """ - try: - found = model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest) - except model.BlobDoesNotExist: - raise BlobUnknown() - - headers = { - 'Docker-Content-Digest': digest, - } - - # Add the Accept-Ranges header if the storage engine supports resumable - # downloads. - if storage.get_supports_resumable_downloads(found.locations): - logger.debug('Storage supports resumable downloads') - headers['Accept-Ranges'] = 'bytes' - - return found, headers - - @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD']) @parse_repository_name() @process_registry_jwt_auth(scopes=['pull']) @@ -62,12 +40,25 @@ def _base_blob_fetch(namespace_name, repo_name, digest): @anon_protect @cache_control(max_age=31436000) def check_blob_exists(namespace_name, repo_name, digest): - found, headers = _base_blob_fetch(namespace_name, repo_name, digest) + # Find the blob. + blob = v2.blob_by_digest(namespace_name, repo_name, digest) + if blob is None: + raise BlobUnknown() + # Build the response headers. + headers = { + 'Docker-Content-Digest': digest, + 'Content-Length': blob.size, + 'Content-Type': BLOB_CONTENT_TYPE, + } + + # If our storage supports range requests, let the Docker client know. + if storage.get_supports_resumable_downloads(blob.locations): + headers['Accept-Ranges'] = 'bytes' + + # Write the response to the Docker client. response = make_response('') response.headers.extend(headers) - response.headers['Content-Length'] = found.image_size - response.headers['Content-Type'] = BLOB_CONTENT_TYPE return response @@ -78,31 +69,42 @@ def check_blob_exists(namespace_name, repo_name, digest): @anon_protect @cache_control(max_age=31536000) def download_blob(namespace_name, repo_name, digest): - found, headers = _base_blob_fetch(namespace_name, repo_name, digest) + # Find the blob. + blob = v2.blob_by_digest(namespace_name, repo_name, digest) + if blob is None: + raise BlobUnknown() - path = model.storage.get_layer_path(found) + # Build the response headers. + headers = {'Docker-Content-Digest': digest} + + # If our storage supports range requests, let the Docker client know. + if storage.get_supports_resumable_downloads(blob.locations): + headers['Accept-Ranges'] = 'bytes' + + # Find the storage path for the blob. + path = v2.get_blob_path(blob) + + # Short-circuit by redirecting if the storage supports it. logger.debug('Looking up the direct download URL for path: %s', path) - direct_download_url = storage.get_direct_download_url(found.locations, path) - + direct_download_url = storage.get_direct_download_url(blob.locations, path) if direct_download_url: logger.debug('Returning direct download URL') resp = redirect(direct_download_url) resp.headers.extend(headers) return resp - logger.debug('Streaming layer data') - - # Close the database handle here for this process before we send the long download. + # Close the database connection before we stream the download. + logger.debug('Closing database connection before streaming layer data') database.close_db_filter(None) - headers['Content-Length'] = found.image_size - headers['Content-Type'] = BLOB_CONTENT_TYPE - - return Response(storage.stream_read(found.locations, path), headers=headers) - - -def _render_range(num_uploaded_bytes, with_bytes_prefix=True): - return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1) + # Stream the response to the Docker client. + return Response( + storage.stream_read(blob.locations, path), + headers=headers.update({ + 'Content-Length': blob.size, + 'Content-Type': BLOB_CONTENT_TYPE, + }), + ) @v2_bp.route('//blobs/uploads/', methods=['POST']) @@ -111,37 +113,50 @@ def _render_range(num_uploaded_bytes, with_bytes_prefix=True): @require_repo_write @anon_protect def start_blob_upload(namespace_name, repo_name): + # Begin the blob upload process in the database and storage. location_name = storage.preferred_locations[0] new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name) - - try: - model.blob.initiate_upload(namespace_name, repo_name, new_upload_uuid, location_name, - upload_metadata) - except database.Repository.DoesNotExist: + repository_exists = v2.create_blob_upload(namespace_name, repo_name, new_upload_uuid, + location_name, upload_metadata) + if not repository_exists: raise NameUnknown() digest = request.args.get('digest', None) if digest is None: - # The user will send the blob data in another request + # Short-circuit because the user will send the blob data in another request. accepted = make_response('', 202) accepted.headers['Location'] = url_for('v2.upload_chunk', repository='%s/%s' % (namespace_name, repo_name), upload_uuid=new_upload_uuid) - accepted.headers['Range'] = _render_range(0) accepted.headers['Docker-Upload-UUID'] = new_upload_uuid return accepted - else: - # The user plans to send us the entire body right now - blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, new_upload_uuid) - blob_upload.save() - if upload_error: - logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s', - namespace_name, repo_name, new_upload_uuid, upload_error) - _range_not_satisfiable(blob_upload.byte_count) + # The user plans to send us the entire body right now. + # Find the upload. + blob_upload = v2.blob_upload_by_uuid(new_upload_uuid) + if blob_upload is None: + raise BlobUploadUnknown() - return _finish_upload(namespace_name, repo_name, blob_upload, digest) + # Upload the chunk to storage while calculating some metadata and updating + # the upload state. + updated_blob_upload = _upload_chunk(blob_upload, *_start_offset_and_length(request.headers)) + if updated_blob_upload is None: + _abort_range_not_satisfiable(updated_blob_upload.byte_count, new_upload_uuid) + + # Save the upload state to the database. + v2.update_blob_upload(updated_blob_upload) + + # Finalize the upload process in the database and storage. + _finish_upload(namespace_name, repo_name, updated_blob_upload, digest) + + # Write the response to the docker client. + response = make_response('', 201) + response.headers['Docker-Content-Digest'] = digest + response.headers['Location'] = url_for('v2.download_blob', + repository='%s/%s' % (namespace_name, repo_name), + digest=digest) + return response @v2_bp.route('//blobs/uploads/', methods=['GET']) @@ -150,33 +165,141 @@ def start_blob_upload(namespace_name, repo_name): @require_repo_write @anon_protect def fetch_existing_upload(namespace_name, repo_name, upload_uuid): - try: - found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid) - except model.InvalidBlobUpload: + blob_upload = v2.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid) + if blob_upload is None: raise BlobUploadUnknown() - # Note: Docker byte ranges are exclusive so we have to add one to the byte count. accepted = make_response('', 204) - accepted.headers['Range'] = _render_range(found.byte_count + 1) - accepted.headers['Docker-Upload-UUID'] = upload_uuid + accepted.headers.extend({ + 'Docker-Upload-UUID': upload_uuid, + 'Range': _render_range(blob_upload.byte_count+1), # Docker byte ranges are exclusive + }) return accepted +@v2_bp.route('//blobs/uploads/', methods=['PATCH']) +@parse_repository_name() +@process_registry_jwt_auth(scopes=['pull', 'push']) +@require_repo_write +@anon_protect +def upload_chunk(namespace_name, repo_name, upload_uuid): + # Find the upload. + blob_upload = v2.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid) + if blob_upload is None: + raise BlobUploadUnknown() + + # Upload the chunk to storage while calculating some metadata and updating + # the upload state. + updated_blob_upload = _upload_chunk(blob_upload, *_start_offset_and_length(request.headers)) + if updated_blob_upload is None: + _abort_range_not_satisfiable(updated_blob_upload.byte_count, upload_uuid) + + # Save the upload state to the database. + v2.update_blob_upload(updated_blob_upload) + + # Write the response to the Docker client. + accepted = make_response('', 204) + accepted.headers.extend({ + 'Location': _current_request_path(), + 'Range': _render_range(updated_blob_upload.byte_count, with_bytes_prefix=False), + 'Docker-Upload-UUID': upload_uuid, + }) + return accepted + + +@v2_bp.route('//blobs/uploads/', methods=['PUT']) +@parse_repository_name() +@process_registry_jwt_auth(scopes=['pull', 'push']) +@require_repo_write +@anon_protect +def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid): + # Ensure the digest is present before proceeding. + digest = request.args.get('digest', None) + if digest is None: + raise BlobUploadInvalid() + + # Find the upload. + blob_upload = v2.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid) + if blob_upload is None: + raise BlobUploadUnknown() + + # Upload the chunk to storage while calculating some metadata and updating + # the upload state. + updated_blob_upload = _upload_chunk(blob_upload, *_start_offset_and_length(request.headers)) + if updated_blob_upload is None: + _abort_range_not_satisfiable(updated_blob_upload.byte_count, upload_uuid) + + # Finalize the upload process in the database and storage. + _finish_upload(namespace_name, repo_name, updated_blob_upload, digest) + + # Write the response to the Docker client. + response = make_response('', 201) + response.headers.extend({ + 'Docker-Content-Digest': digest, + 'Location': url_for('v2.download_blob', repository='%s/%s' % (namespace_name, repo_name), + digest=digest) + }) + return response + + +@v2_bp.route('//blobs/uploads/', methods=['DELETE']) +@parse_repository_name() +@process_registry_jwt_auth(scopes=['pull', 'push']) +@require_repo_write +@anon_protect +def cancel_upload(namespace_name, repo_name, upload_uuid): + upload = v2.blob_upload_by_uuid(upload_uuid) + if upload is None: + raise BlobUploadUnknown() + + # We delete the record for the upload first, since if the partial upload in + # storage fails to delete, it doesn't break anything + v2.delete_blob_upload(upload_uuid) + storage.cancel_chunked_upload({upload.location_name}, upload.uuid, upload.storage_metadata) + + return make_response('', 204) + + +@v2_bp.route('//blobs/', methods=['DELETE']) +@parse_repository_name() +@process_registry_jwt_auth(scopes=['pull', 'push']) +@require_repo_write +@anon_protect +def delete_digest(namespace_name, repo_name, upload_uuid): + # We do not support deleting arbitrary digests, as they break repo images. + raise Unsupported() + + +def _render_range(num_uploaded_bytes, with_bytes_prefix=True): + return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1) + + def _current_request_path(): return '{0}{1}'.format(request.script_root, request.path) -def _range_not_satisfiable(valid_end): +def _abort_range_not_satisfiable(valid_end, upload_uuid): + """ + Writes a failure response for scenarios where the registry cannot function + with the provided range. + + TODO(jzelinskie): Unify this with the V2RegistryException class. + """ invalid_range = make_response('', 416) - invalid_range.headers['Location'] = _current_request_path() - invalid_range.headers['Range'] = '0-{0}'.format(valid_end) - invalid_range.headers['Docker-Upload-UUID'] = request.view_args['upload_uuid'] + invalid_range.headers.extend({ + 'Location': _current_request_path(), + 'Range': '0-{0}'.format(valid_end), + 'Docker-Upload-UUID': upload_uuid, + }) flask_abort(invalid_range) def _parse_range_header(range_header_text): - """ Parses the range header, and returns a tuple of the start offset and the length, - or raises an _InvalidRangeHeader exception. + """ + Parses the range header. + + Returns a tuple of the start offset and the length. + If the parse fails, raises _InvalidRangeHeader. """ found = RANGE_HEADER_REGEX.match(range_header_text) if found is None: @@ -191,60 +314,66 @@ def _parse_range_header(range_header_text): return (start, length) -def _upload_chunk(namespace_name, repo_name, upload_uuid): - """ Common code among the various uploading paths for appending data to blobs. - Callers MUST call .save() or .delete_instance() on the returned database object. - Returns the BlobUpload object and the error that occurred, if any (or None if none). +def _start_offset_and_length(headers): + """ + Returns a tuple of the start offset and the length. + If the range header doesn't exist, defaults to (0, -1). + If parsing fails, returns (None, None). """ - try: - found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid) - except model.InvalidBlobUpload: - raise BlobUploadUnknown() - start_offset, length = 0, -1 - range_header = request.headers.get('range', None) + range_header = headers.get('range', None) if range_header is not None: try: start_offset, length = _parse_range_header(range_header) except _InvalidRangeHeader: - _range_not_satisfiable(found.byte_count) + return None, None + return start_offset, length - if start_offset > 0 and start_offset > found.byte_count: - _range_not_satisfiable(found.byte_count) - location_set = {found.location.name} +def _upload_chunk(blob_upload, start_offset, length): + """ + Calculates metadata while uploading a chunk to storage. + + Returns a BlobUpload object or None if there was a failure. + """ + # Check for invalidate arguments. + if None in {blob_upload, start_offset, length}: + return None + if start_offset > 0 and start_offset > blob_upload.byte_count: + return None + + location_set = {blob_upload.location_name} upload_error = None with database.CloseForLongOperation(app.config): input_fp = get_input_stream(request) - if start_offset > 0 and start_offset < found.byte_count: + if start_offset > 0 and start_offset < blob_upload.byte_count: # Skip the bytes which were received on a previous push, which are already stored and # included in the sha calculation - overlap_size = found.byte_count - start_offset + overlap_size = blob_upload.byte_count - start_offset input_fp = StreamSlice(input_fp, overlap_size) # Update our upload bounds to reflect the skipped portion of the overlap - start_offset = found.byte_count + start_offset = blob_upload.byte_count length = max(length - overlap_size, 0) # We use this to escape early in case we have already processed all of the bytes the user # wants to upload if length == 0: - return found, None + return blob_upload - input_fp = wrap_with_handler(input_fp, found.sha_state.update) + input_fp = wrap_with_handler(input_fp, blob_upload.sha_state.update) # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have # already calculated hash data for the previous chunk(s). piece_hasher = None - if found.chunk_count == 0 or found.piece_sha_state: - initial_sha1_value = found.piece_sha_state or resumablehashlib.sha1() - initial_sha1_pieces_value = found.piece_hashes or '' + if blob_upload.chunk_count == 0 or blob_upload.piece_sha_state: + initial_sha1_value = blob_upload.piece_sha_state or resumablehashlib.sha1() + initial_sha1_pieces_value = blob_upload.piece_hashes or '' piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset, - initial_sha1_pieces_value, - initial_sha1_value) + initial_sha1_pieces_value, initial_sha1_value) input_fp = wrap_with_handler(input_fp, piece_hasher.update) @@ -252,147 +381,114 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid): # stream so we can determine the uncompressed size. We'll throw out this data if another chunk # comes in, but in the common case Docker only sends one chunk. size_info = None - if start_offset == 0 and found.chunk_count == 0: + if start_offset == 0 and blob_upload.chunk_count == 0: size_info, fn = calculate_size_handler() input_fp = wrap_with_handler(input_fp, fn) - chunk_result = storage.stream_upload_chunk(location_set, upload_uuid, start_offset, length, - input_fp, found.storage_metadata, - content_type=BLOB_CONTENT_TYPE) - length_written, new_metadata, upload_error = chunk_result + try: + length_written, new_metadata, error = storage.stream_upload_chunk( + location_set, + blob_upload.uuid, + start_offset, + length, + input_fp, + blob_upload.storage_metadata, + content_type=BLOB_CONTENT_TYPE, + ) + if error is not None: + return None + except InvalidChunkException: + return None # If we determined an uncompressed size and this is the first chunk, add it to the blob. # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks. - if size_info is not None and found.chunk_count == 0 and size_info.is_valid: - found.uncompressed_byte_count = size_info.uncompressed_size + if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid: + blob_upload.uncompressed_byte_count = size_info.uncompressed_size elif length_written > 0: # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't # know the uncompressed size. - found.uncompressed_byte_count = None + blob_upload.uncompressed_byte_count = None if piece_hasher is not None: - found.piece_hashes = piece_hasher.piece_hashes - found.piece_sha_state = piece_hasher.hash_fragment - - found.storage_metadata = new_metadata - found.byte_count += length_written - found.chunk_count += 1 - return found, upload_error + blob_upload.piece_hashes = piece_hasher.piece_hashes + blob_upload.piece_sha_state = piece_hasher.hash_fragment + blob_upload.storage_metadata = new_metadata + blob_upload.byte_count += length_written + blob_upload.chunk_count += 1 + return blob_upload -def _finish_upload(namespace_name, repo_name, upload_obj, expected_digest): - # Verify that the digest's SHA matches that of the uploaded data. - computed_digest = digest_tools.sha256_digest_from_hashlib(upload_obj.sha_state) +def _validate_digest(blob_upload, expected_digest): + """ + Verifies that the digest's SHA matches that of the uploaded data. + """ + computed_digest = digest_tools.sha256_digest_from_hashlib(blob_upload.sha_state) if not digest_tools.digests_equal(computed_digest, expected_digest): logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s', upload_obj.uuid, expected_digest, computed_digest) raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'}) + +def _finalize_blob_storage(blob_upload, expected_digest): + """ + When an upload is successful, this ends the uploading process from the + storage's perspective. + + Returns True if the blob already existed. + """ final_blob_location = digest_tools.content_path(expected_digest) # Move the storage into place, or if this was a re-upload, cancel it with database.CloseForLongOperation(app.config): - already_exists = storage.exists({upload_obj.location.name}, final_blob_location) - if already_exists: - # It already existed, clean up our upload which served as proof that we had the file - storage.cancel_chunked_upload({upload_obj.location.name}, upload_obj.uuid, - upload_obj.storage_metadata) + already_existed = storage.exists({blob_upload.location_name}, final_blob_location) + if already_existed: + # It already existed, clean up our upload which served as proof that the + # uploader had the blob. + storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid, + blob_upload.storage_metadata) else: # We were the first ones to upload this image (at least to this location) # Let's copy it into place - storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, - final_blob_location, upload_obj.storage_metadata) - - # Mark the blob as uploaded. - blob_storage = model.blob.store_blob_record_and_temp_link(namespace_name, repo_name, expected_digest, - upload_obj.location, - upload_obj.byte_count, - app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'], - upload_obj.uncompressed_byte_count) - - if upload_obj.piece_sha_state is not None and not already_exists: - piece_bytes = upload_obj.piece_hashes + upload_obj.piece_sha_state.digest() - model.storage.save_torrent_info(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes) - - # Delete the upload tracking row. - upload_obj.delete_instance() - - response = make_response('', 201) - response.headers['Docker-Content-Digest'] = expected_digest - response.headers['Location'] = url_for('v2.download_blob', - repository='%s/%s' % (namespace_name, repo_name), - digest=expected_digest) - return response + storage.complete_chunked_upload({blob_upload.location_name}, blob_upload.uuid, + final_blob_location, blob_upload.storage_metadata) + return already_existed -@v2_bp.route('//blobs/uploads/', methods=['PATCH']) -@parse_repository_name() -@process_registry_jwt_auth(scopes=['pull', 'push']) -@require_repo_write -@anon_protect -def upload_chunk(namespace_name, repo_name, upload_uuid): - blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid) - blob_upload.save() +def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed): + """ + When an upload is successful, this ends the uploading process from the + database's perspective. + """ + # Create the blob and temporarily tag it. + blob_storage = v2.create_blob_and_temp_tag( + namespace_name, + repo_name, + digest, + blob_upload.location_name, + blob_upload.byte_count, + blob_upload.uncompressed_byte_count, + app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'], + ) - if upload_error: - logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s', - namespace_name, repo_name, upload_uuid, upload_error) - _range_not_satisfiable(blob_upload.byte_count) + # If it doesn't already exist, create the BitTorrent pieces for the blob. + if blob_upload.piece_sha_state is not None and not already_existed: + piece_bytes = blob_upload.piece_hashes + blob_upload.piece_sha_state.digest() + v2.create_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes) - accepted = make_response('', 204) - accepted.headers['Location'] = _current_request_path() - accepted.headers['Range'] = _render_range(blob_upload.byte_count, with_bytes_prefix=False) - accepted.headers['Docker-Upload-UUID'] = upload_uuid - return accepted + # Delete the blob upload. + v2.delete_upload(blob_upload.uuid) -@v2_bp.route('//blobs/uploads/', methods=['PUT']) -@parse_repository_name() -@process_registry_jwt_auth(scopes=['pull', 'push']) -@require_repo_write -@anon_protect -def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid): - digest = request.args.get('digest', None) - if digest is None: - raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'}) - - blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid) - blob_upload.save() - - if upload_error: - logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s', - namespace_name, repo_name, upload_uuid, upload_error) - _range_not_satisfiable(blob_upload.byte_count) - - return _finish_upload(namespace_name, repo_name, blob_upload, digest) - - -@v2_bp.route('//blobs/uploads/', methods=['DELETE']) -@parse_repository_name() -@process_registry_jwt_auth(scopes=['pull', 'push']) -@require_repo_write -@anon_protect -def cancel_upload(namespace_name, repo_name, upload_uuid): - try: - found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid) - except model.InvalidBlobUpload: - raise BlobUploadUnknown() - - # We delete the record for the upload first, since if the partial upload in - # storage fails to delete, it doesn't break anything - found.delete_instance() - storage.cancel_chunked_upload({found.location.name}, found.uuid, found.storage_metadata) - - return make_response('', 204) - - - -@v2_bp.route('//blobs/', methods=['DELETE']) -@parse_repository_name() -@process_registry_jwt_auth(scopes=['pull', 'push']) -@require_repo_write -@anon_protect -def delete_digest(namespace_name, repo_name, upload_uuid): - # We do not support deleting arbitrary digests, as they break repo images. - raise Unsupported() +def _finish_upload(namespace_name, repo_name, blob_upload, digest): + """ + When an upload is successful, this ends the uploading process. + """ + _validate_digest(blob_upload, digest) + _finalize_blob_database( + namespace_name, + repo_name, + blob_upload, + digest, + _finalize_blob_storage(blob_upload, digest), + )