import logging import re from flask import make_response, url_for, request, redirect, Response, abort as flask_abort import resumablehashlib from app import storage, app from auth.registry_jwt_auth import process_registry_jwt_auth from data import model, database from digest import digest_tools from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream from endpoints.v2.errors import (BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported, NameUnknown) from endpoints.decorators import anon_protect from util.cache import cache_control from util.registry.filelike import wrap_with_handler, StreamSlice from util.registry.gzipstream import calculate_size_handler from util.registry.torrent import PieceHasher from endpoints.common import parse_repository_name from storage.basestorage import InvalidChunkException logger = logging.getLogger(__name__) BASE_BLOB_ROUTE = '//blobs/' BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN) RANGE_HEADER_REGEX = re.compile(r'^bytes=([0-9]+)-([0-9]+)$') BLOB_CONTENT_TYPE = 'application/octet-stream' class _InvalidRangeHeader(Exception): pass def _base_blob_fetch(namespace, repo_name, digest): """ Some work that is common to both GET and HEAD requests. Callers MUST check for proper authorization before calling this method. """ try: found = model.blob.get_repo_blob_by_digest(namespace, repo_name, digest) except model.BlobDoesNotExist: raise BlobUnknown() headers = { 'Docker-Content-Digest': digest, } # Add the Accept-Ranges header if the storage engine supports resumable # downloads. if storage.get_supports_resumable_downloads(found.locations): logger.debug('Storage supports resumable downloads') headers['Accept-Ranges'] = 'bytes' return found, headers @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD']) @process_registry_jwt_auth @parse_repository_name @require_repo_read @anon_protect @cache_control(max_age=31436000) def check_blob_exists(namespace, repo_name, digest): found, headers = _base_blob_fetch(namespace, repo_name, digest) response = make_response('') response.headers.extend(headers) response.headers['Content-Length'] = found.image_size response.headers['Content-Type'] = BLOB_CONTENT_TYPE return response @v2_bp.route(BLOB_DIGEST_ROUTE, methods=['GET']) @process_registry_jwt_auth @parse_repository_name @require_repo_read @anon_protect @cache_control(max_age=31536000) def download_blob(namespace, repo_name, digest): found, headers = _base_blob_fetch(namespace, repo_name, digest) path = model.storage.get_layer_path(found) logger.debug('Looking up the direct download URL for path: %s', path) direct_download_url = storage.get_direct_download_url(found.locations, path) if direct_download_url: logger.debug('Returning direct download URL') resp = redirect(direct_download_url) resp.headers.extend(headers) return resp logger.debug('Streaming layer data') # Close the database handle here for this process before we send the long download. database.close_db_filter(None) headers['Content-Length'] = found.image_size headers['Content-Type'] = BLOB_CONTENT_TYPE return Response(storage.stream_read(found.locations, path), headers=headers) def _render_range(num_uploaded_bytes, with_bytes_prefix=True): return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1) @v2_bp.route('//blobs/uploads/', methods=['POST']) @process_registry_jwt_auth @parse_repository_name @require_repo_write @anon_protect def start_blob_upload(namespace, repo_name): location_name = storage.preferred_locations[0] new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name) try: model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name, upload_metadata) except database.Repository.DoesNotExist: raise NameUnknown() digest = request.args.get('digest', None) if digest is None: # The user will send the blob data in another request accepted = make_response('', 202) accepted.headers['Location'] = url_for('v2.upload_chunk', repository='%s/%s' % (namespace, repo_name), upload_uuid=new_upload_uuid) accepted.headers['Range'] = _render_range(0) accepted.headers['Docker-Upload-UUID'] = new_upload_uuid return accepted else: # The user plans to send us the entire body right now uploaded, error = _upload_chunk(namespace, repo_name, new_upload_uuid) uploaded.save() if error: _range_not_satisfiable(uploaded.byte_count) return _finish_upload(namespace, repo_name, uploaded, digest) @v2_bp.route('//blobs/uploads/', methods=['GET']) @process_registry_jwt_auth @parse_repository_name @require_repo_write @anon_protect def fetch_existing_upload(namespace, repo_name, upload_uuid): try: found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid) except model.InvalidBlobUpload: raise BlobUploadUnknown() # Note: Docker byte ranges are exclusive so we have to add one to the byte count. accepted = make_response('', 204) accepted.headers['Range'] = _render_range(found.byte_count + 1) accepted.headers['Docker-Upload-UUID'] = upload_uuid return accepted def _current_request_path(): return '{0}{1}'.format(request.script_root, request.path) def _range_not_satisfiable(valid_end): invalid_range = make_response('', 416) invalid_range.headers['Location'] = _current_request_path() invalid_range.headers['Range'] = '0-{0}'.format(valid_end) invalid_range.headers['Docker-Upload-UUID'] = request.view_args['upload_uuid'] flask_abort(invalid_range) def _parse_range_header(range_header_text): """ Parses the range header, and returns a tuple of the start offset and the length, or raises an _InvalidRangeHeader exception. """ found = RANGE_HEADER_REGEX.match(range_header_text) if found is None: raise _InvalidRangeHeader() start = int(found.group(1)) length = int(found.group(2)) - start if length <= 0: raise _InvalidRangeHeader() return (start, length) def _upload_chunk(namespace, repo_name, upload_uuid): """ Common code among the various uploading paths for appending data to blobs. Callers MUST call .save() or .delete_instance() on the returned database object. """ try: found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid) except model.InvalidBlobUpload: raise BlobUploadUnknown() start_offset, length = 0, -1 range_header = request.headers.get('range', None) if range_header is not None: try: start_offset, length = _parse_range_header(range_header) except _InvalidRangeHeader: _range_not_satisfiable(found.byte_count) if start_offset > 0 and start_offset > found.byte_count: _range_not_satisfiable(found.byte_count) location_set = {found.location.name} with database.CloseForLongOperation(app.config): input_fp = get_input_stream(request) if start_offset > 0 and start_offset < found.byte_count: # Skip the bytes which were received on a previous push, which are already stored and # included in the sha calculation overlap_size = found.byte_count - start_offset input_fp = StreamSlice(input_fp, overlap_size) # Update our upload bounds to reflect the skipped portion of the overlap start_offset = found.byte_count length = max(length - overlap_size, 0) # We use this to escape early in case we have already processed all of the bytes the user # wants to upload if length == 0: return found input_fp = wrap_with_handler(input_fp, found.sha_state.update) # Add a hasher for calculating SHA1s for torrents if this is the first chunk and/or we have # already calculated hash data for the previous chunk(s). piece_hasher = None if found.chunk_count == 0 or found.piece_sha_state: initial_sha1_value = found.piece_sha_state or resumablehashlib.sha1() initial_sha1_pieces_value = found.piece_hashes or '' piece_hasher = PieceHasher(app.config['TORRENT_PIECE_SIZE'], start_offset, initial_sha1_pieces_value, initial_sha1_value) input_fp = wrap_with_handler(input_fp, piece_hasher.update) # If this is the first chunk and we're starting at the 0 offset, add a handler to gunzip the # stream so we can determine the uncompressed size. We'll throw out this data if another chunk # comes in, but in the common case Docker only sends one chunk. size_info = None if start_offset == 0 and found.chunk_count == 0: size_info, fn = calculate_size_handler() input_fp = wrap_with_handler(input_fp, fn) try: length_written, new_metadata, error = storage.stream_upload_chunk(location_set, upload_uuid, start_offset, length, input_fp, found.storage_metadata, content_type=BLOB_CONTENT_TYPE) except InvalidChunkException: _range_not_satisfiable(found.byte_count) # If we determined an uncompressed size and this is the first chunk, add it to the blob. # Otherwise, we clear the size from the blob as it was uploaded in multiple chunks. if size_info is not None and found.chunk_count == 0 and size_info.is_valid: found.uncompressed_byte_count = size_info.uncompressed_size elif length_written > 0: # Otherwise, if we wrote some bytes and the above conditions were not met, then we don't # know the uncompressed size. found.uncompressed_byte_count = None if piece_hasher is not None: found.piece_hashes = piece_hasher.piece_hashes found.piece_sha_state = piece_hasher.hash_fragment found.storage_metadata = new_metadata found.byte_count += length_written found.chunk_count += 1 return found, error def _finish_upload(namespace, repo_name, upload_obj, expected_digest): # Verify that the digest's SHA matches that of the uploaded data. computed_digest = digest_tools.sha256_digest_from_hashlib(upload_obj.sha_state) if not digest_tools.digests_equal(computed_digest, expected_digest): raise BlobUploadInvalid() final_blob_location = digest_tools.content_path(expected_digest) # Move the storage into place, or if this was a re-upload, cancel it with database.CloseForLongOperation(app.config): already_exists = storage.exists({upload_obj.location.name}, final_blob_location) if already_exists: # It already existed, clean up our upload which served as proof that we had the file storage.cancel_chunked_upload({upload_obj.location.name}, upload_obj.uuid, upload_obj.storage_metadata) else: # We were the first ones to upload this image (at least to this location) # Let's copy it into place storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location, upload_obj.storage_metadata) # Mark the blob as uploaded. blob_storage = model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest, upload_obj.location, upload_obj.byte_count, app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'], upload_obj.uncompressed_byte_count) if upload_obj.piece_sha_state is not None and not already_exists: piece_bytes = upload_obj.piece_hashes + upload_obj.piece_sha_state.digest() model.storage.save_torrent_info(blob_storage, app.config['TORRENT_PIECE_SIZE'], piece_bytes) # Delete the upload tracking row. upload_obj.delete_instance() response = make_response('', 201) response.headers['Docker-Content-Digest'] = expected_digest response.headers['Location'] = url_for('v2.download_blob', repository='%s/%s' % (namespace, repo_name), digest=expected_digest) return response @v2_bp.route('//blobs/uploads/', methods=['PATCH']) @process_registry_jwt_auth @parse_repository_name @require_repo_write @anon_protect def upload_chunk(namespace, repo_name, upload_uuid): upload, error = _upload_chunk(namespace, repo_name, upload_uuid) upload.save() if error: _range_not_satisfiable(upload.byte_count) accepted = make_response('', 204) accepted.headers['Location'] = _current_request_path() accepted.headers['Range'] = _render_range(upload.byte_count, with_bytes_prefix=False) accepted.headers['Docker-Upload-UUID'] = upload_uuid return accepted @v2_bp.route('//blobs/uploads/', methods=['PUT']) @process_registry_jwt_auth @parse_repository_name @require_repo_write @anon_protect def monolithic_upload_or_last_chunk(namespace, repo_name, upload_uuid): digest = request.args.get('digest', None) if digest is None: raise BlobUploadInvalid() found, error = _upload_chunk(namespace, repo_name, upload_uuid) if error: found.save() _range_not_satisfiable(found.byte_count) return _finish_upload(namespace, repo_name, found, digest) @v2_bp.route('//blobs/uploads/', methods=['DELETE']) @process_registry_jwt_auth @parse_repository_name @require_repo_write @anon_protect def cancel_upload(namespace, repo_name, upload_uuid): try: found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid) except model.InvalidBlobUpload: raise BlobUploadUnknown() # We delete the record for the upload first, since if the partial upload in # storage fails to delete, it doesn't break anything found.delete_instance() storage.cancel_chunked_upload({found.location.name}, found.uuid, found.storage_metadata) return make_response('', 204) @v2_bp.route('//blobs/', methods=['DELETE']) @process_registry_jwt_auth @parse_repository_name @require_repo_write @anon_protect def delete_digest(namespace, repo_name, upload_uuid): # We do not support deleting arbitrary digests, as they break repo images. raise Unsupported()