524 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			524 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import re
 | |
| import time
 | |
| 
 | |
| from flask import url_for, request, redirect, Response, abort as flask_abort
 | |
| 
 | |
| import bitmath
 | |
| import resumablehashlib
 | |
| 
 | |
| from app import storage, app, get_app_url, metric_queue, model_cache
 | |
| from auth.registry_jwt_auth import process_registry_jwt_auth
 | |
| from data import database
 | |
| from data.cache import cache_key
 | |
| from digest import digest_tools
 | |
| from endpoints.decorators import anon_protect, parse_repository_name
 | |
| from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
 | |
| from endpoints.v2.errors import (
 | |
|   BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge)
 | |
| from endpoints.v2.models_pre_oci import data_model as model
 | |
| from util.cache import cache_control
 | |
| from util.registry.filelike import wrap_with_handler, StreamSlice
 | |
| from util.registry.gzipstream import calculate_size_handler
 | |
| from util.registry.torrent import PieceHasher
 | |
| 
 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Route template for a blob addressed by digest. The `{0}` placeholder is filled in with the
# digest regular expression to produce the concrete route.
BASE_BLOB_ROUTE = '/<repopath:repository>/blobs/<regex("{0}"):digest>'
BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN)

# Matches range headers of the form `bytes=<start>-<end>`, capturing both integers.
RANGE_HEADER_REGEX = re.compile(r'^bytes=([0-9]+)-([0-9]+)$')

# Content type used for blob responses and for chunks written to storage.
BLOB_CONTENT_TYPE = 'application/octet-stream'
 | |
| 
 | |
| 
 | |
| class _InvalidRangeHeader(Exception):
 | |
|   pass
 | |
| 
 | |
| 
 | |
def _get_repository_blob(namespace_name, repo_name, digest):
  """ Looks up the blob with the given digest in the named repository, going through
      the model cache so repeated lookups do not hit the data model each time.

      Returns None if no such blob exists (or it is still uploading).
  """
  lookup_key = cache_key.for_repository_blob(namespace_name, repo_name, digest)
  return model_cache.retrieve(
    lookup_key, lambda: model.get_blob_by_digest(namespace_name, repo_name, digest))
 | |
| 
 | |
| 
 | |
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_read
@anon_protect
@cache_control(max_age=31536000)  # One year (365 days), matching the GET handler for blobs.
def check_blob_exists(namespace_name, repo_name, digest):
  """ Handles HEAD requests for a blob: responds with the blob's digest, size and content
      type as headers and no body.

      Raises BlobUnknown if the blob cannot be found in the repository.
  """
  # Find the blob.
  blob = _get_repository_blob(namespace_name, repo_name, digest)
  if blob is None:
    raise BlobUnknown()

  # Build the response headers.
  headers = {
    'Docker-Content-Digest': digest,
    'Content-Length': blob.size,
    'Content-Type': BLOB_CONTENT_TYPE,
  }

  # If our storage supports range requests, let the client know.
  if storage.get_supports_resumable_downloads(blob.locations):
    headers['Accept-Ranges'] = 'bytes'

  # Write the response to the client.
  return Response(headers=headers)
 | |
| 
 | |
| 
 | |
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['GET'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_read
@anon_protect
@cache_control(max_age=31536000)
def download_blob(namespace_name, repo_name, digest):
  """ Handles GET requests for a blob: redirects to a direct download URL when storage can
      provide one, otherwise streams the blob contents from storage.

      Raises BlobUnknown if the blob cannot be found in the repository.
  """
  # Find the blob.
  blob = _get_repository_blob(namespace_name, repo_name, digest)
  if blob is None:
    raise BlobUnknown()

  # Build the response headers.
  headers = {'Docker-Content-Digest': digest}

  # If our storage supports range requests, let the client know.
  if storage.get_supports_resumable_downloads(blob.locations):
    headers['Accept-Ranges'] = 'bytes'

  # Find the storage path for the blob.
  path = model.get_blob_path(blob)

  # Short-circuit by redirecting if the storage supports it.
  logger.debug('Looking up the direct download URL for path: %s', path)
  direct_download_url = storage.get_direct_download_url(blob.locations, path, request.remote_addr)
  if direct_download_url:
    logger.debug('Returning direct download URL')
    resp = redirect(direct_download_url)
    resp.headers.extend(headers)
    return resp

  # dict.update() mutates in place and returns None, so the headers must be completed
  # *before* they are handed to Response; passing the result of update() would drop them all.
  headers.update({
    'Content-Length': blob.size,
    'Content-Type': BLOB_CONTENT_TYPE,
  })

  # Close the database connection before we stream the download.
  logger.debug('Closing database connection before streaming layer data')
  with database.CloseForLongOperation(app.config):
    # Stream the response to the client.
    return Response(storage.stream_read(blob.locations, path), headers=headers)
 | |
| 
 | |
| 
 | |
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def start_blob_upload(namespace_name, repo_name):
  """ Starts a new blob upload. Without a `digest` query argument, responds 202 with the
      URL to send chunks to; with one, treats the request body as the entire blob,
      finishes the upload and responds 201.
  """
  # Register the new upload with storage and with the database.
  preferred_location = storage.preferred_locations[0]
  new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(preferred_location)
  created = model.create_blob_upload(namespace_name, repo_name, new_upload_uuid,
                                     preferred_location, upload_metadata)
  if not created:
    raise NameUnknown()

  repository = '%s/%s' % (namespace_name, repo_name)
  digest = request.args.get('digest', None)
  if digest is None:
    # No digest given: the blob data will arrive in follow-up requests.
    chunk_url = get_app_url() + url_for('v2.upload_chunk', repository=repository,
                                        upload_uuid=new_upload_uuid)
    return Response(status=202, headers={
      'Docker-Upload-UUID': new_upload_uuid,
      'Range': _render_range(0),
      'Location': chunk_url,
    })

  # A digest was given, so the whole body is being sent in this request.
  blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, new_upload_uuid)
  if blob_upload is None:
    raise BlobUploadUnknown()

  # Push the body into storage, updating the upload's bookkeeping along the way.
  range_header = request.headers.get('range')
  updated_blob_upload = _upload_chunk(blob_upload, range_header)
  if updated_blob_upload is None:
    _abort_range_not_satisfiable(blob_upload.byte_count, new_upload_uuid)

  # Persist the new upload state, then finalize in the database and storage.
  model.update_blob_upload(updated_blob_upload)
  _finish_upload(namespace_name, repo_name, updated_blob_upload, digest)

  blob_url = get_app_url() + url_for('v2.download_blob', repository=repository, digest=digest)
  return Response(status=201, headers={
    'Docker-Content-Digest': digest,
    'Location': blob_url,
  })
 | |
| 
 | |
| 
 | |
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_write
@anon_protect
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
  """ Reports the progress of an in-flight upload via the Range header of a 204 response.

      Raises BlobUploadUnknown if no upload with the given UUID exists.
  """
  existing_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
  if existing_upload is None:
    raise BlobUploadUnknown()

  # byte ranges are exclusive
  uploaded_range = _render_range(existing_upload.byte_count + 1)
  response_headers = {
    'Docker-Upload-UUID': upload_uuid,
    'Range': uploaded_range,
  }
  return Response(status=204, headers=response_headers)
 | |
| 
 | |
| 
 | |
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def upload_chunk(namespace_name, repo_name, upload_uuid):
  """ Accepts one chunk of an in-flight upload, stores it and responds 204 with the
      updated byte range.

      Raises BlobUploadUnknown if no upload with the given UUID exists; aborts with 416
      if the chunk could not be applied.
  """
  blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
  if blob_upload is None:
    raise BlobUploadUnknown()

  # Push the chunk into storage, updating the upload's bookkeeping along the way.
  range_header = request.headers.get('range')
  updated_blob_upload = _upload_chunk(blob_upload, range_header)
  if updated_blob_upload is None:
    _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)

  # Persist the new upload state.
  model.update_blob_upload(updated_blob_upload)

  response_headers = {
    'Location': _current_request_url(),
    'Range': _render_range(updated_blob_upload.byte_count, with_bytes_prefix=False),
    'Docker-Upload-UUID': upload_uuid,
  }
  return Response(status=204, headers=response_headers)
 | |
| 
 | |
| 
 | |
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PUT'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
  """ Accepts the final (or only) chunk of an upload, finishes the upload against the
      digest given in the query string and responds 201 with the blob's location.

      Raises BlobUploadInvalid if the digest argument is missing and BlobUploadUnknown if
      no upload with the given UUID exists; aborts with 416 if the chunk could not be
      applied.
  """
  # The digest is mandatory here, so reject the request before touching anything else.
  digest = request.args.get('digest', None)
  if digest is None:
    raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'})

  blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
  if blob_upload is None:
    raise BlobUploadUnknown()

  # Push the chunk into storage, updating the upload's bookkeeping along the way.
  range_header = request.headers.get('range')
  updated_blob_upload = _upload_chunk(blob_upload, range_header)
  if updated_blob_upload is None:
    _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)

  # Finalize in the database and storage.
  _finish_upload(namespace_name, repo_name, updated_blob_upload, digest)

  repository = '%s/%s' % (namespace_name, repo_name)
  blob_url = get_app_url() + url_for('v2.download_blob', repository=repository, digest=digest)
  return Response(status=201, headers={
    'Docker-Content-Digest': digest,
    'Location': blob_url,
  })
 | |
| 
 | |
| 
 | |
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def cancel_upload(namespace_name, repo_name, upload_uuid):
  """ Cancels an in-flight upload, removing its database record and its partial data in
      storage, and responds 204.

      Raises BlobUploadUnknown if no upload with the given UUID exists.
  """
  pending_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
  if pending_upload is None:
    raise BlobUploadUnknown()

  # The database record goes first: if removing the partial upload from storage then
  # fails, nothing is broken by the leftover data.
  model.delete_blob_upload(namespace_name, repo_name, upload_uuid)

  upload_locations = {pending_upload.location_name}
  storage.cancel_chunked_upload(upload_locations, pending_upload.uuid,
                                pending_upload.storage_metadata)

  return Response(status=204)
 | |
| 
 | |
| 
 | |
@v2_bp.route('/<repopath:repository>/blobs/<digest>', methods=['DELETE'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def delete_digest(namespace_name, repo_name, digest):
  """ Rejects requests to delete a blob by digest.

      The parameter is named `digest` to match the `<digest>` variable in the route: Flask
      passes route variables to the view as keyword arguments, so a differently named
      parameter would fail with a TypeError before Unsupported could be raised.

      Always raises Unsupported.
  """
  # We do not support deleting arbitrary digests, as they break repo images.
  raise Unsupported()
 | |
| 
 | |
| 
 | |
| def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
 | |
|   """
 | |
|   Returns a string formatted to be used in the Range header.
 | |
|   """
 | |
|   return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1)
 | |
| 
 | |
| 
 | |
def _current_request_url():
  """
  Returns the URL of the current request, built from the application URL followed by the
  request's script root and path.
  """
  app_url = get_app_url()
  return '{0}{1}{2}'.format(app_url, request.script_root, request.path)
 | |
| 
 | |
| 
 | |
def _abort_range_not_satisfiable(valid_end, upload_uuid):
  """
  Aborts the current request with a 416 response, for cases where the registry cannot
  work with the range the client provided. The response carries the current request URL,
  the range `0-<valid_end>` and the upload's UUID.

  TODO(jzelinskie): Unify this with the V2RegistryException class.
  """
  failure_headers = {
    'Location': _current_request_url(),
    'Range': '0-{0}'.format(valid_end),
    'Docker-Upload-UUID': upload_uuid,
  }
  failure_response = Response(status=416, headers=failure_headers)
  flask_abort(failure_response)
 | |
| 
 | |
| 
 | |
def _parse_range_header(range_header_text):
  """
  Parses a range header of the form `bytes=<start>-<end>`.

  Returns a (start offset, length) tuple, where length is `end - start`.
  Raises _InvalidRangeHeader if the text does not match the expected form or the
  resulting length is not positive.
  """
  match = RANGE_HEADER_REGEX.match(range_header_text)
  if match is None:
    raise _InvalidRangeHeader()

  start, end = (int(group) for group in match.groups())
  length = end - start
  if length <= 0:
    raise _InvalidRangeHeader()

  return (start, length)
 | |
| 
 | |
| 
 | |
| def _start_offset_and_length(range_header):
 | |
|   """
 | |
|   Returns a tuple of the start offset and the length.
 | |
|   If the range header doesn't exist, defaults to (0, -1).
 | |
|   If parsing fails, returns (None, None).
 | |
|   """
 | |
|   start_offset, length = 0, -1
 | |
|   if range_header is not None:
 | |
|     try:
 | |
|       start_offset, length = _parse_range_header(range_header)
 | |
|     except _InvalidRangeHeader:
 | |
|       return None, None
 | |
| 
 | |
|   return start_offset, length
 | |
| 
 | |
| 
 | |
def _upload_chunk(blob_upload, range_header):
  """
  Calculates metadata while uploading a chunk to storage.

  Reads the chunk from the current request's input stream, writes it to the upload's
  storage location and updates `blob_upload` in place (byte count, chunk count, storage
  metadata, piece hashes and uncompressed size).

  Returns the updated BlobUpload object, or None if there was a failure: an unparseable
  range header, a start offset beyond the bytes already received, or an error reported by
  storage. If the chunk turns out to contain no new bytes, `blob_upload` is returned
  without being modified.

  Raises LayerTooLarge if the range declared by the client, or the total bytes received so
  far, exceeds the configured MAXIMUM_LAYER_SIZE.
  """
  max_layer_size = bitmath.parse_string_unsafe(app.config['MAXIMUM_LAYER_SIZE'])

  # Get the offset and length of the current chunk.
  start_offset, length = _start_offset_and_length(range_header)
  if blob_upload is None or None in {start_offset, length}:
    logger.error('Invalid arguments provided to _upload_chunk')
    return None

  # A chunk may overlap data we already have, but it may not leave a gap after it.
  if start_offset > 0 and start_offset > blob_upload.byte_count:
    logger.error('start_offset provided to _upload_chunk greater than blob.upload.byte_count')
    return None

  # Check if we should raise 413 before accepting the data.
  # A length of -1 means no range header was sent, so the size is not known up front.
  uploaded = bitmath.Byte(length + start_offset)
  if length > -1 and uploaded > max_layer_size:
    raise LayerTooLarge(uploaded=uploaded.bytes, max_allowed=max_layer_size.bytes)

  location_set = {blob_upload.location_name}

  upload_error = None
  with database.CloseForLongOperation(app.config):
    input_fp = get_input_stream(request)

    if start_offset > 0 and start_offset < blob_upload.byte_count:
      # Skip the bytes which were received on a previous push, which are already stored and
      # included in the sha calculation
      overlap_size = blob_upload.byte_count - start_offset
      input_fp = StreamSlice(input_fp, overlap_size)

      # Update our upload bounds to reflect the skipped portion of the overlap
      start_offset = blob_upload.byte_count
      length = max(length - overlap_size, 0)

    # We use this to escape early in case we have already processed all of the bytes the user
    # wants to upload
    if length == 0:
      return blob_upload

    # Feed every byte read from the stream into the upload's running SHA state.
    input_fp = wrap_with_handler(input_fp, blob_upload.sha_state.update)

    # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have
    # already calculated hash data for the previous chunk(s).
    piece_hasher = None
    if blob_upload.chunk_count == 0 or blob_upload.piece_sha_state:
      initial_sha1_value = blob_upload.piece_sha_state or resumablehashlib.sha1()
      initial_sha1_pieces_value = blob_upload.piece_hashes or ''

      piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset,
                                 initial_sha1_pieces_value, initial_sha1_value)

      input_fp = wrap_with_handler(input_fp, piece_hasher.update)

    # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
    # stream so we can determine the uncompressed size. We'll throw out this data if another chunk
    # comes in, but in the common case the docker client only sends one chunk.
    size_info = None
    if start_offset == 0 and blob_upload.chunk_count == 0:
      size_info, fn = calculate_size_handler()
      input_fp = wrap_with_handler(input_fp, fn)

    start_time = time.time()
    length_written, new_metadata, upload_error = storage.stream_upload_chunk(
      location_set,
      blob_upload.uuid,
      start_offset,
      length,
      input_fp,
      blob_upload.storage_metadata,
      content_type=BLOB_CONTENT_TYPE,)

    if upload_error is not None:
      logger.error('storage.stream_upload_chunk returned error %s', upload_error)
      return None

    # Update the chunk upload time metric.
    metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[
      length_written, list(location_set)[0]])

  # If we determined an uncompressed size and this is the first chunk, add it to the blob.
  # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
  if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid:
    blob_upload.uncompressed_byte_count = size_info.uncompressed_size
  elif length_written > 0:
    # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
    # know the uncompressed size.
    blob_upload.uncompressed_byte_count = None

  # Carry the torrent piece hashing state over to the next chunk.
  if piece_hasher is not None:
    blob_upload.piece_hashes = piece_hasher.piece_hashes
    blob_upload.piece_sha_state = piece_hasher.hash_fragment

  blob_upload.storage_metadata = new_metadata
  blob_upload.byte_count += length_written
  blob_upload.chunk_count += 1

  # Ensure we have not gone beyond the max layer size.
  upload_size = bitmath.Byte(blob_upload.byte_count)
  if upload_size > max_layer_size:
    raise LayerTooLarge(uploaded=upload_size.bytes, max_allowed=max_layer_size.bytes)

  return blob_upload
 | |
| 
 | |
| 
 | |
def _validate_digest(blob_upload, expected_digest):
  """
  Checks the SHA-256 digest computed from the upload's hash state against the digest the
  client expects.

  Raises BlobUploadInvalid (after logging the mismatch) if the two differ.
  """
  computed_digest = digest_tools.sha256_digest_from_hashlib(blob_upload.sha_state)
  if digest_tools.digests_equal(computed_digest, expected_digest):
    return

  logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s',
               blob_upload.uuid, expected_digest, computed_digest)
  raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'})
 | |
| 
 | |
| 
 | |
def _finalize_blob_storage(blob_upload, expected_digest):
  """
  Ends a successful upload from the storage's perspective: the uploaded data is moved to
  its final content path, unless a blob already exists there, in which case the upload is
  cancelled instead.

  Returns True if the blob already existed.
  """
  final_blob_location = digest_tools.content_path(expected_digest)

  with database.CloseForLongOperation(app.config):
    already_existed = storage.exists({blob_upload.location_name}, final_blob_location)
    if not already_existed:
      # First upload of this content (at least to this location): put it into place.
      storage.complete_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
                                      final_blob_location, blob_upload.storage_metadata)
    else:
      # The content is already stored; this upload only served as proof that the uploader
      # had the blob, so discard it.
      storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
                                    blob_upload.storage_metadata)

  return already_existed
 | |
| 
 | |
| 
 | |
def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed):
  """
  Ends a successful upload from the database's perspective: creates the blob with a
  temporary tag, saves its BitTorrent pieces when the blob is new, and deletes the upload
  record.
  """
  temp_tag_expiration = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
  blob_storage = model.create_blob_and_temp_tag(namespace_name, repo_name, digest, blob_upload,
                                                temp_tag_expiration)

  # Pieces are only saved for blobs that did not already exist and have piece hash state.
  has_piece_state = blob_upload.piece_sha_state is not None
  if has_piece_state and not already_existed:
    final_piece = blob_upload.piece_sha_state.digest()
    piece_bytes = blob_upload.piece_hashes + final_piece
    model.save_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)

  # The upload record is no longer needed.
  model.delete_blob_upload(namespace_name, repo_name, blob_upload.uuid)
 | |
| 
 | |
| 
 | |
def _finish_upload(namespace_name, repo_name, blob_upload, digest):
  """
  Ends a successful upload: validates the digest, then finalizes the blob in storage and
  afterwards in the database.
  """
  _validate_digest(blob_upload, digest)

  # Storage is finalized first; the database step needs to know whether the blob existed.
  already_existed = _finalize_blob_storage(blob_upload, digest)
  _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed)
 |