This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.

326 lines
11 KiB
Raw Normal View History

import logging
import re

from flask import make_response, url_for, request, redirect, Response, abort as flask_abort

from app import storage, app
from data import model, database
from digest import digest_tools
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
from endpoints.v2.errors import (BlobUnknown, BlobUploadInvalid, BlobUploadUnknown, Unsupported,
                                 NameUnknown)
from auth.jwt_auth import process_jwt_auth
from endpoints.decorators import anon_protect
from util.cache import cache_control
from util.registry.filelike import wrap_with_handler, StreamSlice
from storage.basestorage import InvalidChunkException
# Module-level logger for the v2 blob endpoints.
logger = logging.getLogger(__name__)

# Route template for digest-addressed blobs; the "{0}" placeholder receives the digest regex.
BASE_BLOB_ROUTE = '/<namespace>/<repo_name>/blobs/<regex("{0}"):digest>'

# NOTE(review): BLOB_DIGEST_ROUTE is used by the HEAD and GET routes below but was never
# defined in this file. This assumes digest_tools exposes the digest regex as DIGEST_PATTERN --
# confirm against the digest package.
BLOB_DIGEST_ROUTE = BASE_BLOB_ROUTE.format(digest_tools.DIGEST_PATTERN)

# Matches a "bytes=<start>-<end>" Range header and captures both offsets.
RANGE_HEADER_REGEX = re.compile(r'^bytes=([0-9]+)-([0-9]+)$')

# Content type reported on blob HEAD and streamed GET responses.
BLOB_CONTENT_TYPE = 'application/octet-stream'
class _InvalidRangeHeader(Exception):
def _base_blob_fetch(namespace, repo_name, digest):
    """Look up a repository blob by digest and build the headers shared by GET and HEAD.

    Callers MUST check for proper authorization before calling this function.

    Returns:
        A ``(found, headers)`` tuple: the blob record returned by the model layer and a dict
        of response headers. The dict always carries ``Docker-Content-Digest`` and gains
        ``Accept-Ranges: bytes`` when storage reports resumable-download support for the
        blob's locations.

    Raises:
        BlobUnknown: when the model lookup raises ``model.BlobDoesNotExist``.
    """
    try:
        found = model.blob.get_repo_blob_by_digest(namespace, repo_name, digest)
    except model.BlobDoesNotExist:
        raise BlobUnknown()

    headers = {
        'Docker-Content-Digest': digest,
    }

    # Add the Accept-Ranges header if the storage engine supports resumable
    # downloads.
    if storage.get_supports_resumable_downloads(found.locations):
        logger.debug('Storage supports resumable downloads')
        headers['Accept-Ranges'] = 'bytes'

    return found, headers
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['HEAD'])
def check_blob_exists(namespace, repo_name, digest):
    """Answer a HEAD request for a blob with an empty body and the blob's headers.

    The response carries the shared headers built by ``_base_blob_fetch`` plus
    ``Content-Length`` (the blob's ``image_size``) and ``Content-Type``.
    ``_base_blob_fetch`` raises ``BlobUnknown`` when the blob is not found.
    """
    # NOTE(review): _base_blob_fetch requires callers to have checked authorization, yet no
    # auth decorator is visible on this view (process_jwt_auth / require_repo_read /
    # anon_protect are imported but unused here) -- confirm they were not lost.
    found, headers = _base_blob_fetch(namespace, repo_name, digest)

    response = make_response('')
    # Apply the shared headers (Docker-Content-Digest, Accept-Ranges); previously they were
    # computed and then discarded.
    for header_name, header_value in headers.items():
        response.headers[header_name] = header_value
    response.headers['Content-Length'] = found.image_size
    response.headers['Content-Type'] = BLOB_CONTENT_TYPE
    return response
@v2_bp.route(BLOB_DIGEST_ROUTE, methods=['GET'])
def download_blob(namespace, repo_name, digest):
    """Serve a blob, redirecting to a direct download URL when storage offers one.

    When ``storage.get_direct_download_url`` returns a URL the client is redirected to it.
    Otherwise the blob is streamed from storage with ``Content-Length`` and ``Content-Type``
    set. ``_base_blob_fetch`` raises ``BlobUnknown`` when the blob is not found.
    """
    # NOTE(review): _base_blob_fetch requires callers to have checked authorization, yet no
    # auth decorator is visible on this view (process_jwt_auth / require_repo_read /
    # anon_protect are imported but unused here) -- confirm they were not lost.
    found, headers = _base_blob_fetch(namespace, repo_name, digest)

    # NOTE(review): the right-hand side of this assignment was missing. It is reconstructed
    # from _finish_upload, which stores blobs at digest_tools.content_path(<digest>) --
    # confirm this is also the correct path for every blob that can be looked up here.
    path = digest_tools.content_path(digest)
    logger.debug('Looking up the direct download URL for path: %s', path)
    direct_download_url = storage.get_direct_download_url(found.locations, path)

    if direct_download_url:
        logger.debug('Returning direct download URL')
        resp = redirect(direct_download_url)
        # Keep the digest / range headers on the redirect, consistent with the streamed path.
        for header_name, header_value in headers.items():
            resp.headers[header_name] = header_value
        return resp

    logger.debug('Streaming layer data')

    # Close the database handle here for this process before we send the long download.
    # NOTE(review): no statement follows this comment in the file; the call that closes the
    # handle appears to be missing -- restore it from the data.database API.

    headers['Content-Length'] = found.image_size
    headers['Content-Type'] = BLOB_CONTENT_TYPE

    return Response(storage.stream_read(found.locations, path), headers=headers)
def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
return '{0}0-{1}'.format('bytes=' if with_bytes_prefix else '', num_uploaded_bytes - 1)
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/', methods=['POST'])
def start_blob_upload(namespace, repo_name):
    """Begin a blob upload, optionally completing it in the same request.

    A chunked upload is initiated in the first preferred storage location and recorded via
    the model layer. Without a ``digest`` query argument the response is a 202 pointing at
    the chunk-upload URL. With one, the request body is treated as the whole blob and the
    upload is finished immediately.

    Raises:
        NameUnknown: when recording the upload raises ``database.Repository.DoesNotExist``.
    """
    # NOTE(review): no auth decorator is visible on this view (process_jwt_auth /
    # require_repo_write / anon_protect are imported but unused here) -- confirm.
    location_name = storage.preferred_locations[0]
    new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)

    try:
        # The call was truncated after location_name; upload_metadata is the only value
        # produced above that was not otherwise used, so it is passed as the final argument.
        model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name,
                                   upload_metadata)
    except database.Repository.DoesNotExist:
        raise NameUnknown()

    digest = request.args.get('digest', None)
    if digest is None:
        # The user will send the blob data in another request
        accepted = make_response('', 202)
        accepted.headers['Location'] = url_for('v2.upload_chunk', namespace=namespace,
                                               repo_name=repo_name, upload_uuid=new_upload_uuid)
        accepted.headers['Range'] = _render_range(0)
        accepted.headers['Docker-Upload-UUID'] = new_upload_uuid
        return accepted

    # The user plans to send us the entire body right now
    uploaded = _upload_chunk(namespace, repo_name, new_upload_uuid)
    return _finish_upload(namespace, repo_name, uploaded, digest)
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['GET'])
def fetch_existing_upload(namespace, repo_name, upload_uuid):
    """Report the progress of an in-flight upload as a 204 with ``Range`` and upload UUID.

    Raises:
        BlobUploadUnknown: when the model lookup raises ``model.InvalidBlobUpload``.
    """
    # NOTE(review): no auth decorator is visible on this view (process_jwt_auth /
    # require_repo_write / anon_protect are imported but unused here) -- confirm.
    try:
        found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid)
    except model.InvalidBlobUpload:
        raise BlobUploadUnknown()

    # Note: Docker byte ranges are exclusive so we have to add one to the byte count.
    accepted = make_response('', 204)
    accepted.headers['Range'] = _render_range(found.byte_count + 1)
    accepted.headers['Docker-Upload-UUID'] = upload_uuid
    return accepted
def _current_request_path():
    """Return the current request's path prefixed with the request's script root."""
    root, path = request.script_root, request.path
    return '{0}{1}'.format(root, path)
def _range_not_satisfiable(valid_end):
    """Abort the current request with a 416 (Range Not Satisfiable) response.

    The response carries the current request path as ``Location``, the acceptable range
    ``0-<valid_end>`` as ``Range``, and the upload UUID taken from the request's view
    arguments. This function does not return: it raises through ``flask_abort``.

    Args:
        valid_end: end offset to advertise in the ``Range`` header.
    """
    invalid_range = make_response('', 416)
    invalid_range.headers['Location'] = _current_request_path()
    invalid_range.headers['Range'] = '0-{0}'.format(valid_end)
    invalid_range.headers['Docker-Upload-UUID'] = request.view_args['upload_uuid']
    # Previously the response was built and then dropped, so range errors were silently
    # ignored. Aborting with the response object sends it to the client.
    flask_abort(invalid_range)
def _parse_range_header(range_header_text):
    """Parse a ``bytes=<start>-<end>`` Range header.

    Args:
        range_header_text: raw header value.

    Returns:
        A ``(start, length)`` tuple where ``length`` is ``end - start``.

    Raises:
        _InvalidRangeHeader: when the text does not match ``RANGE_HEADER_REGEX`` or the
            computed length is not positive.
    """
    found = RANGE_HEADER_REGEX.match(range_header_text)
    if found is None:
        raise _InvalidRangeHeader()

    # The regex captures exactly two numeric groups: the start and end offsets.
    start = int(found.group(1))
    length = int(found.group(2)) - start

    if length <= 0:
        raise _InvalidRangeHeader()

    return (start, length)
def _upload_chunk(namespace, repo_name, upload_uuid):
    """Append the request body to an in-flight blob upload.

    Common code among the various uploading paths for appending data to blobs.
    Callers MUST call .save() or .delete_instance() on the returned database object.

    An optional ``Range`` request header gives the start offset and length of the chunk.
    Bytes that overlap data already received are skipped. Range problems are reported
    through ``_range_not_satisfiable``, which is expected to abort the request.

    Returns:
        The upload record, with ``storage_metadata`` and ``byte_count`` updated in memory
        when new bytes were written.

    Raises:
        BlobUploadUnknown: when the model lookup raises ``model.InvalidBlobUpload``.
    """
    try:
        found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid)
    except model.InvalidBlobUpload:
        raise BlobUploadUnknown()

    start_offset, length = 0, -1
    range_header = request.headers.get('range', None)
    if range_header is not None:
        try:
            start_offset, length = _parse_range_header(range_header)
        except _InvalidRangeHeader:
            _range_not_satisfiable(found.byte_count)

    # A chunk may not start beyond the data we already hold (that would leave a gap).
    if start_offset > 0 and start_offset > found.byte_count:
        _range_not_satisfiable(found.byte_count)

    # NOTE(review): the file had an empty dict literal here, which cannot name any storage
    # location. This assumes the upload row's location exposes its storage name as `.name` --
    # TODO confirm against the model.
    location_set = {found.location.name}

    with database.CloseForLongOperation(app.config):
        input_fp = get_input_stream(request)

        if start_offset > 0 and start_offset < found.byte_count:
            # Skip the bytes which were received on a previous push, which are already stored
            # and included in the sha calculation
            overlap_size = found.byte_count - start_offset
            input_fp = StreamSlice(input_fp, overlap_size)

            # Update our upload bounds to reflect the skipped portion of the overlap
            start_offset = found.byte_count
            length = max(length - overlap_size, 0)

        # We use this to escape early in case we have already processed all of the bytes the
        # user wants to upload
        if length == 0:
            return found

        # Feed every byte read from the stream into the running hash state.
        input_fp = wrap_with_handler(input_fp, found.sha_state.update)

        try:
            # The call was truncated after input_fp; the upload's current storage metadata
            # is passed as the final argument, matching the new_metadata it returns.
            length_written, new_metadata = storage.stream_upload_chunk(location_set, upload_uuid,
                                                                       start_offset, length,
                                                                       input_fp,
                                                                       found.storage_metadata)
        except InvalidChunkException:
            _range_not_satisfiable(found.byte_count)

    found.storage_metadata = new_metadata
    found.byte_count += length_written
    return found
def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
    """Verify a completed upload, move it into place, record it, and answer 201.

    Args:
        upload_obj: the upload record returned by ``_upload_chunk``.
        expected_digest: the digest the client claims for the uploaded data.

    Returns:
        A 201 response with ``Docker-Content-Digest`` and a ``Location`` pointing at the
        blob download URL.

    Raises:
        BlobUploadInvalid: when the digest computed from the upload's hash state does not
            equal ``expected_digest``.
    """
    # Verify that the digest's SHA matches that of the uploaded data.
    computed_digest = digest_tools.sha256_digest_from_hashlib(upload_obj.sha_state)
    if not digest_tools.digests_equal(computed_digest, expected_digest):
        raise BlobUploadInvalid()

    final_blob_location = digest_tools.content_path(expected_digest)

    # NOTE(review): the file passed an empty dict literal as the location set to every storage
    # call below. This assumes the upload row's location exposes its storage name as `.name` --
    # TODO confirm against the model.
    location_set = {upload_obj.location.name}

    # Move the storage into place, or if this was a re-upload, cancel it
    with database.CloseForLongOperation(app.config):
        if storage.exists(location_set, final_blob_location):
            # It already existed, clean up our upload which served as proof that we had the
            # file
            storage.cancel_chunked_upload(location_set, upload_obj.uuid,
                                          upload_obj.storage_metadata)
        else:
            # We were the first ones to upload this image (at least to this location)
            # Let's copy it into place
            storage.complete_chunked_upload(location_set, upload_obj.uuid,
                                            final_blob_location, upload_obj.storage_metadata)

    # Mark the blob as uploaded.
    # NOTE(review): the final argument of this call was cut off after byte_count. The
    # temp-link expiration config key below is an assumption -- confirm against the
    # signature of model.blob.store_blob_record_and_temp_link.
    model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
                                               upload_obj.location, upload_obj.byte_count,
                                               app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])

    # Delete the upload tracking row.
    upload_obj.delete_instance()

    response = make_response('', 201)
    response.headers['Docker-Content-Digest'] = expected_digest
    response.headers['Location'] = url_for('v2.download_blob', namespace=namespace,
                                           repo_name=repo_name, digest=expected_digest)
    return response
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['PATCH'])
def upload_chunk(namespace, repo_name, upload_uuid):
    """Append one chunk to an in-flight upload and report the new range with a 204."""
    # NOTE(review): no auth decorator is visible on this view (process_jwt_auth /
    # require_repo_write / anon_protect are imported but unused here) -- confirm.
    upload = _upload_chunk(namespace, repo_name, upload_uuid)
    # _upload_chunk only updates the record in memory and requires its caller to persist
    # it; without this the new byte count and storage metadata are lost between requests.
    upload.save()

    accepted = make_response('', 204)
    accepted.headers['Location'] = _current_request_path()
    accepted.headers['Range'] = _render_range(upload.byte_count, with_bytes_prefix=False)
    accepted.headers['Docker-Upload-UUID'] = upload_uuid
    return accepted
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['PUT'])
def monolithic_upload_or_last_chunk(namespace, repo_name, upload_uuid):
    """Accept the final (or only) chunk of an upload and finish it.

    The ``digest`` query argument is mandatory; ``BlobUploadInvalid`` is raised without it.
    The request body is appended through ``_upload_chunk`` and the result is handed to
    ``_finish_upload``, whose response is returned.
    """
    # NOTE(review): no auth decorator is visible on this view (process_jwt_auth /
    # require_repo_write / anon_protect are imported but unused here) -- confirm.
    expected_digest = request.args.get('digest', None)
    if expected_digest is None:
        raise BlobUploadInvalid()

    upload = _upload_chunk(namespace, repo_name, upload_uuid)
    return _finish_upload(namespace, repo_name, upload, expected_digest)
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['DELETE'])
def cancel_upload(namespace, repo_name, upload_uuid):
    """Cancel an in-flight upload: drop its tracking row, then its partial data; answer 204.

    Raises:
        BlobUploadUnknown: when the model lookup raises ``model.InvalidBlobUpload``.
    """
    # NOTE(review): no auth decorator is visible on this view (process_jwt_auth /
    # require_repo_write / anon_protect are imported but unused here) -- confirm.
    try:
        found = model.blob.get_blob_upload(namespace, repo_name, upload_uuid)
    except model.InvalidBlobUpload:
        raise BlobUploadUnknown()

    # NOTE(review): the file passed an empty dict literal as the location set. This assumes
    # the upload row's location exposes its storage name as `.name` -- TODO confirm. It is
    # read before the row is deleted.
    location_set = {found.location.name}

    # We delete the record for the upload first, since if the partial upload in
    # storage fails to delete, it doesn't break anything
    found.delete_instance()
    storage.cancel_chunked_upload(location_set, found.uuid, found.storage_metadata)

    return make_response('', 204)
@v2_bp.route('/<namespace>/<repo_name>/blobs/<digest>', methods=['DELETE'])
def delete_digest(namespace, repo_name, digest):
    """Reject blob deletion; always raises ``Unsupported``.

    The third parameter is named ``digest`` to match the ``<digest>`` route variable. Flask
    passes route variables as keyword arguments, so the previous name (``upload_uuid``)
    made every request fail with a ``TypeError`` before ``Unsupported`` could be raised.
    """
    # NOTE(review): no auth decorator is visible on this view (process_jwt_auth /
    # require_repo_write / anon_protect are imported but unused here) -- confirm.
    # We do not support deleting arbitrary digests, as they break repo images.
    raise Unsupported()