This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/v2/blob.py

78 lines
2.8 KiB
Python
Raw Normal View History

import logging
from flask import make_response, url_for, request
from app import storage, app
from data import model
from digest import digest_tools
from endpoints.v2 import v2_bp, require_repo_read, require_repo_write, get_input_stream
from auth.jwt_auth import process_jwt_auth
from endpoints.decorators import anon_protect
from util.http import abort
logger = logging.getLogger(__name__)
@v2_bp.route('/<namespace>/<repo_name>/blobs/<regex("' + digest_tools.DIGEST_PATTERN + '"):digest>',
             methods=['HEAD'])
@process_jwt_auth
@require_repo_read
@anon_protect
def check_blob_existence(namespace, repo_name, digest):
    """Answer a HEAD request asking whether a blob exists in a repository.

    The blob is looked up by digest through
    ``model.blob.get_repo_blob_by_digest``. If the lookup succeeds, an empty
    response is returned; if it raises ``model.BlobDoesNotExist``, the request
    is aborted with a 404.

    Args:
        namespace: Namespace segment taken from the request URL.
        repo_name: Repository segment taken from the request URL.
        digest: Blob digest segment taken from the request URL.
    """
    try:
        # Only the success or failure of the lookup matters here; the record
        # it returns is not used.
        model.blob.get_repo_blob_by_digest(namespace, repo_name, digest)
    except model.BlobDoesNotExist:
        abort(404)
    else:
        # The response body must be empty for a successful HEAD request
        return make_response('')
@v2_bp.route('/<namespace>/<repo_name>/blobs/<regex("' + digest_tools.DIGEST_PATTERN + '"):digest>',
             methods=['GET'])
@process_jwt_auth
@require_repo_read
@anon_protect
def download_blob(namespace, repo_name, digest):
    """Handle a GET request for a blob identified by digest.

    NOTE(review): the handler currently ignores all of its arguments and
    always returns an empty response; this looks like a placeholder for the
    real download logic -- confirm before relying on it.
    """
    empty_response = make_response('')
    return empty_response
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/', methods=['POST'])
@process_jwt_auth
@require_repo_write
@anon_protect
def start_blob_upload(namespace, repo_name):
    """Begin a new chunked blob upload and tell the client where to send data.

    A chunked upload is initiated in the first preferred storage location.
    The 202 response carries the upload's UUID, the URL of the chunk upload
    endpoint for that UUID, and an initial ``Range`` header.

    Args:
        namespace: Namespace segment taken from the request URL.
        repo_name: Repository segment taken from the request URL.
    """
    target_location = storage.preferred_locations[0]
    upload_id = storage.initiate_chunked_upload(target_location)

    response = make_response('', 202)
    chunk_endpoint = url_for(
        'v2.upload_chunk',
        namespace=namespace,
        repo_name=repo_name,
        upload_uuid=upload_id,
    )

    # Header assignment order mirrors the order the fields are produced above.
    response.headers['Location'] = chunk_endpoint
    response.headers['Range'] = 'bytes=0-0'
    response.headers['Docker-Upload-UUID'] = upload_id
    return response
@v2_bp.route('/<namespace>/<repo_name>/blobs/uploads/<upload_uuid>', methods=['PUT'])
@process_jwt_auth
@require_repo_write
@anon_protect
def upload_chunk(namespace, repo_name, upload_uuid):
    """Receive upload data for a blob and optionally finalize the upload.

    The request's input stream is always written to the in-progress upload in
    the first preferred storage location. If the request has a ``digest``
    query argument, the upload is then completed at the content path derived
    from that digest, a blob record with a temporary link is stored, and a 201
    response pointing at the download endpoint is returned. Without a
    ``digest`` the handler returns an empty 202 response.

    Args:
        namespace: Namespace segment taken from the request URL.
        repo_name: Repository segment taken from the request URL.
        upload_uuid: Identifier of the in-progress upload, from the URL.
    """
    digest = request.args.get('digest')
    upload_location = storage.preferred_locations[0]

    # The call's return value is not needed by this handler.
    # NOTE(review): the 0 / -1 arguments are presumably "offset 0, unbounded
    # length" -- confirm against the storage API.
    storage.stream_upload_chunk(upload_location, upload_uuid, 0, -1,
                                get_input_stream(request))

    if digest is None:
        # No digest supplied: the upload stays open.
        return make_response('', 202)

    final_blob_location = digest_tools.content_path(digest)
    storage.complete_chunked_upload(upload_location, upload_uuid, final_blob_location, digest)
    model.blob.store_blob_record_and_temp_link(namespace, repo_name, digest, upload_location,
                                               app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])

    response = make_response('', 201)
    response.headers['Docker-Content-Digest'] = digest
    response.headers['Location'] = url_for('v2.download_blob', namespace=namespace,
                                           repo_name=repo_name, digest=digest)
    return response