v2: refactor blob.py to use data.types
This commit is contained in:
parent
21cbe0bd07
commit
3de6000428
3 changed files with 387 additions and 207 deletions
|
@ -1,8 +1,10 @@
|
||||||
from data.types import (
|
from data.types import (
|
||||||
|
Blob,
|
||||||
|
BlobUpload,
|
||||||
|
DockerV1Metadata,
|
||||||
|
ManifestJSON,
|
||||||
Repository,
|
Repository,
|
||||||
Tag,
|
Tag,
|
||||||
ManifestJSON,
|
|
||||||
DockerV1Metadata,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_repository(namespace_name, repo_name):
|
def get_repository(namespace_name, repo_name):
|
||||||
|
@ -108,3 +110,73 @@ def repository_tags(namespace_name, repo_name, limit, offset):
|
||||||
|
|
||||||
def get_visible_repositories(username, limit, offset):
|
def get_visible_repositories(username, limit, offset):
|
||||||
return [Repository()]
|
return [Repository()]
|
||||||
|
|
||||||
|
|
||||||
|
def create_blob_upload(namespace_name, repo_name, upload_uuid, location_name, storage_metadata):
|
||||||
|
"""
|
||||||
|
Creates a blob upload.
|
||||||
|
|
||||||
|
Returns False if the upload's repository does not exist.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
model.blob.initiate_upload(namespace_name, repo_name, new_upload_uuid, location_name,
|
||||||
|
upload_metadata)
|
||||||
|
return True
|
||||||
|
except database.Repository.DoesNotExist:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def blob_upload_by_uuid(uuid):
|
||||||
|
try:
|
||||||
|
found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
|
||||||
|
except model.InvalidBlobUpload:
|
||||||
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
|
return BlobUpload(
|
||||||
|
uuid=uuid,
|
||||||
|
byte_count=found.byte_count,
|
||||||
|
uncompressed_byte_count=found.uncompressed_byte_count,
|
||||||
|
chunk_count=found.chunk_count,
|
||||||
|
location_name=found.location.name,
|
||||||
|
storage_metadata=found.storage_metadata,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def update_blob_upload(blob_upload):
|
||||||
|
# old implementation:
|
||||||
|
# blob_upload.save()
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def delete_blob_upload(uuid):
|
||||||
|
try:
|
||||||
|
found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
|
||||||
|
except model.InvalidBlobUpload:
|
||||||
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
|
found.delete_instance()
|
||||||
|
|
||||||
|
def create_blob_and_temp_tag(namespace_name, repo_name, expected_digest, upload_obj):
|
||||||
|
return model.blob.store_blob_record_and_temp_link(namespace_name, repo_name, expected_digest,
|
||||||
|
upload_obj.location,
|
||||||
|
upload_obj.byte_count,
|
||||||
|
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
|
||||||
|
upload_obj.uncompressed_byte_count)
|
||||||
|
|
||||||
|
|
||||||
|
def blob_by_digest(namespace_name, repo_name, digest):
|
||||||
|
try:
|
||||||
|
return model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest)
|
||||||
|
except model.BlobDoesNotExist:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def create_bittorrent_pieces(blob_storage, piece_size, piece_bytes)
|
||||||
|
model.storage.save_torrent_info(blob_storage.id, piece_size, piece_bytes)
|
||||||
|
|
||||||
|
|
||||||
|
def get_blob_path(blob):
|
||||||
|
# Once everything is moved over, this could be in util.registry and not even
|
||||||
|
# touch the database.
|
||||||
|
model.storage.get_layer_path(blob)
|
||||||
|
|
|
@ -344,3 +344,15 @@ DockerV1Metadata = namedtuple('DockerV1Metadata', ['namespace_name',
|
||||||
'command',
|
'command',
|
||||||
'parent_image_id',
|
'parent_image_id',
|
||||||
'compat_json'])
|
'compat_json'])
|
||||||
|
|
||||||
|
BlobUpload = namedtuple('BlobUpload', ['uuid',
|
||||||
|
'byte_count',
|
||||||
|
'uncompressed_byte_count',
|
||||||
|
'chunk_count',
|
||||||
|
'sha_state',
|
||||||
|
'location_name',
|
||||||
|
'storage_metadata',
|
||||||
|
'piece_sha_state',
|
||||||
|
'piece_hashes'])
|
||||||
|
|
||||||
|
Blob = namedtuple('Blob', ['digest', 'size', 'locations'])
|
||||||
|
|
|
@ -7,7 +7,7 @@ import resumablehashlib
|
||||||
|
|
||||||
from app import storage, app
|
from app import storage, app
|
||||||
from auth.registry_jwt_auth import process_registry_jwt_auth
|
from auth.registry_jwt_auth import process_registry_jwt_auth
|
||||||
from data import model, database
|
from data import database
|
||||||
from digest import digest_tools
|
from digest import digest_tools
|
||||||
from endpoints.common import parse_repository_name
|
from endpoints.common import parse_repository_name
|
||||||
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
|
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
|
||||||
|
@ -33,28 +33,6 @@ class _InvalidRangeHeader(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _base_blob_fetch(namespace_name, repo_name, digest):
|
|
||||||
""" Some work that is common to both GET and HEAD requests. Callers MUST check for proper
|
|
||||||
authorization before calling this method.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
found = model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest)
|
|
||||||
except model.BlobDoesNotExist:
|
|
||||||
raise BlobUnknown()
|
|
||||||
|
|
||||||
headers = {
|
|
||||||
'Docker-Content-Digest': digest,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Add the Accept-Ranges header if the storage engine supports resumable
|
|
||||||
# downloads.
|
|
||||||
if storage.get_supports_resumable_downloads(found.locations):
|
|
||||||
logger.debug('Storage supports resumable downloads')
|
|
||||||
headers['Accept-Ranges'] = 'bytes'
|
|
||||||
|
|
||||||
return found, headers
|
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
|
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
|
||||||
@parse_repository_name()
|
@parse_repository_name()
|
||||||
@process_registry_jwt_auth(scopes=['pull'])
|
@process_registry_jwt_auth(scopes=['pull'])
|
||||||
|
@ -62,12 +40,25 @@ def _base_blob_fetch(namespace_name, repo_name, digest):
|
||||||
@anon_protect
|
@anon_protect
|
||||||
@cache_control(max_age=31436000)
|
@cache_control(max_age=31436000)
|
||||||
def check_blob_exists(namespace_name, repo_name, digest):
|
def check_blob_exists(namespace_name, repo_name, digest):
|
||||||
found, headers = _base_blob_fetch(namespace_name, repo_name, digest)
|
# Find the blob.
|
||||||
|
blob = v2.blob_by_digest(namespace_name, repo_name, digest)
|
||||||
|
if blob is None:
|
||||||
|
raise BlobUnknown()
|
||||||
|
|
||||||
|
# Build the response headers.
|
||||||
|
headers = {
|
||||||
|
'Docker-Content-Digest': digest,
|
||||||
|
'Content-Length': blob.size,
|
||||||
|
'Content-Type': BLOB_CONTENT_TYPE,
|
||||||
|
}
|
||||||
|
|
||||||
|
# If our storage supports range requests, let the Docker client know.
|
||||||
|
if storage.get_supports_resumable_downloads(blob.locations):
|
||||||
|
headers['Accept-Ranges'] = 'bytes'
|
||||||
|
|
||||||
|
# Write the response to the Docker client.
|
||||||
response = make_response('')
|
response = make_response('')
|
||||||
response.headers.extend(headers)
|
response.headers.extend(headers)
|
||||||
response.headers['Content-Length'] = found.image_size
|
|
||||||
response.headers['Content-Type'] = BLOB_CONTENT_TYPE
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
@ -78,31 +69,42 @@ def check_blob_exists(namespace_name, repo_name, digest):
|
||||||
@anon_protect
|
@anon_protect
|
||||||
@cache_control(max_age=31536000)
|
@cache_control(max_age=31536000)
|
||||||
def download_blob(namespace_name, repo_name, digest):
|
def download_blob(namespace_name, repo_name, digest):
|
||||||
found, headers = _base_blob_fetch(namespace_name, repo_name, digest)
|
# Find the blob.
|
||||||
|
blob = v2.blob_by_digest(namespace_name, repo_name, digest)
|
||||||
|
if blob is None:
|
||||||
|
raise BlobUnknown()
|
||||||
|
|
||||||
path = model.storage.get_layer_path(found)
|
# Build the response headers.
|
||||||
|
headers = {'Docker-Content-Digest': digest}
|
||||||
|
|
||||||
|
# If our storage supports range requests, let the Docker client know.
|
||||||
|
if storage.get_supports_resumable_downloads(blob.locations):
|
||||||
|
headers['Accept-Ranges'] = 'bytes'
|
||||||
|
|
||||||
|
# Find the storage path for the blob.
|
||||||
|
path = v2.get_blob_path(blob)
|
||||||
|
|
||||||
|
# Short-circuit by redirecting if the storage supports it.
|
||||||
logger.debug('Looking up the direct download URL for path: %s', path)
|
logger.debug('Looking up the direct download URL for path: %s', path)
|
||||||
direct_download_url = storage.get_direct_download_url(found.locations, path)
|
direct_download_url = storage.get_direct_download_url(blob.locations, path)
|
||||||
|
|
||||||
if direct_download_url:
|
if direct_download_url:
|
||||||
logger.debug('Returning direct download URL')
|
logger.debug('Returning direct download URL')
|
||||||
resp = redirect(direct_download_url)
|
resp = redirect(direct_download_url)
|
||||||
resp.headers.extend(headers)
|
resp.headers.extend(headers)
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
logger.debug('Streaming layer data')
|
# Close the database connection before we stream the download.
|
||||||
|
logger.debug('Closing database connection before streaming layer data')
|
||||||
# Close the database handle here for this process before we send the long download.
|
|
||||||
database.close_db_filter(None)
|
database.close_db_filter(None)
|
||||||
|
|
||||||
headers['Content-Length'] = found.image_size
|
# Stream the response to the Docker client.
|
||||||
headers['Content-Type'] = BLOB_CONTENT_TYPE
|
return Response(
|
||||||
|
storage.stream_read(blob.locations, path),
|
||||||
return Response(storage.stream_read(found.locations, path), headers=headers)
|
headers=headers.update({
|
||||||
|
'Content-Length': blob.size,
|
||||||
|
'Content-Type': BLOB_CONTENT_TYPE,
|
||||||
def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
|
}),
|
||||||
return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1)
|
)
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
|
||||||
|
@ -111,37 +113,50 @@ def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
|
||||||
@require_repo_write
|
@require_repo_write
|
||||||
@anon_protect
|
@anon_protect
|
||||||
def start_blob_upload(namespace_name, repo_name):
|
def start_blob_upload(namespace_name, repo_name):
|
||||||
|
# Begin the blob upload process in the database and storage.
|
||||||
location_name = storage.preferred_locations[0]
|
location_name = storage.preferred_locations[0]
|
||||||
new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
|
new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
|
||||||
|
repository_exists = v2.create_blob_upload(namespace_name, repo_name, new_upload_uuid,
|
||||||
try:
|
location_name, upload_metadata)
|
||||||
model.blob.initiate_upload(namespace_name, repo_name, new_upload_uuid, location_name,
|
if not repository_exists:
|
||||||
upload_metadata)
|
|
||||||
except database.Repository.DoesNotExist:
|
|
||||||
raise NameUnknown()
|
raise NameUnknown()
|
||||||
|
|
||||||
digest = request.args.get('digest', None)
|
digest = request.args.get('digest', None)
|
||||||
if digest is None:
|
if digest is None:
|
||||||
# The user will send the blob data in another request
|
# Short-circuit because the user will send the blob data in another request.
|
||||||
accepted = make_response('', 202)
|
accepted = make_response('', 202)
|
||||||
accepted.headers['Location'] = url_for('v2.upload_chunk',
|
accepted.headers['Location'] = url_for('v2.upload_chunk',
|
||||||
repository='%s/%s' % (namespace_name, repo_name),
|
repository='%s/%s' % (namespace_name, repo_name),
|
||||||
upload_uuid=new_upload_uuid)
|
upload_uuid=new_upload_uuid)
|
||||||
|
|
||||||
accepted.headers['Range'] = _render_range(0)
|
accepted.headers['Range'] = _render_range(0)
|
||||||
accepted.headers['Docker-Upload-UUID'] = new_upload_uuid
|
accepted.headers['Docker-Upload-UUID'] = new_upload_uuid
|
||||||
return accepted
|
return accepted
|
||||||
else:
|
|
||||||
# The user plans to send us the entire body right now
|
|
||||||
blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, new_upload_uuid)
|
|
||||||
blob_upload.save()
|
|
||||||
|
|
||||||
if upload_error:
|
# The user plans to send us the entire body right now.
|
||||||
logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
|
# Find the upload.
|
||||||
namespace_name, repo_name, new_upload_uuid, upload_error)
|
blob_upload = v2.blob_upload_by_uuid(new_upload_uuid)
|
||||||
_range_not_satisfiable(blob_upload.byte_count)
|
if blob_upload is None:
|
||||||
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
return _finish_upload(namespace_name, repo_name, blob_upload, digest)
|
# Upload the chunk to storage while calculating some metadata and updating
|
||||||
|
# the upload state.
|
||||||
|
updated_blob_upload = _upload_chunk(blob_upload, *_start_offset_and_length(request.headers))
|
||||||
|
if updated_blob_upload is None:
|
||||||
|
_abort_range_not_satisfiable(updated_blob_upload.byte_count, new_upload_uuid)
|
||||||
|
|
||||||
|
# Save the upload state to the database.
|
||||||
|
v2.update_blob_upload(updated_blob_upload)
|
||||||
|
|
||||||
|
# Finalize the upload process in the database and storage.
|
||||||
|
_finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
|
||||||
|
|
||||||
|
# Write the response to the docker client.
|
||||||
|
response = make_response('', 201)
|
||||||
|
response.headers['Docker-Content-Digest'] = digest
|
||||||
|
response.headers['Location'] = url_for('v2.download_blob',
|
||||||
|
repository='%s/%s' % (namespace_name, repo_name),
|
||||||
|
digest=digest)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
|
||||||
|
@ -150,33 +165,141 @@ def start_blob_upload(namespace_name, repo_name):
|
||||||
@require_repo_write
|
@require_repo_write
|
||||||
@anon_protect
|
@anon_protect
|
||||||
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
|
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
|
||||||
try:
|
blob_upload = v2.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
|
||||||
found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
|
if blob_upload is None:
|
||||||
except model.InvalidBlobUpload:
|
|
||||||
raise BlobUploadUnknown()
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
# Note: Docker byte ranges are exclusive so we have to add one to the byte count.
|
|
||||||
accepted = make_response('', 204)
|
accepted = make_response('', 204)
|
||||||
accepted.headers['Range'] = _render_range(found.byte_count + 1)
|
accepted.headers.extend({
|
||||||
accepted.headers['Docker-Upload-UUID'] = upload_uuid
|
'Docker-Upload-UUID': upload_uuid,
|
||||||
|
'Range': _render_range(blob_upload.byte_count+1), # Docker byte ranges are exclusive
|
||||||
|
})
|
||||||
return accepted
|
return accepted
|
||||||
|
|
||||||
|
|
||||||
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
|
||||||
|
@parse_repository_name()
|
||||||
|
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
||||||
|
@require_repo_write
|
||||||
|
@anon_protect
|
||||||
|
def upload_chunk(namespace_name, repo_name, upload_uuid):
|
||||||
|
# Find the upload.
|
||||||
|
blob_upload = v2.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
|
||||||
|
if blob_upload is None:
|
||||||
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
|
# Upload the chunk to storage while calculating some metadata and updating
|
||||||
|
# the upload state.
|
||||||
|
updated_blob_upload = _upload_chunk(blob_upload, *_start_offset_and_length(request.headers))
|
||||||
|
if updated_blob_upload is None:
|
||||||
|
_abort_range_not_satisfiable(updated_blob_upload.byte_count, upload_uuid)
|
||||||
|
|
||||||
|
# Save the upload state to the database.
|
||||||
|
v2.update_blob_upload(updated_blob_upload)
|
||||||
|
|
||||||
|
# Write the response to the Docker client.
|
||||||
|
accepted = make_response('', 204)
|
||||||
|
accepted.headers.extend({
|
||||||
|
'Location': _current_request_path(),
|
||||||
|
'Range': _render_range(updated_blob_upload.byte_count, with_bytes_prefix=False),
|
||||||
|
'Docker-Upload-UUID': upload_uuid,
|
||||||
|
})
|
||||||
|
return accepted
|
||||||
|
|
||||||
|
|
||||||
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PUT'])
|
||||||
|
@parse_repository_name()
|
||||||
|
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
||||||
|
@require_repo_write
|
||||||
|
@anon_protect
|
||||||
|
def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
|
||||||
|
# Ensure the digest is present before proceeding.
|
||||||
|
digest = request.args.get('digest', None)
|
||||||
|
if digest is None:
|
||||||
|
raise BlobUploadInvalid()
|
||||||
|
|
||||||
|
# Find the upload.
|
||||||
|
blob_upload = v2.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
|
||||||
|
if blob_upload is None:
|
||||||
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
|
# Upload the chunk to storage while calculating some metadata and updating
|
||||||
|
# the upload state.
|
||||||
|
updated_blob_upload = _upload_chunk(blob_upload, *_start_offset_and_length(request.headers))
|
||||||
|
if updated_blob_upload is None:
|
||||||
|
_abort_range_not_satisfiable(updated_blob_upload.byte_count, upload_uuid)
|
||||||
|
|
||||||
|
# Finalize the upload process in the database and storage.
|
||||||
|
_finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
|
||||||
|
|
||||||
|
# Write the response to the Docker client.
|
||||||
|
response = make_response('', 201)
|
||||||
|
response.headers.extend({
|
||||||
|
'Docker-Content-Digest': digest,
|
||||||
|
'Location': url_for('v2.download_blob', repository='%s/%s' % (namespace_name, repo_name),
|
||||||
|
digest=digest)
|
||||||
|
})
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
|
||||||
|
@parse_repository_name()
|
||||||
|
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
||||||
|
@require_repo_write
|
||||||
|
@anon_protect
|
||||||
|
def cancel_upload(namespace_name, repo_name, upload_uuid):
|
||||||
|
upload = v2.blob_upload_by_uuid(upload_uuid)
|
||||||
|
if upload is None:
|
||||||
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
|
# We delete the record for the upload first, since if the partial upload in
|
||||||
|
# storage fails to delete, it doesn't break anything
|
||||||
|
v2.delete_blob_upload(upload_uuid)
|
||||||
|
storage.cancel_chunked_upload({upload.location_name}, upload.uuid, upload.storage_metadata)
|
||||||
|
|
||||||
|
return make_response('', 204)
|
||||||
|
|
||||||
|
|
||||||
|
@v2_bp.route('/<repopath:repository>/blobs/<digest>', methods=['DELETE'])
|
||||||
|
@parse_repository_name()
|
||||||
|
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
||||||
|
@require_repo_write
|
||||||
|
@anon_protect
|
||||||
|
def delete_digest(namespace_name, repo_name, upload_uuid):
|
||||||
|
# We do not support deleting arbitrary digests, as they break repo images.
|
||||||
|
raise Unsupported()
|
||||||
|
|
||||||
|
|
||||||
|
def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
|
||||||
|
return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1)
|
||||||
|
|
||||||
|
|
||||||
def _current_request_path():
|
def _current_request_path():
|
||||||
return '{0}{1}'.format(request.script_root, request.path)
|
return '{0}{1}'.format(request.script_root, request.path)
|
||||||
|
|
||||||
|
|
||||||
def _range_not_satisfiable(valid_end):
|
def _abort_range_not_satisfiable(valid_end, upload_uuid):
|
||||||
|
"""
|
||||||
|
Writes a failure response for scenarios where the registry cannot function
|
||||||
|
with the provided range.
|
||||||
|
|
||||||
|
TODO(jzelinskie): Unify this with the V2RegistryException class.
|
||||||
|
"""
|
||||||
invalid_range = make_response('', 416)
|
invalid_range = make_response('', 416)
|
||||||
invalid_range.headers['Location'] = _current_request_path()
|
invalid_range.headers.extend({
|
||||||
invalid_range.headers['Range'] = '0-{0}'.format(valid_end)
|
'Location': _current_request_path(),
|
||||||
invalid_range.headers['Docker-Upload-UUID'] = request.view_args['upload_uuid']
|
'Range': '0-{0}'.format(valid_end),
|
||||||
|
'Docker-Upload-UUID': upload_uuid,
|
||||||
|
})
|
||||||
flask_abort(invalid_range)
|
flask_abort(invalid_range)
|
||||||
|
|
||||||
|
|
||||||
def _parse_range_header(range_header_text):
|
def _parse_range_header(range_header_text):
|
||||||
""" Parses the range header, and returns a tuple of the start offset and the length,
|
"""
|
||||||
or raises an _InvalidRangeHeader exception.
|
Parses the range header.
|
||||||
|
|
||||||
|
Returns a tuple of the start offset and the length.
|
||||||
|
If the parse fails, raises _InvalidRangeHeader.
|
||||||
"""
|
"""
|
||||||
found = RANGE_HEADER_REGEX.match(range_header_text)
|
found = RANGE_HEADER_REGEX.match(range_header_text)
|
||||||
if found is None:
|
if found is None:
|
||||||
|
@ -191,60 +314,66 @@ def _parse_range_header(range_header_text):
|
||||||
return (start, length)
|
return (start, length)
|
||||||
|
|
||||||
|
|
||||||
def _upload_chunk(namespace_name, repo_name, upload_uuid):
|
def _start_offset_and_length(headers):
|
||||||
""" Common code among the various uploading paths for appending data to blobs.
|
"""
|
||||||
Callers MUST call .save() or .delete_instance() on the returned database object.
|
Returns a tuple of the start offset and the length.
|
||||||
Returns the BlobUpload object and the error that occurred, if any (or None if none).
|
If the range header doesn't exist, defaults to (0, -1).
|
||||||
|
If parsing fails, returns (None, None).
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
|
|
||||||
except model.InvalidBlobUpload:
|
|
||||||
raise BlobUploadUnknown()
|
|
||||||
|
|
||||||
start_offset, length = 0, -1
|
start_offset, length = 0, -1
|
||||||
range_header = request.headers.get('range', None)
|
range_header = headers.get('range', None)
|
||||||
if range_header is not None:
|
if range_header is not None:
|
||||||
try:
|
try:
|
||||||
start_offset, length = _parse_range_header(range_header)
|
start_offset, length = _parse_range_header(range_header)
|
||||||
except _InvalidRangeHeader:
|
except _InvalidRangeHeader:
|
||||||
_range_not_satisfiable(found.byte_count)
|
return None, None
|
||||||
|
return start_offset, length
|
||||||
|
|
||||||
if start_offset > 0 and start_offset > found.byte_count:
|
|
||||||
_range_not_satisfiable(found.byte_count)
|
|
||||||
|
|
||||||
location_set = {found.location.name}
|
def _upload_chunk(blob_upload, start_offset, length):
|
||||||
|
"""
|
||||||
|
Calculates metadata while uploading a chunk to storage.
|
||||||
|
|
||||||
|
Returns a BlobUpload object or None if there was a failure.
|
||||||
|
"""
|
||||||
|
# Check for invalidate arguments.
|
||||||
|
if None in {blob_upload, start_offset, length}:
|
||||||
|
return None
|
||||||
|
if start_offset > 0 and start_offset > blob_upload.byte_count:
|
||||||
|
return None
|
||||||
|
|
||||||
|
location_set = {blob_upload.location_name}
|
||||||
|
|
||||||
upload_error = None
|
upload_error = None
|
||||||
with database.CloseForLongOperation(app.config):
|
with database.CloseForLongOperation(app.config):
|
||||||
input_fp = get_input_stream(request)
|
input_fp = get_input_stream(request)
|
||||||
|
|
||||||
if start_offset > 0 and start_offset < found.byte_count:
|
if start_offset > 0 and start_offset < blob_upload.byte_count:
|
||||||
# Skip the bytes which were received on a previous push, which are already stored and
|
# Skip the bytes which were received on a previous push, which are already stored and
|
||||||
# included in the sha calculation
|
# included in the sha calculation
|
||||||
overlap_size = found.byte_count - start_offset
|
overlap_size = blob_upload.byte_count - start_offset
|
||||||
input_fp = StreamSlice(input_fp, overlap_size)
|
input_fp = StreamSlice(input_fp, overlap_size)
|
||||||
|
|
||||||
# Update our upload bounds to reflect the skipped portion of the overlap
|
# Update our upload bounds to reflect the skipped portion of the overlap
|
||||||
start_offset = found.byte_count
|
start_offset = blob_upload.byte_count
|
||||||
length = max(length - overlap_size, 0)
|
length = max(length - overlap_size, 0)
|
||||||
|
|
||||||
# We use this to escape early in case we have already processed all of the bytes the user
|
# We use this to escape early in case we have already processed all of the bytes the user
|
||||||
# wants to upload
|
# wants to upload
|
||||||
if length == 0:
|
if length == 0:
|
||||||
return found, None
|
return blob_upload
|
||||||
|
|
||||||
input_fp = wrap_with_handler(input_fp, found.sha_state.update)
|
input_fp = wrap_with_handler(input_fp, blob_upload.sha_state.update)
|
||||||
|
|
||||||
# Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have
|
# Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have
|
||||||
# already calculated hash data for the previous chunk(s).
|
# already calculated hash data for the previous chunk(s).
|
||||||
piece_hasher = None
|
piece_hasher = None
|
||||||
if found.chunk_count == 0 or found.piece_sha_state:
|
if blob_upload.chunk_count == 0 or blob_upload.piece_sha_state:
|
||||||
initial_sha1_value = found.piece_sha_state or resumablehashlib.sha1()
|
initial_sha1_value = blob_upload.piece_sha_state or resumablehashlib.sha1()
|
||||||
initial_sha1_pieces_value = found.piece_hashes or ''
|
initial_sha1_pieces_value = blob_upload.piece_hashes or ''
|
||||||
|
|
||||||
piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset,
|
piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset,
|
||||||
initial_sha1_pieces_value,
|
initial_sha1_pieces_value, initial_sha1_value)
|
||||||
initial_sha1_value)
|
|
||||||
|
|
||||||
input_fp = wrap_with_handler(input_fp, piece_hasher.update)
|
input_fp = wrap_with_handler(input_fp, piece_hasher.update)
|
||||||
|
|
||||||
|
@ -252,147 +381,114 @@ def _upload_chunk(namespace_name, repo_name, upload_uuid):
|
||||||
# stream so we can determine the uncompressed size. We'll throw out this data if another chunk
|
# stream so we can determine the uncompressed size. We'll throw out this data if another chunk
|
||||||
# comes in, but in the common case Docker only sends one chunk.
|
# comes in, but in the common case Docker only sends one chunk.
|
||||||
size_info = None
|
size_info = None
|
||||||
if start_offset == 0 and found.chunk_count == 0:
|
if start_offset == 0 and blob_upload.chunk_count == 0:
|
||||||
size_info, fn = calculate_size_handler()
|
size_info, fn = calculate_size_handler()
|
||||||
input_fp = wrap_with_handler(input_fp, fn)
|
input_fp = wrap_with_handler(input_fp, fn)
|
||||||
|
|
||||||
chunk_result = storage.stream_upload_chunk(location_set, upload_uuid, start_offset, length,
|
try:
|
||||||
input_fp, found.storage_metadata,
|
length_written, new_metadata, error = storage.stream_upload_chunk(
|
||||||
content_type=BLOB_CONTENT_TYPE)
|
location_set,
|
||||||
length_written, new_metadata, upload_error = chunk_result
|
blob_upload.uuid,
|
||||||
|
start_offset,
|
||||||
|
length,
|
||||||
|
input_fp,
|
||||||
|
blob_upload.storage_metadata,
|
||||||
|
content_type=BLOB_CONTENT_TYPE,
|
||||||
|
)
|
||||||
|
if error is not None:
|
||||||
|
return None
|
||||||
|
except InvalidChunkException:
|
||||||
|
return None
|
||||||
|
|
||||||
# If we determined an uncompressed size and this is the first chunk, add it to the blob.
|
# If we determined an uncompressed size and this is the first chunk, add it to the blob.
|
||||||
# Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
|
# Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
|
||||||
if size_info is not None and found.chunk_count == 0 and size_info.is_valid:
|
if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid:
|
||||||
found.uncompressed_byte_count = size_info.uncompressed_size
|
blob_upload.uncompressed_byte_count = size_info.uncompressed_size
|
||||||
elif length_written > 0:
|
elif length_written > 0:
|
||||||
# Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
|
# Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
|
||||||
# know the uncompressed size.
|
# know the uncompressed size.
|
||||||
found.uncompressed_byte_count = None
|
blob_upload.uncompressed_byte_count = None
|
||||||
|
|
||||||
if piece_hasher is not None:
|
if piece_hasher is not None:
|
||||||
found.piece_hashes = piece_hasher.piece_hashes
|
blob_upload.piece_hashes = piece_hasher.piece_hashes
|
||||||
found.piece_sha_state = piece_hasher.hash_fragment
|
blob_upload.piece_sha_state = piece_hasher.hash_fragment
|
||||||
|
blob_upload.storage_metadata = new_metadata
|
||||||
found.storage_metadata = new_metadata
|
blob_upload.byte_count += length_written
|
||||||
found.byte_count += length_written
|
blob_upload.chunk_count += 1
|
||||||
found.chunk_count += 1
|
return blob_upload
|
||||||
return found, upload_error
|
|
||||||
|
|
||||||
|
|
||||||
def _finish_upload(namespace_name, repo_name, upload_obj, expected_digest):
|
def _validate_digest(blob_upload, expected_digest):
|
||||||
# Verify that the digest's SHA matches that of the uploaded data.
|
"""
|
||||||
computed_digest = digest_tools.sha256_digest_from_hashlib(upload_obj.sha_state)
|
Verifies that the digest's SHA matches that of the uploaded data.
|
||||||
|
"""
|
||||||
|
computed_digest = digest_tools.sha256_digest_from_hashlib(blob_upload.sha_state)
|
||||||
if not digest_tools.digests_equal(computed_digest, expected_digest):
|
if not digest_tools.digests_equal(computed_digest, expected_digest):
|
||||||
logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s',
|
logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s',
|
||||||
upload_obj.uuid, expected_digest, computed_digest)
|
upload_obj.uuid, expected_digest, computed_digest)
|
||||||
raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'})
|
raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'})
|
||||||
|
|
||||||
|
|
||||||
|
def _finalize_blob_storage(blob_upload, expected_digest):
|
||||||
|
"""
|
||||||
|
When an upload is successful, this ends the uploading process from the
|
||||||
|
storage's perspective.
|
||||||
|
|
||||||
|
Returns True if the blob already existed.
|
||||||
|
"""
|
||||||
final_blob_location = digest_tools.content_path(expected_digest)
|
final_blob_location = digest_tools.content_path(expected_digest)
|
||||||
|
|
||||||
# Move the storage into place, or if this was a re-upload, cancel it
|
# Move the storage into place, or if this was a re-upload, cancel it
|
||||||
with database.CloseForLongOperation(app.config):
|
with database.CloseForLongOperation(app.config):
|
||||||
already_exists = storage.exists({upload_obj.location.name}, final_blob_location)
|
already_existed = storage.exists({blob_upload.location_name}, final_blob_location)
|
||||||
if already_exists:
|
if already_existed:
|
||||||
# It already existed, clean up our upload which served as proof that we had the file
|
# It already existed, clean up our upload which served as proof that the
|
||||||
storage.cancel_chunked_upload({upload_obj.location.name}, upload_obj.uuid,
|
# uploader had the blob.
|
||||||
upload_obj.storage_metadata)
|
storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
|
||||||
|
blob_upload.storage_metadata)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# We were the first ones to upload this image (at least to this location)
|
# We were the first ones to upload this image (at least to this location)
|
||||||
# Let's copy it into place
|
# Let's copy it into place
|
||||||
storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid,
|
storage.complete_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
|
||||||
final_blob_location, upload_obj.storage_metadata)
|
final_blob_location, blob_upload.storage_metadata)
|
||||||
|
return already_existed
|
||||||
|
|
||||||
# Mark the blob as uploaded.
|
|
||||||
blob_storage = model.blob.store_blob_record_and_temp_link(namespace_name, repo_name, expected_digest,
|
def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed):
|
||||||
upload_obj.location,
|
"""
|
||||||
upload_obj.byte_count,
|
When an upload is successful, this ends the uploading process from the
|
||||||
|
database's perspective.
|
||||||
|
"""
|
||||||
|
# Create the blob and temporarily tag it.
|
||||||
|
blob_storage = v2.create_blob_and_temp_tag(
|
||||||
|
namespace_name,
|
||||||
|
repo_name,
|
||||||
|
digest,
|
||||||
|
blob_upload.location_name,
|
||||||
|
blob_upload.byte_count,
|
||||||
|
blob_upload.uncompressed_byte_count,
|
||||||
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
|
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
|
||||||
upload_obj.uncompressed_byte_count)
|
)
|
||||||
|
|
||||||
if upload_obj.piece_sha_state is not None and not already_exists:
|
# If it doesn't already exist, create the BitTorrent pieces for the blob.
|
||||||
piece_bytes = upload_obj.piece_hashes + upload_obj.piece_sha_state.digest()
|
if blob_upload.piece_sha_state is not None and not already_existed:
|
||||||
model.storage.save_torrent_info(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)
|
piece_bytes = blob_upload.piece_hashes + blob_upload.piece_sha_state.digest()
|
||||||
|
v2.create_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)
|
||||||
|
|
||||||
# Delete the upload tracking row.
|
# Delete the blob upload.
|
||||||
upload_obj.delete_instance()
|
v2.delete_upload(blob_upload.uuid)
|
||||||
|
|
||||||
response = make_response('', 201)
|
|
||||||
response.headers['Docker-Content-Digest'] = expected_digest
|
|
||||||
response.headers['Location'] = url_for('v2.download_blob',
|
|
||||||
repository='%s/%s' % (namespace_name, repo_name),
|
|
||||||
digest=expected_digest)
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
|
def _finish_upload(namespace_name, repo_name, blob_upload, digest):
|
||||||
@parse_repository_name()
|
"""
|
||||||
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
When an upload is successful, this ends the uploading process.
|
||||||
@require_repo_write
|
"""
|
||||||
@anon_protect
|
_validate_digest(blob_upload, digest)
|
||||||
def upload_chunk(namespace_name, repo_name, upload_uuid):
|
_finalize_blob_database(
|
||||||
blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid)
|
namespace_name,
|
||||||
blob_upload.save()
|
repo_name,
|
||||||
|
blob_upload,
|
||||||
if upload_error:
|
digest,
|
||||||
logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
|
_finalize_blob_storage(blob_upload, digest),
|
||||||
namespace_name, repo_name, upload_uuid, upload_error)
|
)
|
||||||
_range_not_satisfiable(blob_upload.byte_count)
|
|
||||||
|
|
||||||
accepted = make_response('', 204)
|
|
||||||
accepted.headers['Location'] = _current_request_path()
|
|
||||||
accepted.headers['Range'] = _render_range(blob_upload.byte_count, with_bytes_prefix=False)
|
|
||||||
accepted.headers['Docker-Upload-UUID'] = upload_uuid
|
|
||||||
return accepted
|
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PUT'])
|
|
||||||
@parse_repository_name()
|
|
||||||
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
|
||||||
@require_repo_write
|
|
||||||
@anon_protect
|
|
||||||
def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
|
|
||||||
digest = request.args.get('digest', None)
|
|
||||||
if digest is None:
|
|
||||||
raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'})
|
|
||||||
|
|
||||||
blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid)
|
|
||||||
blob_upload.save()
|
|
||||||
|
|
||||||
if upload_error:
|
|
||||||
logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
|
|
||||||
namespace_name, repo_name, upload_uuid, upload_error)
|
|
||||||
_range_not_satisfiable(blob_upload.byte_count)
|
|
||||||
|
|
||||||
return _finish_upload(namespace_name, repo_name, blob_upload, digest)
|
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
|
|
||||||
@parse_repository_name()
|
|
||||||
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
|
||||||
@require_repo_write
|
|
||||||
@anon_protect
|
|
||||||
def cancel_upload(namespace_name, repo_name, upload_uuid):
|
|
||||||
try:
|
|
||||||
found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
|
|
||||||
except model.InvalidBlobUpload:
|
|
||||||
raise BlobUploadUnknown()
|
|
||||||
|
|
||||||
# We delete the record for the upload first, since if the partial upload in
|
|
||||||
# storage fails to delete, it doesn't break anything
|
|
||||||
found.delete_instance()
|
|
||||||
storage.cancel_chunked_upload({found.location.name}, found.uuid, found.storage_metadata)
|
|
||||||
|
|
||||||
return make_response('', 204)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/<digest>', methods=['DELETE'])
|
|
||||||
@parse_repository_name()
|
|
||||||
@process_registry_jwt_auth(scopes=['pull', 'push'])
|
|
||||||
@require_repo_write
|
|
||||||
@anon_protect
|
|
||||||
def delete_digest(namespace_name, repo_name, upload_uuid):
|
|
||||||
# We do not support deleting arbitrary digests, as they break repo images.
|
|
||||||
raise Unsupported()
|
|
||||||
|
|
Reference in a new issue