import logging
import re

from flask import url_for, request, redirect, Response, abort as flask_abort

import resumablehashlib

from app import storage, app
from auth.registry_jwt_auth import process_registry_jwt_auth
from data import database
from data.interfaces.v2 import pre_oci_model as model
from digest import digest_tools
from endpoints.common import parse_repository_name
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
from endpoints.v2.errors import (BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported,
                                 NameUnknown)
from endpoints.decorators import anon_protect
from util.cache import cache_control
from util.registry.filelike import wrap_with_handler, StreamSlice
from util.registry.gzipstream import calculate_size_handler
from util.registry.torrent import PieceHasher


logger = logging.getLogger(__name__)

# NOTE(review): the route strings in this module ('//blobs/', '//blobs/uploads/') declare no URL
# variables, yet the handlers below take `digest` / `upload_uuid` arguments and `url_for` is
# called with `repository=`, `upload_uuid=` and `digest=`. The pattern below also has no '{0}'
# placeholder, so the `.format()` call leaves it unchanged. Presumably the `<...>` URL variable
# segments were lost from these literals -- confirm against the upstream routes before relying
# on them. The strings are intentionally left untouched here.
BASE_BLOB_ROUTE = '//blobs/'
BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN)
RANGE_HEADER_REGEX = re.compile(r'^bytes=([0-9]+)-([0-9]+)$')
BLOB_CONTENT_TYPE = 'application/octet-stream'

# Cache lifetime (in seconds) advertised for blob responses: 365 days. Shared by the HEAD and
# GET handlers so that both endpoints report the same value for the same blob.
BLOB_CACHE_MAX_AGE = 31536000


class _InvalidRangeHeader(Exception):
    """ Raised by _parse_range_header when a range header cannot be used. """
    pass


@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_read
@anon_protect
@cache_control(max_age=BLOB_CACHE_MAX_AGE)
def check_blob_exists(namespace_name, repo_name, digest):
    """ Returns a body-less response whose headers describe the blob with the given digest.

        Raises BlobUnknown if the model has no blob for that digest in the repository.
    """
    # Find the blob.
    blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
    if blob is None:
        raise BlobUnknown()

    # Build the response headers.
    headers = {
        'Docker-Content-Digest': digest,
        'Content-Length': blob.size,
        'Content-Type': BLOB_CONTENT_TYPE,
    }

    # If our storage supports range requests, let the client know.
    if storage.get_supports_resumable_downloads(blob.locations):
        headers['Accept-Ranges'] = 'bytes'

    # Write the response to the client.
    return Response(headers=headers)


@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['GET'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_read
@anon_protect
@cache_control(max_age=BLOB_CACHE_MAX_AGE)
def download_blob(namespace_name, repo_name, digest):
    """ Serves the blob with the given digest, either by redirecting to a direct download URL
        (when storage provides one) or by streaming the contents from storage.

        Raises BlobUnknown if the model has no blob for that digest in the repository.
    """
    # Find the blob.
    blob = model.get_blob_by_digest(namespace_name, repo_name, digest)
    if blob is None:
        raise BlobUnknown()

    # Build the response headers.
    headers = {'Docker-Content-Digest': digest}

    # If our storage supports range requests, let the client know.
    if storage.get_supports_resumable_downloads(blob.locations):
        headers['Accept-Ranges'] = 'bytes'

    # Find the storage path for the blob.
    path = model.get_blob_path(blob)

    # Short-circuit by redirecting if the storage supports it.
    logger.debug('Looking up the direct download URL for path: %s', path)
    direct_download_url = storage.get_direct_download_url(blob.locations, path)
    if direct_download_url:
        logger.debug('Returning direct download URL')
        resp = redirect(direct_download_url)
        resp.headers.extend(headers)
        return resp

    # Close the database connection before we stream the download.
    logger.debug('Closing database connection before streaming layer data')
    with database.CloseForLongOperation(app.config):
        # dict.update() mutates in place and returns None, so it must not be used as the
        # `headers=` argument itself; update first, then hand the dict to the response.
        headers.update({
            'Content-Length': blob.size,
            'Content-Type': BLOB_CONTENT_TYPE,
        })

        # Stream the response to the client.
        return Response(
            storage.stream_read(blob.locations, path),
            headers=headers,
        )


@v2_bp.route('//blobs/uploads/', methods=['POST'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def start_blob_upload(namespace_name, repo_name):
    """ Starts a new blob upload.

        Without a `digest` query argument, only the upload is created and a 202 response points
        the client at the chunk upload URL. With a `digest`, the request body is treated as the
        entire blob: it is uploaded, verified against the digest and finalized, and a 201
        response is returned.

        Raises NameUnknown if the model reports that the repository does not exist, and
        BlobUploadUnknown if the freshly created upload cannot be loaded again.
    """
    # Begin the blob upload process in the database and storage.
    location_name = storage.preferred_locations[0]
    new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
    repository_exists = model.create_blob_upload(namespace_name, repo_name, new_upload_uuid,
                                                 location_name, upload_metadata)
    if not repository_exists:
        raise NameUnknown()

    digest = request.args.get('digest', None)
    if digest is None:
        # Short-circuit because the user will send the blob data in another request.
        return Response(
            status=202,
            headers={
                'Docker-Upload-UUID': new_upload_uuid,
                'Range': _render_range(0),
                'Location': url_for('v2.upload_chunk',
                                    repository='%s/%s' % (namespace_name, repo_name),
                                    upload_uuid=new_upload_uuid)
            },
        )

    # The user plans to send us the entire body right now.
    # Find the upload.
    blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, new_upload_uuid)
    if blob_upload is None:
        raise BlobUploadUnknown()

    # Upload the chunk to storage while calculating some metadata and updating
    # the upload state.
    updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
    if updated_blob_upload is None:
        # Aborts the request (flask abort raises), so nothing below runs on failure.
        _abort_range_not_satisfiable(blob_upload.byte_count, new_upload_uuid)

    # Save the upload state to the database.
    model.update_blob_upload(updated_blob_upload)

    # Finalize the upload process in the database and storage.
    _finish_upload(namespace_name, repo_name, updated_blob_upload, digest)

    # Write the response to the client.
    return Response(
        status=201,
        headers={
            'Docker-Content-Digest': digest,
            'Location': url_for('v2.download_blob',
                                repository='%s/%s' % (namespace_name, repo_name),
                                digest=digest),
        },
    )


@v2_bp.route('//blobs/uploads/', methods=['GET'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull'])
@require_repo_write
@anon_protect
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
    """ Reports the progress of an in-flight upload via the Range header of a 204 response.

        Raises BlobUploadUnknown if no upload with the given UUID is found.
    """
    blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
    if blob_upload is None:
        raise BlobUploadUnknown()

    # NOTE(review): because of the `+ 1`, this renders 'bytes=0-<byte_count>', whereas
    # upload_chunk renders '0-<byte_count - 1>' for the same state -- confirm which end offset
    # clients expect here.
    return Response(
        status=204,
        headers={
            'Docker-Upload-UUID': upload_uuid,
            'Range': _render_range(blob_upload.byte_count+1),  # byte ranges are exclusive
        },
    )


@v2_bp.route('//blobs/uploads/', methods=['PATCH'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def upload_chunk(namespace_name, repo_name, upload_uuid):
    """ Appends the request body to an in-flight upload and persists the new upload state.

        Raises BlobUploadUnknown if no upload with the given UUID is found. Aborts with a 416
        response if the chunk could not be stored.
    """
    # Find the upload.
    blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
    if blob_upload is None:
        raise BlobUploadUnknown()

    # Upload the chunk to storage while calculating some metadata and updating
    # the upload state.
    updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
    if updated_blob_upload is None:
        _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)

    # Save the upload state to the database.
    model.update_blob_upload(updated_blob_upload)

    # Write the response to the client.
    return Response(
        status=204,
        headers={
            'Location': _current_request_path(),
            'Range': _render_range(updated_blob_upload.byte_count, with_bytes_prefix=False),
            'Docker-Upload-UUID': upload_uuid,
        },
    )


@v2_bp.route('//blobs/uploads/', methods=['PUT'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
    """ Uploads the final (or only) chunk of a blob and finalizes the upload.

        Raises BlobUploadInvalid if the `digest` query argument is missing or does not match
        the uploaded data, and BlobUploadUnknown if no upload with the given UUID is found.
        Aborts with a 416 response if the chunk could not be stored.
    """
    # Ensure the digest is present before proceeding.
    digest = request.args.get('digest', None)
    if digest is None:
        raise BlobUploadInvalid(detail={'reason': 'Missing digest arg on monolithic upload'})

    # Find the upload.
    blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
    if blob_upload is None:
        raise BlobUploadUnknown()

    # Upload the chunk to storage while calculating some metadata and updating
    # the upload state.
    updated_blob_upload = _upload_chunk(blob_upload, request.headers.get('range'))
    if updated_blob_upload is None:
        _abort_range_not_satisfiable(blob_upload.byte_count, upload_uuid)

    # Finalize the upload process in the database and storage.
    _finish_upload(namespace_name, repo_name, updated_blob_upload, digest)

    # Write the response to the client.
    return Response(
        status=201,
        headers={
            'Docker-Content-Digest': digest,
            'Location': url_for('v2.download_blob',
                                repository='%s/%s' % (namespace_name, repo_name),
                                digest=digest),
        }
    )


@v2_bp.route('//blobs/uploads/', methods=['DELETE'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def cancel_upload(namespace_name, repo_name, upload_uuid):
    """ Cancels an in-flight upload, removing both its database record and its stored data.

        Raises BlobUploadUnknown if no upload with the given UUID is found.
    """
    blob_upload = model.blob_upload_by_uuid(namespace_name, repo_name, upload_uuid)
    if blob_upload is None:
        raise BlobUploadUnknown()

    # We delete the record for the upload first, since if the partial upload in
    # storage fails to delete, it doesn't break anything.
    model.delete_blob_upload(namespace_name, repo_name, upload_uuid)
    storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
                                  blob_upload.storage_metadata)

    return Response(status=204)


@v2_bp.route('//blobs/', methods=['DELETE'])
@parse_repository_name()
@process_registry_jwt_auth(scopes=['pull', 'push'])
@require_repo_write
@anon_protect
def delete_digest(namespace_name, repo_name, upload_uuid):
    """ Always raises Unsupported: deleting blobs by digest is not offered. """
    # NOTE(review): the third parameter is named `upload_uuid` although this is a blob route,
    # not an upload route -- confirm it matches the URL variable the route actually declares.
    # We do not support deleting arbitrary digests, as they break repo images.
    raise Unsupported()


def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
    """ Returns a string formatted to be used in the Range header: '0-<num_uploaded_bytes - 1>',
        preceded by 'bytes=' unless with_bytes_prefix is False.
    """
    return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1)


def _current_request_path():
    """ Returns the script root and path of the current request, concatenated. """
    return '{0}{1}'.format(request.script_root, request.path)


def _abort_range_not_satisfiable(valid_end, upload_uuid):
    """ Writes a failure response for scenarios where the registry cannot function with the
        provided range. Aborts the current request with a 416 response.

        TODO(jzelinskie): Unify this with the V2RegistryException class.
    """
    # NOTE(review): the Range here is '0-<valid_end>' and callers pass the upload's byte_count,
    # unlike _render_range which subtracts one -- confirm the intended end offset.
    flask_abort(Response(status=416, headers={'Location': _current_request_path(),
                                              'Range': '0-{0}'.format(valid_end),
                                              'Docker-Upload-UUID': upload_uuid}))


def _parse_range_header(range_header_text):
    """ Parses the range header. Returns a tuple of the start offset and the length.
        If the parse fails, raises _InvalidRangeHeader.

        The header must match 'bytes=<start>-<end>'; a non-positive length is also rejected.
    """
    found = RANGE_HEADER_REGEX.match(range_header_text)
    if found is None:
        raise _InvalidRangeHeader()

    start = int(found.group(1))
    # NOTE(review): the length is computed as end - start, i.e. the end offset is treated as
    # exclusive -- confirm this matches what clients send.
    length = int(found.group(2)) - start

    if length <= 0:
        raise _InvalidRangeHeader()

    return (start, length)


def _start_offset_and_length(range_header):
    """ Returns a tuple of the start offset and the length.
        If the range header doesn't exist, defaults to (0, -1).
        If parsing fails, returns (None, None).
    """
    start_offset, length = 0, -1
    if range_header is not None:
        try:
            start_offset, length = _parse_range_header(range_header)
        except _InvalidRangeHeader:
            return None, None

    return start_offset, length


def _upload_chunk(blob_upload, range_header):
    """ Calculates metadata while uploading a chunk to storage.

        The chunk is read from the current request's input stream. On success the given
        blob_upload is updated in place (hash state, storage metadata, byte and chunk counts)
        and returned; it is not saved to the database here.

        Returns a BlobUpload object or None if there was a failure.
    """
    # Get the offset and length of the current chunk.
    start_offset, length = _start_offset_and_length(range_header)
    if blob_upload is None or None in {start_offset, length}:
        logger.error('Invalid arguments provided to _upload_chunk')
        return None

    # A chunk may overlap data we already hold, but it may not leave a gap after it.
    if start_offset > 0 and start_offset > blob_upload.byte_count:
        logger.error('start_offset provided to _upload_chunk greater than blob.upload.byte_count')
        return None

    location_set = {blob_upload.location_name}

    with database.CloseForLongOperation(app.config):
        input_fp = get_input_stream(request)

        if start_offset > 0 and start_offset < blob_upload.byte_count:
            # Skip the bytes which were received on a previous push, which are already stored
            # and included in the sha calculation
            overlap_size = blob_upload.byte_count - start_offset
            input_fp = StreamSlice(input_fp, overlap_size)

            # Update our upload bounds to reflect the skipped portion of the overlap
            start_offset = blob_upload.byte_count
            length = max(length - overlap_size, 0)

        # We use this to escape early in case we have already processed all of the bytes the
        # user wants to upload
        if length == 0:
            return blob_upload

        input_fp = wrap_with_handler(input_fp, blob_upload.sha_state.update)

        # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we
        # have already calculated hash data for the previous chunk(s).
        piece_hasher = None
        if blob_upload.chunk_count == 0 or blob_upload.piece_sha_state:
            initial_sha1_value = blob_upload.piece_sha_state or resumablehashlib.sha1()
            initial_sha1_pieces_value = blob_upload.piece_hashes or ''

            piece_hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'], start_offset,
                                       initial_sha1_pieces_value, initial_sha1_value)

            input_fp = wrap_with_handler(input_fp, piece_hasher.update)

        # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip
        # the stream so we can determine the uncompressed size. We'll throw out this data if
        # another chunk comes in, but in the common case the docker client only sends one chunk.
        size_info = None
        if start_offset == 0 and blob_upload.chunk_count == 0:
            size_info, fn = calculate_size_handler()
            input_fp = wrap_with_handler(input_fp, fn)

        length_written, new_metadata, upload_error = storage.stream_upload_chunk(
            location_set,
            blob_upload.uuid,
            start_offset,
            length,
            input_fp,
            blob_upload.storage_metadata,
            content_type=BLOB_CONTENT_TYPE,
        )

        if upload_error is not None:
            logger.error('storage.stream_upload_chunk returned error %s', upload_error)
            return None

    # If we determined an uncompressed size and this is the first chunk, add it to the blob.
    # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks.
    if size_info is not None and blob_upload.chunk_count == 0 and size_info.is_valid:
        blob_upload.uncompressed_byte_count = size_info.uncompressed_size
    elif length_written > 0:
        # Otherwise, if we wrote some bytes and the above conditions were not met, then we
        # don't know the uncompressed size.
        blob_upload.uncompressed_byte_count = None

    if piece_hasher is not None:
        blob_upload.piece_hashes = piece_hasher.piece_hashes
        blob_upload.piece_sha_state = piece_hasher.hash_fragment

    blob_upload.storage_metadata = new_metadata
    blob_upload.byte_count += length_written
    blob_upload.chunk_count += 1
    return blob_upload


def _validate_digest(blob_upload, expected_digest):
    """ Verifies that the digest's SHA matches that of the uploaded data.

        Raises BlobUploadInvalid on a mismatch.
    """
    computed_digest = digest_tools.sha256_digest_from_hashlib(blob_upload.sha_state)
    if not digest_tools.digests_equal(computed_digest, expected_digest):
        logger.error('Digest mismatch for upload %s: Expected digest %s, found digest %s',
                     blob_upload.uuid, expected_digest, computed_digest)
        raise BlobUploadInvalid(detail={'reason': 'Digest mismatch on uploaded blob'})


def _finalize_blob_storage(blob_upload, expected_digest):
    """ When an upload is successful, this ends the uploading process from the storage's
        perspective.

        Returns True if the blob already existed.
    """
    final_blob_location = digest_tools.content_path(expected_digest)

    # Move the storage into place, or if this was a re-upload, cancel it
    with database.CloseForLongOperation(app.config):
        already_existed = storage.exists({blob_upload.location_name}, final_blob_location)
        if already_existed:
            # It already existed, clean up our upload which served as proof that the
            # uploader had the blob.
            storage.cancel_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
                                          blob_upload.storage_metadata)
        else:
            # We were the first ones to upload this image (at least to this location)
            # Let's copy it into place
            storage.complete_chunked_upload({blob_upload.location_name}, blob_upload.uuid,
                                            final_blob_location, blob_upload.storage_metadata)

    return already_existed


def _finalize_blob_database(namespace_name, repo_name, blob_upload, digest, already_existed):
    """ When an upload is successful, this ends the uploading process from the database's
        perspective.
    """
    # Create the blob and temporarily tag it.
    blob_storage = model.create_blob_and_temp_tag(
        namespace_name,
        repo_name,
        digest,
        blob_upload,
        app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'],
    )

    # If it doesn't already exist, create the BitTorrent pieces for the blob.
    if blob_upload.piece_sha_state is not None and not already_existed:
        piece_bytes = blob_upload.piece_hashes + blob_upload.piece_sha_state.digest()
        model.save_bittorrent_pieces(blob_storage, app.config['BITTORRENT_PIECE_SIZE'],
                                     piece_bytes)

    # Delete the blob upload.
    model.delete_blob_upload(namespace_name, repo_name, blob_upload.uuid)


def _finish_upload(namespace_name, repo_name, blob_upload, digest):
    """ When an upload is successful, this ends the uploading process.

        The digest is validated first (raising BlobUploadInvalid on mismatch), then storage is
        finalized, then the database.
    """
    _validate_digest(blob_upload, digest)
    _finalize_blob_database(
        namespace_name,
        repo_name,
        blob_upload,
        digest,
        _finalize_blob_storage(blob_upload, digest),
    )