Convert V2 to use the blob uploader interface
This commit is contained in:
parent
0ae062be62
commit
7a68c41f1c
3 changed files with 170 additions and 308 deletions
|
@ -64,6 +64,20 @@ def retrieve_blob_upload_manager(repository_ref, blob_upload_id, storage, settin
|
||||||
|
|
||||||
return _BlobUploadManager(repository_ref, blob_upload, settings, storage)
|
return _BlobUploadManager(repository_ref, blob_upload, settings, storage)
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def complete_when_uploaded(blob_upload):
|
||||||
|
""" Wraps the given blob upload in a context manager that completes the upload when the context
|
||||||
|
closes.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
yield blob_upload
|
||||||
|
except Exception as ex:
|
||||||
|
logger.exception('Exception when uploading blob `%s`', blob_upload.blob_upload_id)
|
||||||
|
raise ex
|
||||||
|
finally:
|
||||||
|
# Cancel the upload if something went wrong or it was not commit to a blob.
|
||||||
|
if blob_upload.committed_blob is None:
|
||||||
|
blob_upload.cancel_upload()
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None):
|
def upload_blob(repository_ref, storage, settings, extra_blob_stream_handlers=None):
|
||||||
|
@ -120,7 +134,7 @@ class _BlobUploadManager(object):
|
||||||
|
|
||||||
if start_offset > 0 and start_offset > self.blob_upload.byte_count:
|
if start_offset > 0 and start_offset > self.blob_upload.byte_count:
|
||||||
logger.error('start_offset provided greater than blob_upload.byte_count')
|
logger.error('start_offset provided greater than blob_upload.byte_count')
|
||||||
return None
|
raise BlobUploadException()
|
||||||
|
|
||||||
# Ensure that we won't go over the allowed maximum size for blobs.
|
# Ensure that we won't go over the allowed maximum size for blobs.
|
||||||
max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size)
|
max_blob_size = bitmath.parse_string_unsafe(self.settings.maximum_blob_size)
|
||||||
|
|
|
@ -266,7 +266,7 @@ class RegistryDataInterface(object):
|
||||||
Mounts the blob from another repository into the specified target repository, and adds an
|
Mounts the blob from another repository into the specified target repository, and adds an
|
||||||
expiration before that blob is automatically GCed. This function is useful during push
|
expiration before that blob is automatically GCed. This function is useful during push
|
||||||
operations if an existing blob from another repositroy is being pushed. Returns False if
|
operations if an existing blob from another repositroy is being pushed. Returns False if
|
||||||
the mounting fails.
|
the mounting fails. Note that this function does *not* check security for mounting the blob.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
|
|
|
@ -1,30 +1,24 @@
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
import time
|
|
||||||
|
|
||||||
from flask import url_for, request, redirect, Response, abort as flask_abort
|
from flask import url_for, request, redirect, Response, abort as flask_abort
|
||||||
|
|
||||||
import bitmath
|
from app import storage, app, get_app_url, metric_queue
|
||||||
import resumablehashlib
|
|
||||||
|
|
||||||
from app import storage, app, get_app_url, metric_queue, model_cache
|
|
||||||
from auth.registry_jwt_auth import process_registry_jwt_auth
|
from auth.registry_jwt_auth import process_registry_jwt_auth
|
||||||
from auth.permissions import ReadRepositoryPermission
|
from auth.permissions import ReadRepositoryPermission
|
||||||
from data import database
|
from data import database
|
||||||
from data.cache import cache_key
|
from data.registry_model import registry_model
|
||||||
|
from data.registry_model.blobuploader import (create_blob_upload, retrieve_blob_upload_manager,
|
||||||
|
complete_when_uploaded, BlobUploadSettings,
|
||||||
|
BlobUploadException, BlobTooLargeException)
|
||||||
from digest import digest_tools
|
from digest import digest_tools
|
||||||
from endpoints.decorators import anon_protect, parse_repository_name
|
from endpoints.decorators import anon_protect, parse_repository_name
|
||||||
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
|
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
|
||||||
from endpoints.v2.errors import (
|
from endpoints.v2.errors import (
|
||||||
BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge,
|
BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge,
|
||||||
InvalidRequest)
|
InvalidRequest)
|
||||||
from endpoints.v2.models_interface import Blob
|
|
||||||
from endpoints.v2.models_pre_oci import data_model as model
|
|
||||||
from util.cache import cache_control
|
from util.cache import cache_control
|
||||||
from util.names import parse_namespace_repository
|
from util.names import parse_namespace_repository
|
||||||
from util.registry.filelike import wrap_with_handler, StreamSlice
|
|
||||||
from util.registry.gzipstream import calculate_size_handler
|
|
||||||
from util.registry.torrent import PieceHasher
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -38,23 +32,6 @@ class _InvalidRangeHeader(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def _get_repository_blob(namespace_name, repo_name, digest):
|
|
||||||
""" Returns the blob with the given digest under the repository with the given
|
|
||||||
name. If one does not exist (or it is still uploading), returns None.
|
|
||||||
Automatically handles caching.
|
|
||||||
"""
|
|
||||||
def load_blob():
|
|
||||||
blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
|
|
||||||
if blob is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return blob._asdict()
|
|
||||||
|
|
||||||
blob_cache_key = cache_key.for_repository_blob(namespace_name, repo_name, digest)
|
|
||||||
blob_dict = model_cache.retrieve(blob_cache_key, load_blob)
|
|
||||||
return Blob(**blob_dict) if blob_dict is not None else None
|
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
|
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
|
||||||
@parse_repository_name()
|
@parse_repository_name()
|
||||||
@process_registry_jwt_auth(scopes=['pull'])
|
@process_registry_jwt_auth(scopes=['pull'])
|
||||||
|
@ -62,20 +39,24 @@ def _get_repository_blob(namespace_name, repo_name, digest):
|
||||||
@anon_protect
|
@anon_protect
|
||||||
@cache_control(max_age=31436000)
|
@cache_control(max_age=31436000)
|
||||||
def check_blob_exists(namespace_name, repo_name, digest):
|
def check_blob_exists(namespace_name, repo_name, digest):
|
||||||
|
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||||
|
if repository_ref is None:
|
||||||
|
raise NameUnknown()
|
||||||
|
|
||||||
# Find the blob.
|
# Find the blob.
|
||||||
blob = _get_repository_blob(namespace_name, repo_name, digest)
|
blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True)
|
||||||
if blob is None:
|
if blob is None:
|
||||||
raise BlobUnknown()
|
raise BlobUnknown()
|
||||||
|
|
||||||
# Build the response headers.
|
# Build the response headers.
|
||||||
headers = {
|
headers = {
|
||||||
'Docker-Content-Digest': digest,
|
'Docker-Content-Digest': digest,
|
||||||
'Content-Length': blob.size,
|
'Content-Length': blob.compressed_size,
|
||||||
'Content-Type': BLOB_CONTENT_TYPE,
|
'Content-Type': BLOB_CONTENT_TYPE,
|
||||||
}
|
}
|
||||||
|
|
||||||
# If our storage supports range requests, let the client know.
|
# If our storage supports range requests, let the client know.
|
||||||
if storage.get_supports_resumable_downloads(blob.locations):
|
if storage.get_supports_resumable_downloads(blob.placements):
|
||||||
headers['Accept-Ranges'] = 'bytes'
|
headers['Accept-Ranges'] = 'bytes'
|
||||||
|
|
||||||
# Write the response to the client.
|
# Write the response to the client.
|
||||||
|
@ -89,8 +70,12 @@ def check_blob_exists(namespace_name, repo_name, digest):
|
||||||
@anon_protect
|
@anon_protect
|
||||||
@cache_control(max_age=31536000)
|
@cache_control(max_age=31536000)
|
||||||
def download_blob(namespace_name, repo_name, digest):
|
def download_blob(namespace_name, repo_name, digest):
|
||||||
|
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||||
|
if repository_ref is None:
|
||||||
|
raise NameUnknown()
|
||||||
|
|
||||||
# Find the blob.
|
# Find the blob.
|
||||||
blob = _get_repository_blob(namespace_name, repo_name, digest)
|
blob = registry_model.get_repo_blob_by_digest(repository_ref, digest, include_placements=True)
|
||||||
if blob is None:
|
if blob is None:
|
||||||
raise BlobUnknown()
|
raise BlobUnknown()
|
||||||
|
|
||||||
|
@ -98,15 +83,13 @@ def download_blob(namespace_name, repo_name, digest):
|
||||||
headers = {'Docker-Content-Digest': digest}
|
headers = {'Docker-Content-Digest': digest}
|
||||||
|
|
||||||
# If our storage supports range requests, let the client know.
|
# If our storage supports range requests, let the client know.
|
||||||
if storage.get_supports_resumable_downloads(blob.locations):
|
if storage.get_supports_resumable_downloads(blob.placements):
|
||||||
headers['Accept-Ranges'] = 'bytes'
|
headers['Accept-Ranges'] = 'bytes'
|
||||||
|
|
||||||
# Find the storage path for the blob.
|
|
||||||
path = model.get_blob_path(blob)
|
|
||||||
|
|
||||||
# Short-circuit by redirecting if the storage supports it.
|
# Short-circuit by redirecting if the storage supports it.
|
||||||
|
path = blob.storage_path
|
||||||
logger.debug('Looking up the direct download URL for path: %s', path)
|
logger.debug('Looking up the direct download URL for path: %s', path)
|
||||||
direct_download_url = storage.get_direct_download_url(blob.locations, path, request.remote_addr)
|
direct_download_url = storage.get_direct_download_url(blob.placements, path, request.remote_addr)
|
||||||
if direct_download_url:
|
if direct_download_url:
|
||||||
logger.debug('Returning direct download URL')
|
logger.debug('Returning direct download URL')
|
||||||
resp = redirect(direct_download_url)
|
resp = redirect(direct_download_url)
|
||||||
|
@ -118,63 +101,77 @@ def download_blob(namespace_name, repo_name, digest):
|
||||||
with database.CloseForLongOperation(app.config):
|
with database.CloseForLongOperation(app.config):
|
||||||
# Stream the response to the client.
|
# Stream the response to the client.
|
||||||
return Response(
|
return Response(
|
||||||
storage.stream_read(blob.locations, path),
|
storage.stream_read(blob.placements, path),
|
||||||
headers=headers.update({
|
headers=headers.update({
|
||||||
'Content-Length': blob.size,
|
'Content-Length': blob.compressed_size,
|
||||||
'Content-Type': BLOB_CONTENT_TYPE,}),)
|
'Content-Type': BLOB_CONTENT_TYPE,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest):
|
def _try_to_mount_blob(repository_ref, mount_blob_digest):
|
||||||
""" Attempts to mount a blob requested by the user from another repository. """
|
""" Attempts to mount a blob requested by the user from another repository. """
|
||||||
logger.debug('Got mount request for blob `%s` into `%s/%s`', mount_blob_digest, namespace_name,
|
logger.debug('Got mount request for blob `%s` into `%s`', mount_blob_digest, repository_ref)
|
||||||
repo_name)
|
|
||||||
from_repo = request.args.get('from', None)
|
from_repo = request.args.get('from', None)
|
||||||
if from_repo is None:
|
if from_repo is None:
|
||||||
raise InvalidRequest
|
raise InvalidRequest
|
||||||
|
|
||||||
# Ensure the user has access to the repository.
|
# Ensure the user has access to the repository.
|
||||||
logger.debug('Got mount request for blob `%s` under repository `%s` into `%s/%s`',
|
logger.debug('Got mount request for blob `%s` under repository `%s` into `%s`',
|
||||||
mount_blob_digest, from_repo, namespace_name, repo_name)
|
mount_blob_digest, from_repo, repository_ref)
|
||||||
from_namespace, from_repo_name = parse_namespace_repository(from_repo,
|
from_namespace, from_repo_name = parse_namespace_repository(from_repo,
|
||||||
app.config['LIBRARY_NAMESPACE'],
|
app.config['LIBRARY_NAMESPACE'],
|
||||||
include_tag=False)
|
include_tag=False)
|
||||||
|
|
||||||
# First check permission. This does not hit the DB so we do it first.
|
from_repository_ref = registry_model.lookup_repository(from_namespace, from_repo_name)
|
||||||
|
if from_repository_ref is None:
|
||||||
|
logger.debug('Could not find from repo: `%s/%s`', from_namespace, from_repo_name)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# First check permission.
|
||||||
read_permission = ReadRepositoryPermission(from_namespace, from_repo_name).can()
|
read_permission = ReadRepositoryPermission(from_namespace, from_repo_name).can()
|
||||||
if not read_permission:
|
if not read_permission:
|
||||||
# If no direct permission, check if the repostory is public.
|
# If no direct permission, check if the repostory is public.
|
||||||
if not model.is_repository_public(from_namespace, from_repo_name):
|
if not from_repository_ref.is_public:
|
||||||
logger.debug('No permission to mount blob `%s` under repository `%s` into `%s/%s`',
|
logger.debug('No permission to mount blob `%s` under repository `%s` into `%s`',
|
||||||
mount_blob_digest, from_repo, namespace_name, repo_name)
|
mount_blob_digest, from_repo, repository_ref)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Lookup if the mount blob's digest exists in the repository.
|
# Lookup if the mount blob's digest exists in the repository.
|
||||||
mount_blob = model.get_blob_by_digest(from_namespace, from_repo_name, mount_blob_digest)
|
mount_blob = registry_model.get_repo_blob_by_digest(from_repository_ref, mount_blob_digest)
|
||||||
if mount_blob is None:
|
if mount_blob is None:
|
||||||
logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo)
|
logger.debug('Blob `%s` under repository `%s` not found', mount_blob_digest, from_repo)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
logger.debug('Mounting blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest,
|
logger.debug('Mounting blob `%s` under repository `%s` into `%s`', mount_blob_digest,
|
||||||
from_repo, namespace_name, repo_name)
|
from_repo, repository_ref)
|
||||||
|
|
||||||
# Mount the blob into the current repository and return that we've completed the operation.
|
# Mount the blob into the current repository and return that we've completed the operation.
|
||||||
expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
|
expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
|
||||||
if not model.mount_blob_and_temp_tag(namespace_name, repo_name, mount_blob, expiration_sec):
|
mounted = registry_model.mount_blob_into_repository(mount_blob, repository_ref, expiration_sec)
|
||||||
|
if not mounted:
|
||||||
logger.debug('Could not mount blob `%s` under repository `%s` not found', mount_blob_digest,
|
logger.debug('Could not mount blob `%s` under repository `%s` not found', mount_blob_digest,
|
||||||
from_repo)
|
from_repo)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Return the response for the blob indicating that it was mounted, and including its content
|
# Return the response for the blob indicating that it was mounted, and including its content
|
||||||
# digest.
|
# digest.
|
||||||
logger.debug('Mounted blob `%s` under repository `%s` into `%s/%s`', mount_blob_digest,
|
logger.debug('Mounted blob `%s` under repository `%s` into `%s`', mount_blob_digest,
|
||||||
from_repo, namespace_name, repo_name)
|
from_repo, repository_ref)
|
||||||
|
|
||||||
|
namespace_name = repository_ref.namespace_name
|
||||||
|
repo_name = repository_ref.name
|
||||||
|
|
||||||
return Response(
|
return Response(
|
||||||
status=201,
|
status=201,
|
||||||
headers={
|
headers={
|
||||||
'Docker-Content-Digest': mount_blob_digest,
|
'Docker-Content-Digest': mount_blob_digest,
|
||||||
'Location':
|
'Location':
|
||||||
get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
|
get_app_url() + url_for('v2.download_blob',
|
||||||
(namespace_name, repo_name), digest=mount_blob_digest),},)
|
repository='%s/%s' % (namespace_name, repo_name),
|
||||||
|
digest=mount_blob_digest),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
|
||||||
|
@ -183,63 +180,56 @@ def _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest):
|
||||||
@require_repo_write
|
@require_repo_write
|
||||||
@anon_protect
|
@anon_protect
|
||||||
def start_blob_upload(namespace_name, repo_name):
|
def start_blob_upload(namespace_name, repo_name):
|
||||||
# Begin the blob upload process in the database and storage.
|
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||||
location_name = storage.preferred_locations[0]
|
if repository_ref is None:
|
||||||
new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
|
|
||||||
repository_exists = model.create_blob_upload(namespace_name, repo_name, new_upload_uuid,
|
|
||||||
location_name, upload_metadata)
|
|
||||||
if not repository_exists:
|
|
||||||
raise NameUnknown()
|
raise NameUnknown()
|
||||||
|
|
||||||
# Check for mounting of a blob from another repository.
|
# Check for mounting of a blob from another repository.
|
||||||
mount_blob_digest = request.args.get('mount', None)
|
mount_blob_digest = request.args.get('mount', None)
|
||||||
if mount_blob_digest is not None:
|
if mount_blob_digest is not None:
|
||||||
response = _try_to_mount_blob(namespace_name, repo_name, mount_blob_digest)
|
response = _try_to_mount_blob(repository_ref, mount_blob_digest)
|
||||||
if response is not None:
|
if response is not None:
|
||||||
return response
|
return response
|
||||||
|
|
||||||
# Check for a normal blob upload.
|
# Begin the blob upload process.
|
||||||
|
blob_uploader = create_blob_upload(repository_ref, storage, _upload_settings())
|
||||||
|
if blob_uploader is None:
|
||||||
|
logger.debug('Could not create a blob upload for `%s/%s`', namespace_name, repo_name)
|
||||||
|
raise InvalidRequest()
|
||||||
|
|
||||||
|
# Check if the blob will be uploaded now or in followup calls. If the `digest` is given, then
|
||||||
|
# the upload will occur as a monolithic chunk in this call. Ohterwise, we return a redirect
|
||||||
|
# for the client to upload the chunks as distinct operations.
|
||||||
digest = request.args.get('digest', None)
|
digest = request.args.get('digest', None)
|
||||||
if digest is None:
|
if digest is None:
|
||||||
# Short-circuit because the user will send the blob data in another request.
|
# Short-circuit because the user will send the blob data in another request.
|
||||||
return Response(
|
return Response(
|
||||||
status=202,
|
status=202,
|
||||||
headers={
|
headers={
|
||||||
'Docker-Upload-UUID':
|
'Docker-Upload-UUID': blob_uploader.blob_upload_id,
|
||||||
new_upload_uuid,
|
'Range': _render_range(0),
|
||||||
'Range':
|
|
||||||
_render_range(0),
|
|
||||||
'Location':
|
'Location':
|
||||||
get_app_url() + url_for('v2.upload_chunk', repository='%s/%s' %
|
get_app_url() + url_for('v2.upload_chunk',
|
||||||
(namespace_name, repo_name), upload_uuid=new_upload_uuid)},)
|
repository='%s/%s' % (namespace_name, repo_name),
|
||||||
|
upload_uuid=blob_uploader.blob_upload_id)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
# The user plans to send us the entire body right now.
|
# Upload the data sent and commit it to a blob.
|
||||||
# Find the upload.
|
with complete_when_uploaded(blob_uploader):
|
||||||
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, new_upload_uuid)
|
_upload_chunk(blob_uploader, digest)
|
||||||
if blob_upload is None:
|
|
||||||
raise BlobUploadUnknown()
|
|
||||||
|
|
||||||
# Upload the chunk to storage while calculating some metadata and updating
|
|
||||||
# the upload state.
|
|
||||||
updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
|
|
||||||
if updated_blob_upload is None:
|
|
||||||
_abort_range_not_satisfiable(blob_upload.byte_count, new_upload_uuid)
|
|
||||||
|
|
||||||
# Save the upload state to the database.
|
|
||||||
model.update_blob_upload(updated_blob_upload)
|
|
||||||
|
|
||||||
# Finalize the upload process in the database and storage.
|
|
||||||
_finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
|
|
||||||
|
|
||||||
# Write the response to the client.
|
# Write the response to the client.
|
||||||
return Response(
|
return Response(
|
||||||
status=201,
|
status=201,
|
||||||
headers={
|
headers={
|
||||||
'Docker-Content-Digest':
|
'Docker-Content-Digest': digest,
|
||||||
digest,
|
|
||||||
'Location':
|
'Location':
|
||||||
get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
|
get_app_url() + url_for('v2.download_blob',
|
||||||
(namespace_name, repo_name), digest=digest),},)
|
repository='%s/%s' % (namespace_name, repo_name),
|
||||||
|
digest=digest),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
|
||||||
|
@ -248,16 +238,21 @@ def start_blob_upload(namespace_name, repo_name):
|
||||||
@require_repo_write
|
@require_repo_write
|
||||||
@anon_protect
|
@anon_protect
|
||||||
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
|
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
|
||||||
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
|
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||||
if blob_upload is None:
|
if repository_ref is None:
|
||||||
|
raise NameUnknown()
|
||||||
|
|
||||||
|
uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings())
|
||||||
|
if uploader is None:
|
||||||
raise BlobUploadUnknown()
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
return Response(
|
return Response(
|
||||||
status=204,
|
status=204,
|
||||||
headers={
|
headers={
|
||||||
'Docker-Upload-UUID': upload_uuid,
|
'Docker-Upload-UUID': upload_uuid,
|
||||||
'Range': _render_range(blob_upload.byte_count + 1), # byte ranges are exclusive
|
'Range': _render_range(uploader.blob_upload.byte_count + 1), # byte ranges are exclusive
|
||||||
},)
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
|
||||||
|
@ -266,27 +261,26 @@ def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
|
||||||
@require_repo_write
|
@require_repo_write
|
||||||
@anon_protect
|
@anon_protect
|
||||||
def upload_chunk(namespace_name, repo_name, upload_uuid):
|
def upload_chunk(namespace_name, repo_name, upload_uuid):
|
||||||
# Find the upload.
|
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||||
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
|
if repository_ref is None:
|
||||||
if blob_upload is None:
|
raise NameUnknown()
|
||||||
|
|
||||||
|
uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings())
|
||||||
|
if uploader is None:
|
||||||
raise BlobUploadUnknown()
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
# Upload the chunk to storage while calculating some metadata and updating
|
# Upload the chunk for the blob.
|
||||||
# the upload state.
|
_upload_chunk(uploader)
|
||||||
updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
|
|
||||||
if updated_blob_upload is None:
|
|
||||||
_abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)
|
|
||||||
|
|
||||||
# Save the upload state to the database.
|
|
||||||
model.update_blob_upload(updated_blob_upload)
|
|
||||||
|
|
||||||
# Write the response to the client.
|
# Write the response to the client.
|
||||||
return Response(
|
return Response(
|
||||||
status=204,
|
status=204,
|
||||||
headers={
|
headers={
|
||||||
'Location': _current_request_url(),
|
'Location': _current_request_url(),
|
||||||
'Range': _render_range(updated_blob_upload.byte_count, with_bytes_prefix=False),
|
'Range': _render_range(uploader.blob_upload.byte_count, with_bytes_prefix=False),
|
||||||
'Docker-Upload-UUID': upload_uuid,},)
|
'Docker-Upload-UUID': upload_uuid,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PUT'])
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PUT'])
|
||||||
|
@ -301,26 +295,27 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
|
||||||
raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'})
|
raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'})
|
||||||
|
|
||||||
# Find the upload.
|
# Find the upload.
|
||||||
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
|
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||||
if blob_upload is None:
|
if repository_ref is None:
|
||||||
|
raise NameUnknown()
|
||||||
|
|
||||||
|
uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings())
|
||||||
|
if uploader is None:
|
||||||
raise BlobUploadUnknown()
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
# Upload the chunk to storage while calculating some metadata and updating
|
# Upload the chunk for the blob and commit it once complete.
|
||||||
# the upload state.
|
with complete_when_uploaded(uploader):
|
||||||
updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
|
_upload_chunk(uploader, digest)
|
||||||
if updated_blob_upload is None:
|
|
||||||
_abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)
|
|
||||||
|
|
||||||
# Finalize the upload process in the database and storage.
|
|
||||||
_finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
|
|
||||||
|
|
||||||
# Write the response to the client.
|
# Write the response to the client.
|
||||||
return Response(status=201, headers={
|
return Response(status=201, headers={
|
||||||
'Docker-Content-Digest':
|
'Docker-Content-Digest': digest,
|
||||||
digest,
|
|
||||||
'Location':
|
'Location':
|
||||||
get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
|
get_app_url() + url_for('v2.download_blob',
|
||||||
(namespace_name, repo_name), digest=digest),})
|
repository='%s/%s' % (namespace_name, repo_name),
|
||||||
|
digest=digest),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
|
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
|
||||||
|
@ -329,16 +324,15 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
|
||||||
@require_repo_write
|
@require_repo_write
|
||||||
@anon_protect
|
@anon_protect
|
||||||
def cancel_upload(namespace_name, repo_name, upload_uuid):
|
def cancel_upload(namespace_name, repo_name, upload_uuid):
|
||||||
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
|
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
|
||||||
if blob_upload is None:
|
if repository_ref is None:
|
||||||
|
raise NameUnknown()
|
||||||
|
|
||||||
|
uploader = retrieve_blob_upload_manager(repository_ref, upload_uuid, storage, _upload_settings())
|
||||||
|
if uploader is None:
|
||||||
raise BlobUploadUnknown()
|
raise BlobUploadUnknown()
|
||||||
|
|
||||||
# We delete the record for the upload first, since if the partial upload in
|
uploader.cancel_upload()
|
||||||
# storage fails to delete, it doesn't break anything.
|
|
||||||
model.delete_blob_upload(namespace_name, repo_name, upload_uuid)
|
|
||||||
storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
|
|
||||||
blob_upload.storage_metadata)
|
|
||||||
|
|
||||||
return Response(status=204)
|
return Response(status=204)
|
||||||
|
|
||||||
|
|
||||||
|
@ -413,182 +407,36 @@ def _start_offset_and_length(range_header):
|
||||||
return start_offset, length
|
return start_offset, length
|
||||||
|
|
||||||
|
|
||||||
def _upload_chunk(blob_upload, range_header):
|
def _upload_settings():
|
||||||
|
""" Returns the settings for instantiating a blob upload manager. """
|
||||||
|
expiration_sec = app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']
|
||||||
|
settings = BlobUploadSettings(maximum_blob_size=app.config['MAXIMUM_LAYER_SIZE'],
|
||||||
|
bittorrent_piece_size=app.config['BITTORRENT_PIECE_SIZE'],
|
||||||
|
committed_blob_expiration=expiration_sec)
|
||||||
|
return settings
|
||||||
|
|
||||||
|
|
||||||
|
def _upload_chunk(blob_uploader, commit_digest=None):
|
||||||
|
""" Performs uploading of a chunk of data in the current request's stream, via the blob uploader
|
||||||
|
given. If commit_digest is specified, the upload is committed to a blob once the stream's
|
||||||
|
data has been read and stored.
|
||||||
"""
|
"""
|
||||||
Calculates metadata while uploading a chunk to storage.
|
start_offset, length = _start_offset_and_length(request.headers.get('range'))
|
||||||
|
if None in {start_offset, length}:
|
||||||
|
raise InvalidRequest()
|
||||||
|
|
||||||
Returns a BlobUpload object or None if there was a failure.
|
input_fp = get_input_stream(request)
|
||||||
"""
|
|
||||||
max_layer_size = bitmath.parse_string_unsafe(app.config['MAXIMUM_LAYER_SIZE'])
|
|
||||||
|
|
||||||
# Get the offset and length of the current chunk.
|
try:
|
||||||
start_offset, length = _start_offset_and_length(range_header)
|
# Upload the data received.
|
||||||
if blob_upload is None or None in {start_offset, length}:
|
blob_uploader.upload_chunk(app.config, input_fp, start_offset, length, metric_queue)
|
||||||
logger.error('Invalid arguments provided to _upload_chunk')
|
|
||||||
return None
|
|
||||||
|
|
||||||
if start_offset > 0 and start_offset > blob_upload.byte_count:
|
if commit_digest is not None:
|
||||||
logger.error('start_offset provided to _upload_chunk greater than blob.upload.byte_count')
|
# Commit the upload to a blob.
|
||||||
return None
|
return blob_uploader.commit_to_blob(app.config, commit_digest)
|
||||||
|
except BlobTooLargeException as ble:
|
||||||
# Check if we should raise 413 before accepting the data.
|
raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed)
|
||||||
uploaded = bitmath.Byte(length + start_offset)
|
except BlobUploadException:
|
||||||
if length > -1 and uploaded > max_layer_size:
|
logger.exception('Exception when uploading blob to %s', blob_uploader.blob_upload_id)
|
||||||
raise LayerTooLarge(uploaded=uploaded.bytes, max_allowed=max_layer_size.bytes)
|
_abort_range_not_satisfiable(blob_uploader.blob_upload.byte_count,
|
||||||
|
blob_uploader.blob_upload_id)
|
||||||
location_set = {blob_upload.location_name}
|
|
||||||
|
|
||||||
upload_error = None
|
|
||||||
with database.CloseForLongOperation(app.config):
|
|
||||||
input_fp = get_input_stream(request)
|
|
||||||
|
|
||||||
if start_offset > 0 and start_offset < blob_upload.byte_count:
|
|
||||||
# Skip the bytes which were received on a previous push, which are already stored and
|
|
||||||
# included in the sha calculation
|
|
||||||
overlap_size = blob_upload.byte_count - start_offset
|
|
||||||
input_fp = StreamSlice(input_fp, overlap_size)
|
|
||||||
|
|
||||||
# Update our upload bounds to reflect the skipped portion of the overlap
|
|
||||||
start_offset = blob_upload.byte_count
|
|
||||||
length = max(length - overlap_size, 0)
|
|
||||||
|
|
||||||
# We use this to escape early in case we have already processed all of the bytes the user
|
|
||||||
# wants to upload
|
|
||||||
if length == 0:
|
|
||||||
return blob_upload
|
|
||||||
|
|
||||||
input_fp = wrap_with_handler(input_fp, blob_upload.sha_state.update)
|
|
||||||
|
|
||||||
# Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have
|
|
||||||
# already calculated hash data for the previous chunk(s).
|
|
||||||
piece_hasher = None
|
|
||||||
if blob_upload.chunk_count == 0 or blob_upload.piece_sha_state:
|
|
||||||
initial_sha1_value = blob_upload.piece_sha_state or resumablehashlib.sha1()
|
|
||||||
initial_sha1_pieces_value = blob_upload.piece_hashes or ''
|
|
||||||
|
|
||||||
piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset,
|
|
||||||
initial_sha1_pieces_value, initial_sha1_value)
|
|
||||||
|
|
||||||
input_fp = wrap_with_handler(input_fp, piece_hasher.update)
|
|
||||||
|
|
||||||
# If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
|
|
||||||
# stream so we can determine the uncompressed size. We'll throw out this data if another chunk
|
|
||||||
# comes in, but in the common case the docker client only sends one chunk.
|
|
||||||
size_info = None
|
|
||||||
if start_offset == 0 and blob_upload.chunk_count == 0:
|
|
||||||
size_info, fn = calculate_size_handler()
|
|
||||||
input_fp = wrap_with_handler(input_fp, fn)
|
|
||||||
|
|
||||||
start_time = time.time()
|
|
||||||
length_written, new_metadata, upload_error = storage.stream_upload_chunk(
|
|
||||||
location_set,
|
|
||||||
blob_upload.uuid,
|
|
||||||
start_offset,
|
|
||||||
length,
|
|
||||||
input_fp,
|
|
||||||
blob_upload.storage_metadata,
|
|
||||||
content_type=BLOB_CONTENT_TYPE,)
|
|
||||||
|
|
||||||
if upload_error is not None:
|
|
||||||
logger.error('storage.stream_upload_chunk returned error %s', upload_error)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Update the chunk upload time metric.
|
|
||||||
metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[
|
|
||||||
length_written, list(location_set)[0]])
|
|
||||||
|
|
||||||
# If we determined an uncompressed size and this is the first chunk, add it to the blob.
|
|
||||||
# Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
|
|
||||||
if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid:
|
|
||||||
blob_upload.uncompressed_byte_count = size_info.uncompressed_size
|
|
||||||
elif length_written > 0:
|
|
||||||
# Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
|
|
||||||
# know the uncompressed size.
|
|
||||||
blob_upload.uncompressed_byte_count = None
|
|
||||||
|
|
||||||
if piece_hasher is not None:
|
|
||||||
blob_upload.piece_hashes = piece_hasher.piece_hashes
|
|
||||||
blob_upload.piece_sha_state = piece_hasher.hash_fragment
|
|
||||||
|
|
||||||
blob_upload.storage_metadata = new_metadata
|
|
||||||
blob_upload.byte_count += length_written
|
|
||||||
blob_upload.chunk_count += 1
|
|
||||||
|
|
||||||
# Ensure we have not gone beyond the max layer size.
|
|
||||||
upload_size = bitmath.Byte(blob_upload.byte_count)
|
|
||||||
if upload_size > max_layer_size:
|
|
||||||
raise LayerTooLarge(uploaded=upload_size.bytes, max_allowed=max_layer_size.bytes)
|
|
||||||
|
|
||||||
return blob_upload
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_digest(blob_upload, expected_digest):
|
|
||||||
"""
|
|
||||||
Verifies that the digest's SHA matches that of the uploaded data.
|
|
||||||
"""
|
|
||||||
computed_digest = digest_tools.sha256_digest_from_hashlib(blob_upload.sha_state)
|
|
||||||
if not digest_tools.digests_equal(computed_digest, expected_digest):
|
|
||||||
logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s',
|
|
||||||
blob_upload.uuid, expected_digest, computed_digest)
|
|
||||||
raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'})
|
|
||||||
|
|
||||||
|
|
||||||
def _finalize_blob_storage(blob_upload, expected_digest):
|
|
||||||
"""
|
|
||||||
When an upload is successful, this ends the uploading process from the
|
|
||||||
storage's perspective.
|
|
||||||
|
|
||||||
Returns True if the blob already existed.
|
|
||||||
"""
|
|
||||||
final_blob_location = digest_tools.content_path(expected_digest)
|
|
||||||
|
|
||||||
# Move the storage into place, or if this was a re-upload, cancel it
|
|
||||||
with database.CloseForLongOperation(app.config):
|
|
||||||
already_existed = storage.exists({blob_upload.location_name}, final_blob_location)
|
|
||||||
if already_existed:
|
|
||||||
# It already existed, clean up our upload which served as proof that the
|
|
||||||
# uploader had the blob.
|
|
||||||
storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
|
|
||||||
blob_upload.storage_metadata)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# We were the first ones to upload this image (at least to this location)
|
|
||||||
# Let's copy it into place
|
|
||||||
storage.complete_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
|
|
||||||
final_blob_location, blob_upload.storage_metadata)
|
|
||||||
return already_existed
|
|
||||||
|
|
||||||
|
|
||||||
def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed):
|
|
||||||
"""
|
|
||||||
When an upload is successful, this ends the uploading process from the
|
|
||||||
database's perspective.
|
|
||||||
"""
|
|
||||||
# Create the blob and temporarily tag it.
|
|
||||||
blob_storage = model.create_blob_and_temp_tag(
|
|
||||||
namespace_name,
|
|
||||||
repo_name,
|
|
||||||
digest,
|
|
||||||
blob_upload,
|
|
||||||
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],)
|
|
||||||
|
|
||||||
# If it doesn't already exist, create the BitTorrent pieces for the blob.
|
|
||||||
if blob_upload.piece_sha_state is not None and not already_existed:
|
|
||||||
piece_bytes = blob_upload.piece_hashes + blob_upload.piece_sha_state.digest()
|
|
||||||
model.save_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)
|
|
||||||
|
|
||||||
# Delete the blob upload.
|
|
||||||
model.delete_blob_upload(namespace_name, repo_name, blob_upload.uuid)
|
|
||||||
|
|
||||||
|
|
||||||
def _finish_upload(namespace_name, repo_name, blob_upload, digest):
|
|
||||||
"""
|
|
||||||
When an upload is successful, this ends the uploading process.
|
|
||||||
"""
|
|
||||||
_validate_digest(blob_upload, digest)
|
|
||||||
_finalize_blob_database(
|
|
||||||
namespace_name,
|
|
||||||
repo_name,
|
|
||||||
blob_upload,
|
|
||||||
digest,
|
|
||||||
_finalize_blob_storage(blob_upload, digest),)
|
|
||||||
|
|
Reference in a new issue