This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/v2/blob.py
2016-07-20 17:53:43 -04:00

396 lines
15 KiB
Python

import logging
import re
from flask import make_response, url_for, request, redirect, Response, abort as flask_abort
import resumablehashlib
from app import storage, app
from auth.registry_jwt_auth import process_registry_jwt_auth
from data import model, database
from digest import digest_tools
from endpoints.common import parse_repository_name
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
from endpoints.v2.errors import (BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported,
NameUnknown)
from endpoints.decorators import anon_protect
from util.cache import cache_control
from util.registry.filelike import wrap_with_handler, StreamSlice
from util.registry.gzipstream import calculate_size_handler
from util.registry.torrent import PieceHasher
logger = logging.getLogger(__name__)
BASE_BLOB_ROUTE = '/<repopath:repository>/blobs/<regex("{0}"):digest>'
BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN)
RANGE_HEADER_REGEX = re.compile(r'^bytes=([0-9]+)-([0-9]+)$')
BLOB_CONTENT_TYPE = 'application/octet-stream'
class _InvalidRangeHeader(Exception):
pass
def _base_blob_fetch(namespace_name, repo_name, digest):
""" Some work that is common to both GET and HEAD requests. Callers MUST check for proper
authorization before calling this method.
"""
try:
found = model.blob.get_repo_blob_by_digest(namespace_name, repo_name, digest)
except model.BlobDoesNotExist:
raise BlobUnknown()
headers = {
'Docker-Content-Digest': digest,
}
# Add the Accept-Ranges header if the storage engine supports resumable
# downloads.
if storage.get_supports_resumable_downloads(found.locations):
logger.debug('Storage supports resumable downloads')
headers['Accept-Ranges'] = 'bytes'
return found, headers
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_read
@anon_protect
@cache_control(max_age=31436000)
def check_blob_exists(namespace_name, repo_name, digest):
found, headers = _base_blob_fetch(namespace_name, repo_name, digest)
response = make_response('')
response.headers.extend(headers)
response.headers['Content-Length'] = found.image_size
response.headers['Content-Type'] = BLOB_CONTENT_TYPE
return response
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['GET'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_read
@anon_protect
@cache_control(max_age=31536000)
def download_blob(namespace_name, repo_name, digest):
found, headers = _base_blob_fetch(namespace_name, repo_name, digest)
path = model.storage.get_layer_path(found)
logger.debug('Looking up the direct download URL for path: %s', path)
direct_download_url = storage.get_direct_download_url(found.locations, path)
if direct_download_url:
logger.debug('Returning direct download URL')
resp = redirect(direct_download_url)
resp.headers.extend(headers)
return resp
logger.debug('Streaming layer data')
# Close the database handle here for this process before we send the long download.
database.close_db_filter(None)
headers['Content-Length'] = found.image_size
headers['Content-Type'] = BLOB_CONTENT_TYPE
return Response(storage.stream_read(found.locations, path), headers=headers)
def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1)
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def start_blob_upload(namespace_name, repo_name):
location_name = storage.preferred_locations[0]
new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
try:
model.blob.initiate_upload(namespace_name, repo_name, new_upload_uuid, location_name,
upload_metadata)
except database.Repository.DoesNotExist:
raise NameUnknown()
digest = request.args.get('digest', None)
if digest is None:
# The user will send the blob data in another request
accepted = make_response('', 202)
accepted.headers['Location'] = url_for('v2.upload_chunk',
repository='%s/%s' % (namespace_name, repo_name),
upload_uuid=new_upload_uuid)
accepted.headers['Range'] = _render_range(0)
accepted.headers['Docker-Upload-UUID'] = new_upload_uuid
return accepted
else:
# The user plans to send us the entire body right now
blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, new_upload_uuid)
blob_upload.save()
if upload_error:
logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
namespace_name, repo_name, new_upload_uuid, upload_error)
_range_not_satisfiable(blob_upload.byte_count)
return _finish_upload(namespace_name, repo_name, blob_upload, digest)
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_write
@anon_protect
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
try:
found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
except model.InvalidBlobUpload:
raise BlobUploadUnknown()
# Note: Docker byte ranges are exclusive so we have to add one to the byte count.
accepted = make_response('', 204)
accepted.headers['Range'] = _render_range(found.byte_count + 1)
accepted.headers['Docker-Upload-UUID'] = upload_uuid
return accepted
def _current_request_path():
return '{0}{1}'.format(request.script_root, request.path)
def _range_not_satisfiable(valid_end):
invalid_range = make_response('', 416)
invalid_range.headers['Location'] = _current_request_path()
invalid_range.headers['Range'] = '0-{0}'.format(valid_end)
invalid_range.headers['Docker-Upload-UUID'] = request.view_args['upload_uuid']
flask_abort(invalid_range)
def _parse_range_header(range_header_text):
""" Parses the range header, and returns a tuple of the start offset and the length,
or raises an _InvalidRangeHeader exception.
"""
found = RANGE_HEADER_REGEX.match(range_header_text)
if found is None:
raise _InvalidRangeHeader()
start = int(found.group(1))
length = int(found.group(2)) - start
if length <= 0:
raise _InvalidRangeHeader()
return (start, length)
def _upload_chunk(namespace_name, repo_name, upload_uuid):
""" Common code among the various uploading paths for appending data to blobs.
Callers MUST call .save() or .delete_instance() on the returned database object.
Returns the BlobUpload object and the error that occurred, if any (or None if none).
"""
try:
found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
except model.InvalidBlobUpload:
raise BlobUploadUnknown()
start_offset, length = 0, -1
range_header = request.headers.get('range', None)
if range_header is not None:
try:
start_offset, length = _parse_range_header(range_header)
except _InvalidRangeHeader:
_range_not_satisfiable(found.byte_count)
if start_offset > 0 and start_offset > found.byte_count:
_range_not_satisfiable(found.byte_count)
location_set = {found.location.name}
upload_error = None
with database.CloseForLongOperation(app.config):
input_fp = get_input_stream(request)
if start_offset > 0 and start_offset < found.byte_count:
# Skip the bytes which were received on a previous push, which are already stored and
# included in the sha calculation
overlap_size = found.byte_count - start_offset
input_fp = StreamSlice(input_fp, overlap_size)
# Update our upload bounds to reflect the skipped portion of the overlap
start_offset = found.byte_count
length = max(length - overlap_size, 0)
# We use this to escape early in case we have already processed all of the bytes the user
# wants to upload
if length == 0:
return found, None
input_fp = wrap_with_handler(input_fp, found.sha_state.update)
# Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have
# already calculated hash data for the previous chunk(s).
piece_hasher = None
if found.chunk_count == 0 or found.piece_sha_state:
initial_sha1_value = found.piece_sha_state or resumablehashlib.sha1()
initial_sha1_pieces_value = found.piece_hashes or ''
piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset,
initial_sha1_pieces_value,
initial_sha1_value)
input_fp = wrap_with_handler(input_fp, piece_hasher.update)
# If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
# stream so we can determine the uncompressed size. We'll throw out this data if another chunk
# comes in, but in the common case Docker only sends one chunk.
size_info = None
if start_offset == 0 and found.chunk_count == 0:
size_info, fn = calculate_size_handler()
input_fp = wrap_with_handler(input_fp, fn)
chunk_result = storage.stream_upload_chunk(location_set, upload_uuid, start_offset, length,
input_fp, found.storage_metadata,
content_type=BLOB_CONTENT_TYPE)
length_written, new_metadata, upload_error = chunk_result
# If we determined an uncompressed size and this is the first chunk, add it to the blob.
# Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
if size_info is not None and found.chunk_count == 0 and size_info.is_valid:
found.uncompressed_byte_count = size_info.uncompressed_size
elif length_written > 0:
# Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
# know the uncompressed size.
found.uncompressed_byte_count = None
if piece_hasher is not None:
found.piece_hashes = piece_hasher.piece_hashes
found.piece_sha_state = piece_hasher.hash_fragment
found.storage_metadata = new_metadata
found.byte_count += length_written
found.chunk_count += 1
return found, upload_error
def _finish_upload(namespace_name, repo_name, upload_obj, expected_digest):
# Verify that the digest's SHA matches that of the uploaded data.
computed_digest = digest_tools.sha256_digest_from_hashlib(upload_obj.sha_state)
if not digest_tools.digests_equal(computed_digest, expected_digest):
raise BlobUploadInvalid()
final_blob_location = digest_tools.content_path(expected_digest)
# Move the storage into place, or if this was a re-upload, cancel it
with database.CloseForLongOperation(app.config):
already_exists = storage.exists({upload_obj.location.name}, final_blob_location)
if already_exists:
# It already existed, clean up our upload which served as proof that we had the file
storage.cancel_chunked_upload({upload_obj.location.name}, upload_obj.uuid,
upload_obj.storage_metadata)
else:
# We were the first ones to upload this image (at least to this location)
# Let's copy it into place
storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid,
final_blob_location, upload_obj.storage_metadata)
# Mark the blob as uploaded.
blob_storage = model.blob.store_blob_record_and_temp_link(namespace_name, repo_name, expected_digest,
upload_obj.location,
upload_obj.byte_count,
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
upload_obj.uncompressed_byte_count)
if upload_obj.piece_sha_state is not None and not already_exists:
piece_bytes = upload_obj.piece_hashes + upload_obj.piece_sha_state.digest()
model.storage.save_torrent_info(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)
# Delete the upload tracking row.
upload_obj.delete_instance()
response = make_response('', 201)
response.headers['Docker-Content-Digest'] = expected_digest
response.headers['Location'] = url_for('v2.download_blob',
repository='%s/%s' % (namespace_name, repo_name),
digest=expected_digest)
return response
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def upload_chunk(namespace_name, repo_name, upload_uuid):
blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid)
blob_upload.save()
if upload_error:
logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
namespace_name, repo_name, upload_uuid, upload_error)
_range_not_satisfiable(blob_upload.byte_count)
accepted = make_response('', 204)
accepted.headers['Location'] = _current_request_path()
accepted.headers['Range'] = _render_range(blob_upload.byte_count, with_bytes_prefix=False)
accepted.headers['Docker-Upload-UUID'] = upload_uuid
return accepted
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PUT'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
digest = request.args.get('digest', None)
if digest is None:
raise BlobUploadInvalid()
blob_upload, upload_error = _upload_chunk(namespace_name, repo_name, upload_uuid)
blob_upload.save()
if upload_error:
logger.error('Got error when uploading chunk for blob %s under repository %s/%s: %s',
namespace_name, repo_name, upload_uuid, upload_error)
_range_not_satisfiable(blob_upload.byte_count)
return _finish_upload(namespace_name, repo_name, blob_upload, digest)
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def cancel_upload(namespace_name, repo_name, upload_uuid):
try:
found = model.blob.get_blob_upload(namespace_name, repo_name, upload_uuid)
except model.InvalidBlobUpload:
raise BlobUploadUnknown()
# We delete the record for the upload first, since if the partial upload in
# storage fails to delete, it doesn't break anything
found.delete_instance()
storage.cancel_chunked_upload({found.location.name}, found.uuid, found.storage_metadata)
return make_response('', 204)
@v2_bp.route('/<repopath:repository>/blobs/<digest>', methods=['DELETE'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def delete_digest(namespace_name, repo_name, upload_uuid):
# We do not support deleting arbitrary digests, as they break repo images.
raise Unsupported()