Implement V2 interfaces and remaining V1 interfaces
Also adds some V1 coverage to the registry tests. Note: all *registry* tests currently pass, but because the verbs are not yet converted, the verb tests in registry_tests.py currently fail.
parent d67991987b
commit db60df827d
21 changed files with 588 additions and 338 deletions
@@ -8,6 +8,7 @@ import resumablehashlib
 from app import storage, app
 from auth.registry_jwt_auth import process_registry_jwt_auth
 from data import database
+from data.interfaces import v2
 from digest import digest_tools
 from endpoints.common import parse_repository_name
 from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
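The data.interfaces.v2 module imported here is the abstraction layer this commit introduces; its definition is not part of this file's diff. Judging purely from the call sites below, the blob-upload portion of that interface looks roughly like the following sketch (function names and parameters come from this diff; the docstrings and everything else are assumptions about the real module):

    # Sketch of the v2 data-interface surface used by this endpoint file.
    def blob_upload_by_uuid(namespace_name, repo_name, upload_uuid):
      """Look up an in-progress upload scoped to one repository; None if absent."""

    def update_blob_upload(blob_upload):
      """Persist mutated upload state (byte_count, chunk_count, storage metadata)."""

    def delete_blob_upload(namespace_name, repo_name, upload_uuid):
      """Drop the upload record once the upload is cancelled or finalized."""

    def save_bittorrent_pieces(blob_storage, piece_size, piece_bytes):
      """Store the BitTorrent piece hashes computed for a finished blob."""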
@@ -134,7 +135,7 @@ def start_blob_upload(namespace_name, repo_name):
 
   # The user plans to send us the entire body right now.
   # Find the upload.
-  blob_upload = v2.blob_upload_by_uuid(new_upload_uuid)
+  blob_upload = v2.blob_upload_by_uuid(namespace_name, repo_name, new_upload_uuid)
   if blob_upload is None:
     raise BlobUploadUnknown()
@@ -142,7 +143,7 @@ def start_blob_upload(namespace_name, repo_name):
   # the upload state.
   updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
   if updated_blob_upload is None:
-    _abort_range_not_satisfiable(updated_blob_upload.byte_count, new_upload_uuid)
+    _abort_range_not_satisfiable(blob_upload.byte_count, new_upload_uuid)
 
   # Save the upload state to the database.
   v2.update_blob_upload(updated_blob_upload)
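The removed line here was a guaranteed crash: it executes only when updated_blob_upload is None, then immediately dereferences that None. The corrected call reports the byte count of the original blob_upload instead. The same one-line fix recurs in upload_chunk and monolithic_upload_or_last_chunk below. A minimal reproduction of the old bug:

    updated_blob_upload = None
    if updated_blob_upload is None:
      # The old code read .byte_count off the value just proven to be None:
      updated_blob_upload.byte_count  # AttributeError: 'NoneType' object has no attribute 'byte_count'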
@@ -195,7 +196,7 @@ def upload_chunk(namespace_name, repo_name, upload_uuid):
   # the upload state.
   updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
   if updated_blob_upload is None:
-    _abort_range_not_satisfiable(updated_blob_upload.byte_count, upload_uuid)
+    _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)
 
   # Save the upload state to the database.
   v2.update_blob_upload(updated_blob_upload)
@@ -231,7 +232,7 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
   # the upload state.
   updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
   if updated_blob_upload is None:
-    _abort_range_not_satisfiable(updated_blob_upload.byte_count, upload_uuid)
+    _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)
 
   # Finalize the upload process in the database and storage.
   _finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
@@ -253,14 +254,15 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
 @require_repo_write
 @anon_protect
 def cancel_upload(namespace_name, repo_name, upload_uuid):
-  upload = v2.blob_upload_by_uuid(upload_uuid)
-  if upload is None:
+  blob_upload = v2.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
+  if blob_upload is None:
     raise BlobUploadUnknown()
 
   # We delete the record for the upload first, since if the partial upload in
-  # storage fails to delete, it doesn't break anything
-  v2.delete_blob_upload(upload_uuid)
-  storage.cancel_chunked_upload({upload.location_name}, upload.uuid, upload.storage_metadata)
+  # storage fails to delete, it doesn't break anything.
+  v2.delete_blob_upload(namespace_name, repo_name, upload_uuid)
+  storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
+                                blob_upload.storage_metadata)
 
   return Response(status=204)
 
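Two details in cancel_upload are easy to misread. First, the record is deleted from the database before the storage cleanup, so a storage failure can at worst orphan some chunk data rather than leave a live record pointing at deleted chunks. Second, {blob_upload.location_name} is a one-element set literal, not a dict; the call sites here show storage.cancel_chunked_upload taking a set of location names. For example:

    location_name = 'local_us'    # hypothetical location name, for illustration only
    locations = {location_name}   # set literal: set(['local_us']), not a dict
    assert isinstance(locations, set) and location_name in locations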
@@ -342,7 +344,7 @@ def _upload_chunk(blob_upload, range_header):
   """
   # Get the offset and length of the current chunk.
   start_offset, length = _start_offset_and_length(range_header)
-  if None in {blob_upload, start_offset, length}:
+  if blob_upload is None or None in {start_offset, length}:
     logger.error('Invalid arguments provided to _upload_chunk')
     return None
 
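The replaced condition built a set containing blob_upload itself, so the membership test raises TypeError outright whenever the upload object is unhashable, and otherwise hashes it for no benefit. The rewrite tests blob_upload is None directly and keeps the set test only for the two plain values. For example:

    start_offset, length = 0, 1024
    blob_upload = ['stand-in']                   # any unhashable object, e.g. a list
    None in {blob_upload, start_offset, length}  # raises TypeError: unhashable type: 'list'
    None in {start_offset, length}               # False; safe whatever blob_upload is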
@@ -393,7 +395,7 @@ def _upload_chunk(blob_upload, range_header):
     size_info, fn = calculate_size_handler()
     input_fp = wrap_with_handler(input_fp, fn)
 
-  length_written, new_metadata, error = storage.stream_upload_chunk(
+  length_written, new_metadata, upload_error = storage.stream_upload_chunk(
     location_set,
     blob_upload.uuid,
     start_offset,
@@ -402,8 +404,9 @@ def _upload_chunk(blob_upload, range_header):
     blob_upload.storage_metadata,
     content_type=BLOB_CONTENT_TYPE,
   )
-  if error is not None:
-    logger.error('storage.stream_upload_chunk returned error %s', error)
+
+  if upload_error is not None:
+    logger.error('storage.stream_upload_chunk returned error %s', upload_error)
     return None
 
   # If we determined an uncompressed size and this is the first chunk, add it to the blob.
@@ -418,6 +421,7 @@ def _upload_chunk(blob_upload, range_header):
   if piece_hasher is not None:
     blob_upload.piece_hashes = piece_hasher.piece_hashes
     blob_upload.piece_sha_state = piece_hasher.hash_fragment
 
   blob_upload.storage_metadata = new_metadata
   blob_upload.byte_count += length_written
+  blob_upload.chunk_count += 1
@@ -471,19 +475,17 @@ def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, alre
     namespace_name,
     repo_name,
     digest,
-    blob_upload.location_name,
-    blob_upload.byte_count,
-    blob_upload.uncompressed_byte_count,
+    blob_upload,
     app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
   )
 
   # If it doesn't already exist, create the BitTorrent pieces for the blob.
   if blob_upload.piece_sha_state is not None and not already_existed:
     piece_bytes = blob_upload.piece_hashes + blob_upload.piece_sha_state.digest()
-    v2.create_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)
+    v2.save_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)
 
   # Delete the blob upload.
-  v2.delete_upload(blob_upload.uuid)
+  v2.delete_blob_upload(namespace_name, repo_name, blob_upload.uuid)
 
 
 def _finish_upload(namespace_name, repo_name, blob_upload, digest):
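Collapsing location_name, byte_count, and uncompressed_byte_count into the single blob_upload argument is a parameter-object refactor: the callee receives the whole upload record, so new fields no longer ripple through every caller. The callee itself sits outside this hunk; a hypothetical before/after of its signature (the name is invented for illustration):

    # Before: each field of the upload travels as a separate parameter.
    def get_or_create_blob(namespace_name, repo_name, digest, location_name,
                           byte_count, uncompressed_byte_count, expiration_sec):
      pass

    # After: the upload record carries its own fields.
    def get_or_create_blob(namespace_name, repo_name, digest, blob_upload, expiration_sec):
      pass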