This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/v2/blob.py

512 lines
18 KiB
Python
Raw Normal View History

import logging
import re
import time
2016-08-09 16:28:00 +00:00
from flask import url_for, request, redirect, Response, abort as flask_abort
import bitmath
import resumablehashlib
from app import storage, app, get_app_url, metric_queue
from auth.registry_jwt_auth import process_registry_jwt_auth
2016-08-02 00:48:34 +00:00
from data import database
from digest import digest_tools
2016-03-09 21:20:28 +00:00
from endpoints.common import parse_repository_name
from endpoints.decorators import anon_protect
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
2017-06-26 22:16:15 +00:00
from endpoints.v2.errors import (
BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown, LayerTooLarge)
from endpoints.v2.models_pre_oci import data_model as model
from util.cache import cache_control
from util.registry.filelike import wrap_with_handler, StreamSlice
from util.registry.gzipstream import calculate_size_handler
from util.registry.torrent import PieceHasher
logger = logging.getLogger(__name__)
BASE_BLOB_ROUTE = '/<repopath:repository>/blobs/<regex("{0}"):digest>'
BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN)
RANGE_HEADER_REGEX = re.compile(r'^bytes=([0-9]+)-([0-9]+)$')
2015-11-30 20:45:45 +00:00
BLOB_CONTENT_TYPE = 'application/octet-stream'
class _InvalidRangeHeader(Exception):
pass
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
2016-03-09 21:20:28 +00:00
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_read
@anon_protect
@cache_control(max_age=31436000)
2016-03-09 21:20:28 +00:00
def check_blob_exists(namespace_name, repo_name, digest):
2016-08-02 00:48:34 +00:00
# Find the blob.
blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
2016-08-02 00:48:34 +00:00
if blob is None:
raise BlobUnknown()
2016-08-02 00:48:34 +00:00
# Build the response headers.
headers = {
'Docker-Content-Digest': digest,
'Content-Length': blob.size,
2017-06-26 22:16:15 +00:00
'Content-Type': BLOB_CONTENT_TYPE,}
2016-08-02 00:48:34 +00:00
# If our storage supports range requests, let the client know.
2016-08-02 00:48:34 +00:00
if storage.get_supports_resumable_downloads(blob.locations):
headers['Accept-Ranges'] = 'bytes'
# Write the response to the client.
2016-08-09 16:28:00 +00:00
return Response(headers=headers)
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['GET'])
2016-03-09 21:20:28 +00:00
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_read
@anon_protect
@cache_control(max_age=31536000)
2016-03-09 21:20:28 +00:00
def download_blob(namespace_name, repo_name, digest):
2016-08-02 00:48:34 +00:00
# Find the blob.
blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
2016-08-02 00:48:34 +00:00
if blob is None:
raise BlobUnknown()
2016-08-02 00:48:34 +00:00
# Build the response headers.
headers = {'Docker-Content-Digest': digest}
# If our storage supports range requests, let the client know.
2016-08-02 00:48:34 +00:00
if storage.get_supports_resumable_downloads(blob.locations):
headers['Accept-Ranges'] = 'bytes'
# Find the storage path for the blob.
path = model.get_blob_path(blob)
2016-08-02 00:48:34 +00:00
# Short-circuit by redirecting if the storage supports it.
logger.debug('Looking up the direct download URL for path: %s', path)
direct_download_url = storage.get_direct_download_url(blob.locations, path)
if direct_download_url:
logger.debug('Returning direct download URL')
resp = redirect(direct_download_url)
resp.headers.extend(headers)
return resp
2016-08-02 00:48:34 +00:00
# Close the database connection before we stream the download.
logger.debug('Closing database connection before streaming layer data')
with database.CloseForLongOperation(app.config):
# Stream the response to the client.
return Response(
storage.stream_read(blob.locations, path),
headers=headers.update({
'Content-Length': blob.size,
2017-06-26 22:16:15 +00:00
'Content-Type': BLOB_CONTENT_TYPE,}),)
@v2_bp.route('/<repopath:repository>/blobs/uploads/', methods=['POST'])
2016-03-09 21:20:28 +00:00
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
2016-03-09 21:20:28 +00:00
def start_blob_upload(namespace_name, repo_name):
2016-08-02 00:48:34 +00:00
# Begin the blob upload process in the database and storage.
location_name = storage.preferred_locations[0]
new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
repository_exists = model.create_blob_upload(namespace_name, repo_name, new_upload_uuid,
location_name, upload_metadata)
2016-08-02 00:48:34 +00:00
if not repository_exists:
raise NameUnknown()
digest = request.args.get('digest', None)
if digest is None:
2016-08-02 00:48:34 +00:00
# Short-circuit because the user will send the blob data in another request.
2016-08-09 16:28:00 +00:00
return Response(
status=202,
headers={
2017-06-26 22:16:15 +00:00
'Docker-Upload-UUID':
new_upload_uuid,
'Range':
_render_range(0),
'Location':
get_app_url() + url_for('v2.upload_chunk', repository='%s/%s' %
(namespace_name, repo_name), upload_uuid=new_upload_uuid)},)
2016-08-02 00:48:34 +00:00
# The user plans to send us the entire body right now.
# Find the upload.
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, new_upload_uuid)
2016-08-02 00:48:34 +00:00
if blob_upload is None:
raise BlobUploadUnknown()
# Upload the chunk to storage while calculating some metadata and updating
# the upload state.
updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
2016-08-02 00:48:34 +00:00
if updated_blob_upload is None:
_abort_range_not_satisfiable(blob_upload.byte_count, new_upload_uuid)
2016-08-02 00:48:34 +00:00
# Save the upload state to the database.
model.update_blob_upload(updated_blob_upload)
2016-08-02 00:48:34 +00:00
# Finalize the upload process in the database and storage.
_finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
# Write the response to the client.
2016-08-09 16:28:00 +00:00
return Response(
status=201,
headers={
2017-06-26 22:16:15 +00:00
'Docker-Content-Digest':
digest,
'Location':
get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
(namespace_name, repo_name), digest=digest),},)
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['GET'])
2016-03-09 21:20:28 +00:00
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_write
@anon_protect
2016-03-09 21:20:28 +00:00
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
2016-08-02 00:48:34 +00:00
if blob_upload is None:
raise BlobUploadUnknown()
2016-08-09 16:28:00 +00:00
return Response(
status=204,
headers={
'Docker-Upload-UUID': upload_uuid,
2017-06-26 22:16:15 +00:00
'Range': _render_range(blob_upload.byte_count + 1), # byte ranges are exclusive
},)
2016-08-02 00:48:34 +00:00
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def upload_chunk(namespace_name, repo_name, upload_uuid):
# Find the upload.
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
2016-08-02 00:48:34 +00:00
if blob_upload is None:
raise BlobUploadUnknown()
2016-08-02 00:48:34 +00:00
# Upload the chunk to storage while calculating some metadata and updating
# the upload state.
updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
2016-08-02 00:48:34 +00:00
if updated_blob_upload is None:
_abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)
2016-08-02 00:48:34 +00:00
# Save the upload state to the database.
model.update_blob_upload(updated_blob_upload)
2016-08-02 00:48:34 +00:00
# Write the response to the client.
2016-08-09 16:28:00 +00:00
return Response(
status=204,
headers={
'Location': _current_request_url(),
2016-08-09 16:28:00 +00:00
'Range': _render_range(updated_blob_upload.byte_count, with_bytes_prefix=False),
2017-06-26 22:16:15 +00:00
'Docker-Upload-UUID': upload_uuid,},)
2016-08-02 00:48:34 +00:00
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['PUT'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
# Ensure the digest is present before proceeding.
digest = request.args.get('digest', None)
if digest is None:
raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'})
2016-08-02 00:48:34 +00:00
# Find the upload.
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
2016-08-02 00:48:34 +00:00
if blob_upload is None:
raise BlobUploadUnknown()
# Upload the chunk to storage while calculating some metadata and updating
# the upload state.
updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
2016-08-02 00:48:34 +00:00
if updated_blob_upload is None:
_abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)
2016-08-02 00:48:34 +00:00
# Finalize the upload process in the database and storage.
_finish_upload(namespace_name, repo_name, updated_blob_upload, digest)
# Write the response to the client.
2017-06-26 22:16:15 +00:00
return Response(status=201, headers={
'Docker-Content-Digest':
digest,
'Location':
get_app_url() + url_for('v2.download_blob', repository='%s/%s' %
(namespace_name, repo_name), digest=digest),})
2016-08-02 00:48:34 +00:00
@v2_bp.route('/<repopath:repository>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def cancel_upload(namespace_name, repo_name, upload_uuid):
blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
if blob_upload is None:
2016-08-02 00:48:34 +00:00
raise BlobUploadUnknown()
# We delete the record for the upload first, since if the partial upload in
# storage fails to delete, it doesn't break anything.
model.delete_blob_upload(namespace_name, repo_name, upload_uuid)
storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
blob_upload.storage_metadata)
2016-08-02 00:48:34 +00:00
2016-08-09 16:28:00 +00:00
return Response(status=204)
2016-08-02 00:48:34 +00:00
@v2_bp.route('/<repopath:repository>/blobs/<digest>', methods=['DELETE'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def delete_digest(namespace_name, repo_name, upload_uuid):
# We do not support deleting arbitrary digests, as they break repo images.
raise Unsupported()
def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
"""
Returns a string formatted to be used in the Range header.
"""
2016-08-02 00:48:34 +00:00
return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1)
def _current_request_url():
return '{0}{1}{2}'.format(get_app_url(), request.script_root, request.path)
2016-08-02 00:48:34 +00:00
def _abort_range_not_satisfiable(valid_end, upload_uuid):
"""
Writes a failure response for scenarios where the registry cannot function
with the provided range.
TODO(jzelinskie): Unify this with the V2RegistryException class.
"""
2017-06-26 22:16:15 +00:00
flask_abort(
Response(status=416, headers={
'Location': _current_request_url(),
'Range': '0-{0}'.format(valid_end),
'Docker-Upload-UUID': upload_uuid}))
def _parse_range_header(range_header_text):
2016-08-02 00:48:34 +00:00
"""
Parses the range header.
Returns a tuple of the start offset and the length.
If the parse fails, raises _InvalidRangeHeader.
"""
found = RANGE_HEADER_REGEX.match(range_header_text)
if found is None:
raise _InvalidRangeHeader()
start = int(found.group(1))
length = int(found.group(2)) - start
if length <= 0:
raise _InvalidRangeHeader()
return (start, length)
def _start_offset_and_length(range_header):
2016-08-02 00:48:34 +00:00
"""
Returns a tuple of the start offset and the length.
If the range header doesn't exist, defaults to (0, -1).
If parsing fails, returns (None, None).
"""
start_offset, length = 0, -1
if range_header is not None:
try:
start_offset, length = _parse_range_header(range_header)
except _InvalidRangeHeader:
2016-08-02 00:48:34 +00:00
return None, None
2016-08-02 00:48:34 +00:00
return start_offset, length
def _upload_chunk(blob_upload, range_header):
2016-08-02 00:48:34 +00:00
"""
Calculates metadata while uploading a chunk to storage.
Returns a BlobUpload object or None if there was a failure.
"""
max_layer_size = bitmath.parse_string_unsafe(app.config['MAXIMUM_LAYER_SIZE'])
# Get the offset and length of the current chunk.
start_offset, length = _start_offset_and_length(range_header)
if blob_upload is None or None in {start_offset, length}:
logger.error('Invalid arguments provided to _upload_chunk')
2016-08-02 00:48:34 +00:00
return None
2016-08-02 00:48:34 +00:00
if start_offset > 0 and start_offset > blob_upload.byte_count:
2016-08-09 19:11:35 +00:00
logger.error('start_offset provided to _upload_chunk greater than blob.upload.byte_count')
2016-08-02 00:48:34 +00:00
return None
# Check if we should raise 413 before accepting the data.
uploaded = bitmath.Byte(length + start_offset)
if length > -1 and uploaded > max_layer_size:
raise LayerTooLarge(uploaded=uploaded.bytes, max_allowed=max_layer_size.bytes)
2016-08-02 00:48:34 +00:00
location_set = {blob_upload.location_name}
upload_error = None
with database.CloseForLongOperation(app.config):
input_fp = get_input_stream(request)
2016-08-02 00:48:34 +00:00
if start_offset > 0 and start_offset < blob_upload.byte_count:
# Skip the bytes which were received on a previous push, which are already stored and
# included in the sha calculation
2016-08-02 00:48:34 +00:00
overlap_size = blob_upload.byte_count - start_offset
input_fp = StreamSlice(input_fp, overlap_size)
# Update our upload bounds to reflect the skipped portion of the overlap
2016-08-02 00:48:34 +00:00
start_offset = blob_upload.byte_count
length = max(length - overlap_size, 0)
# We use this to escape early in case we have already processed all of the bytes the user
# wants to upload
if length == 0:
2016-08-02 00:48:34 +00:00
return blob_upload
2016-08-02 00:48:34 +00:00
input_fp = wrap_with_handler(input_fp, blob_upload.sha_state.update)
2016-01-12 22:32:55 +00:00
# Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have
# already calculated hash data for the previous chunk(s).
piece_hasher = None
2016-08-02 00:48:34 +00:00
if blob_upload.chunk_count == 0 or blob_upload.piece_sha_state:
initial_sha1_value = blob_upload.piece_sha_state or resumablehashlib.sha1()
initial_sha1_pieces_value = blob_upload.piece_hashes or ''
2016-01-12 22:32:55 +00:00
2016-01-22 20:52:28 +00:00
piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset,
2016-08-02 00:48:34 +00:00
initial_sha1_pieces_value, initial_sha1_value)
2016-01-12 22:32:55 +00:00
input_fp = wrap_with_handler(input_fp, piece_hasher.update)
# If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the
# stream so we can determine the uncompressed size. We'll throw out this data if another chunk
# comes in, but in the common case the docker client only sends one chunk.
size_info = None
2016-08-02 00:48:34 +00:00
if start_offset == 0 and blob_upload.chunk_count == 0:
size_info, fn = calculate_size_handler()
input_fp = wrap_with_handler(input_fp, fn)
start_time = time.time()
length_written, new_metadata, upload_error = storage.stream_upload_chunk(
2016-08-09 19:11:35 +00:00
location_set,
blob_upload.uuid,
start_offset,
length,
input_fp,
blob_upload.storage_metadata,
2017-06-26 22:16:15 +00:00
content_type=BLOB_CONTENT_TYPE,)
if upload_error is not None:
logger.error('storage.stream_upload_chunk returned error %s', upload_error)
2016-08-02 00:48:34 +00:00
return None
# Update the chunk upload time metric.
2017-06-26 22:16:15 +00:00
metric_queue.chunk_upload_time.Observe(time.time() - start_time, labelvalues=[
length_written, list(location_set)[0]])
# If we determined an uncompressed size and this is the first chunk, add it to the blob.
# Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
2016-08-02 00:48:34 +00:00
if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid:
blob_upload.uncompressed_byte_count = size_info.uncompressed_size
elif length_written > 0:
# Otherwise, if we wrote some bytes and the above conditions were not met, then we don't
# know the uncompressed size.
2016-08-02 00:48:34 +00:00
blob_upload.uncompressed_byte_count = None
if piece_hasher is not None:
2016-08-02 00:48:34 +00:00
blob_upload.piece_hashes = piece_hasher.piece_hashes
blob_upload.piece_sha_state = piece_hasher.hash_fragment
2016-08-02 00:48:34 +00:00
blob_upload.storage_metadata = new_metadata
blob_upload.byte_count += length_written
blob_upload.chunk_count += 1
# Ensure we have not gone beyond the max layer size.
upload_size = bitmath.Byte(blob_upload.byte_count)
if upload_size > max_layer_size:
raise LayerTooLarge(uploaded=upload_size.bytes, max_allowed=max_layer_size.bytes)
2016-08-02 00:48:34 +00:00
return blob_upload
2016-08-02 00:48:34 +00:00
def _validate_digest(blob_upload, expected_digest):
"""
Verifies that the digest's SHA matches that of the uploaded data.
"""
computed_digest = digest_tools.sha256_digest_from_hashlib(blob_upload.sha_state)
if not digest_tools.digests_equal(computed_digest, expected_digest):
logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s',
blob_upload.uuid, expected_digest, computed_digest)
raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'})
2016-08-02 00:48:34 +00:00
def _finalize_blob_storage(blob_upload, expected_digest):
"""
When an upload is successful, this ends the uploading process from the
storage's perspective.
Returns True if the blob already existed.
"""
final_blob_location = digest_tools.content_path(expected_digest)
# Move the storage into place, or if this was a re-upload, cancel it
with database.CloseForLongOperation(app.config):
2016-08-02 00:48:34 +00:00
already_existed = storage.exists({blob_upload.location_name}, final_blob_location)
if already_existed:
# It already existed, clean up our upload which served as proof that the
# uploader had the blob.
storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
blob_upload.storage_metadata)
else:
# We were the first ones to upload this image (at least to this location)
# Let's copy it into place
2016-08-02 00:48:34 +00:00
storage.complete_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
final_blob_location, blob_upload.storage_metadata)
return already_existed
2016-08-02 00:48:34 +00:00
def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed):
"""
When an upload is successful, this ends the uploading process from the
database's perspective.
"""
# Create the blob and temporarily tag it.
blob_storage = model.create_blob_and_temp_tag(
2016-08-02 00:48:34 +00:00
namespace_name,
repo_name,
digest,
blob_upload,
2017-06-26 22:16:15 +00:00
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],)
2016-08-02 00:48:34 +00:00
# If it doesn't already exist, create the BitTorrent pieces for the blob.
if blob_upload.piece_sha_state is not None and not already_existed:
piece_bytes = blob_upload.piece_hashes + blob_upload.piece_sha_state.digest()
model.save_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'], piece_bytes)
2016-08-02 00:48:34 +00:00
# Delete the blob upload.
model.delete_blob_upload(namespace_name, repo_name, blob_upload.uuid)
2016-08-02 00:48:34 +00:00
def _finish_upload(namespace_name, repo_name, blob_upload, digest):
"""
When an upload is successful, this ends the uploading process.
"""
_validate_digest(blob_upload, digest)
_finalize_blob_database(
namespace_name,
repo_name,
blob_upload,
digest,
2017-06-26 22:16:15 +00:00
_finalize_blob_storage(blob_upload, digest),)