This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/verbs/__init__.py
Joseph Schorr f67e2baeba Fix verbs for recent storage change
1) Initialize the storage class in verbs with the extra needed args
2) Make the CloudFrontedS3Storage resilient to those extra args being missing
2017-10-07 00:17:10 -04:00

383 lines
14 KiB
Python

import logging
import hashlib
from flask import redirect, Blueprint, abort, send_file, make_response, request
import features
from app import app, signer, storage, metric_queue, license_validator, config_provider, ip_resolver
from auth.auth_context import get_authenticated_user
from auth.decorators import process_auth
from auth.permissions import ReadRepositoryPermission
from data import database
from endpoints.decorators import anon_protect, route_show_if, parse_repository_name
from endpoints.verbs.models_pre_oci import pre_oci_model as model
from endpoints.v2.blob import BLOB_DIGEST_ROUTE
from image.appc import AppCImageFormatter
from image.docker.squashed import SquashedDockerImageFormatter
from storage import Storage
from util.audit import track_and_log
from util.http import exact_abort
from util.registry.filelike import wrap_with_handler
from util.registry.queuefile import QueueFile
from util.registry.queueprocess import QueueProcess
from util.registry.torrent import (
make_torrent, per_user_torrent_filename, public_torrent_filename, PieceHasher)
logger = logging.getLogger(__name__)
verbs = Blueprint('verbs', __name__)
license_validator.enforce_license_before_request(verbs)
LAYER_MIMETYPE = 'binary/octet-stream'
def _open_stream(formatter, repo_image, tag, derived_image_id, handlers):
"""
This method generates a stream of data which will be replicated and read from the queue files.
This method runs in a separate process.
"""
# For performance reasons, we load the full image list here, cache it, then disconnect from
# the database.
with database.UseThenDisconnect(app.config):
image_list = list(model.get_manifest_layers_with_blobs(repo_image))
def get_next_image():
for current_image in image_list:
yield current_image
def get_next_layer():
# Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
for current_image in image_list:
current_image_path = model.get_blob_path(current_image.blob)
current_image_stream = store.stream_read_file(current_image.blob.locations,
current_image_path)
logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path)
yield current_image_stream
stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image,
get_next_layer)
for handler_fn in handlers:
stream = wrap_with_handler(stream, handler_fn)
return stream.read
def _sign_derived_image(verb, derived_image, queue_file):
""" Read from the queue file and sign the contents which are generated. This method runs in a
separate process. """
signature = None
try:
signature = signer.detached_sign(queue_file)
except:
logger.exception('Exception when signing %s deriving image %s', verb, derived_image.ref)
return
# Setup the database (since this is a new process) and then disconnect immediately
# once the operation completes.
if not queue_file.raised_exception:
with database.UseThenDisconnect(app.config):
model.set_derived_image_signature(derived_image, signer.name, signature)
def _write_derived_image_to_storage(verb, derived_image, queue_file):
""" Read from the generated stream and write it back to the storage engine. This method runs in a
separate process.
"""
def handle_exception(ex):
logger.debug('Exception when building %s derived image %s: %s', verb, derived_image.ref, ex)
with database.UseThenDisconnect(app.config):
model.delete_derived_image(derived_image)
queue_file.add_exception_handler(handle_exception)
# Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
image_path = model.get_blob_path(derived_image.blob)
store.stream_write(derived_image.blob.locations, image_path, queue_file)
queue_file.close()
def _torrent_for_blob(blob, is_public):
""" Returns a response containing the torrent file contents for the given blob. May abort
with an error if the state is not valid (e.g. non-public, non-user request).
"""
# Make sure the storage has a size.
if not blob.size:
abort(404)
# Lookup the torrent information for the storage.
torrent_info = model.get_torrent_info(blob)
if torrent_info is None:
abort(404)
# Lookup the webseed path for the storage.
path = model.get_blob_path(blob)
webseed = storage.get_direct_download_url(blob.locations, path,
expires_in=app.config['BITTORRENT_WEBSEED_LIFETIME'])
if webseed is None:
# We cannot support webseeds for storages that cannot provide direct downloads.
exact_abort(501, 'Storage engine does not support seeding.')
# Build the filename for the torrent.
if is_public:
name = public_torrent_filename(blob.uuid)
else:
user = get_authenticated_user()
if not user:
abort(403)
name = per_user_torrent_filename(user.uuid, blob.uuid)
# Return the torrent file.
torrent_file = make_torrent(name, webseed, blob.size, torrent_info.piece_length,
torrent_info.pieces)
headers = {
'Content-Type': 'application/x-bittorrent',
'Content-Disposition': 'attachment; filename={0}.torrent'.format(name)}
return make_response(torrent_file, 200, headers)
def _torrent_repo_verb(repo_image, tag, verb, **kwargs):
""" Handles returning a torrent for the given verb on the given image and tag. """
if not features.BITTORRENT:
# Torrent feature is not enabled.
abort(406)
# Lookup an *existing* derived storage for the verb. If the verb's image storage doesn't exist,
# we cannot create it here, so we 406.
derived_image = model.lookup_derived_image(repo_image, verb, varying_metadata={'tag': tag})
if derived_image is None:
abort(406)
# Return the torrent.
repo = model.get_repository(repo_image.repository.namespace_name, repo_image.repository.name)
repo_is_public = repo is not None and repo.is_public
torrent = _torrent_for_blob(derived_image.blob, repo_is_public)
# Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, torrent=True, **kwargs)
return torrent
def _verify_repo_verb(_, namespace, repo_name, tag, verb, checker=None):
permission = ReadRepositoryPermission(namespace, repo_name)
repo = model.get_repository(namespace, repo_name)
repo_is_public = repo is not None and repo.is_public
if not permission.can() and not repo_is_public:
abort(403)
# Lookup the requested tag.
tag_image = model.get_tag_image(namespace, repo_name, tag)
if tag_image is None:
abort(404)
if repo is not None and repo.kind != 'image':
abort(405)
# If there is a data checker, call it first.
if checker is not None:
if not checker(tag_image):
logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repo_name, tag, verb)
abort(404)
return tag_image
def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs):
# Verify that the image exists and that we have access to it.
repo_image = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
# derived_image the derived image storage for the verb.
derived_image = model.lookup_derived_image(repo_image, verb, varying_metadata={'tag': tag})
if derived_image is None or derived_image.blob.uploading:
return make_response('', 202)
# Check if we have a valid signer configured.
if not signer.name:
abort(404)
# Lookup the signature for the verb.
signature_value = model.get_derived_image_signature(derived_image, signer.name)
if signature_value is None:
abort(404)
# Return the signature.
return make_response(signature_value)
def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs):
# Verify that the image exists and that we have access to it.
repo_image = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
# Check for torrent. If found, we return a torrent for the repo verb image (if the derived
# image already exists).
if request.accept_mimetypes.best == 'application/x-bittorrent':
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb + '+torrent', True])
return _torrent_repo_verb(repo_image, tag, verb, **kwargs)
# Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)
metric_queue.repository_pull.Inc(labelvalues=[namespace, repository, verb, True])
# Lookup/create the derived image for the verb and repo image.
derived_image = model.lookup_or_create_derived_image(
repo_image, verb, storage.preferred_locations[0], varying_metadata={'tag': tag})
if not derived_image.blob.uploading:
logger.debug('Derived %s image %s exists in storage', verb, derived_image.ref)
derived_layer_path = model.get_blob_path(derived_image.blob)
is_head_request = request.method == 'HEAD'
download_url = storage.get_direct_download_url(derived_image.blob.locations,
derived_layer_path, head=is_head_request)
if download_url:
logger.debug('Redirecting to download URL for derived %s image %s', verb, derived_image.ref)
return redirect(download_url)
# Close the database handle here for this process before we send the long download.
database.close_db_filter(None)
logger.debug('Sending cached derived %s image %s', verb, derived_image.ref)
return send_file(
storage.stream_read_file(derived_image.blob.locations, derived_layer_path),
mimetype=LAYER_MIMETYPE)
logger.debug('Building and returning derived %s image %s', verb, derived_image.ref)
# Calculate a derived image ID.
derived_image_id = hashlib.sha256(repo_image.image_id + ':' + verb).hexdigest()
def _cleanup():
# Close any existing DB connection once the process has exited.
database.close_db_filter(None)
hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'])
def _store_metadata_and_cleanup():
with database.UseThenDisconnect(app.config):
model.set_torrent_info(derived_image.blob, app.config['BITTORRENT_PIECE_SIZE'],
hasher.final_piece_hashes())
model.set_blob_size(derived_image.blob, hasher.hashed_bytes)
# Create a queue process to generate the data. The queue files will read from the process
# and send the results to the client and storage.
handlers = [hasher.update]
args = (formatter, repo_image, tag, derived_image_id, handlers)
queue_process = QueueProcess(
_open_stream,
8 * 1024,
10 * 1024 * 1024, # 8K/10M chunk/max
args,
finished=_store_metadata_and_cleanup)
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
# If signing is required, add a QueueFile for signing the image as we stream it out.
signing_queue_file = None
if sign and signer.name:
signing_queue_file = QueueFile(queue_process.create_queue(), 'signing')
# Start building.
queue_process.run()
# Start the storage saving.
storage_args = (verb, derived_image, storage_queue_file)
QueueProcess.run_process(_write_derived_image_to_storage, storage_args, finished=_cleanup)
if sign and signer.name:
signing_args = (verb, derived_image, signing_queue_file)
QueueProcess.run_process(_sign_derived_image, signing_args, finished=_cleanup)
# Close the database handle here for this process before we send the long download.
database.close_db_filter(None)
# Return the client's data.
return send_file(client_queue_file, mimetype=LAYER_MIMETYPE)
def os_arch_checker(os, arch):
def checker(repo_image):
image_json = repo_image.compat_metadata
# Verify the architecture and os.
operating_system = image_json.get('os', 'linux')
if operating_system != os:
return False
architecture = image_json.get('architecture', 'amd64')
# Note: Some older Docker images have 'x86_64' rather than 'amd64'.
# We allow the conversion here.
if architecture == 'x86_64' and operating_system == 'linux':
architecture = 'amd64'
if architecture != arch:
return False
return True
return checker
@route_show_if(features.ACI_CONVERSION)
@anon_protect
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/sig/<os>/<arch>/', methods=['GET'])
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci.asc/<os>/<arch>/', methods=['GET'])
@process_auth
def get_aci_signature(server, namespace, repository, tag, os, arch):
return _repo_verb_signature(namespace, repository, tag, 'aci', checker=os_arch_checker(os, arch),
os=os, arch=arch)
@route_show_if(features.ACI_CONVERSION)
@anon_protect
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/', methods=[
'GET', 'HEAD'])
@process_auth
def get_aci_image(server, namespace, repository, tag, os, arch):
return _repo_verb(namespace, repository, tag, 'aci',
AppCImageFormatter(), sign=True, checker=os_arch_checker(os, arch), os=os,
arch=arch)
@anon_protect
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@process_auth
def get_squashed_tag(namespace, repository, tag):
return _repo_verb(namespace, repository, tag, 'squash', SquashedDockerImageFormatter())
@route_show_if(features.BITTORRENT)
@anon_protect
@verbs.route('/torrent{0}'.format(BLOB_DIGEST_ROUTE), methods=['GET'])
@process_auth
@parse_repository_name()
def get_tag_torrent(namespace_name, repo_name, digest):
repo = model.get_repository(namespace_name, repo_name)
repo_is_public = repo is not None and repo.is_public
permission = ReadRepositoryPermission(namespace_name, repo_name)
if not permission.can() and not repo_is_public:
abort(403)
user = get_authenticated_user()
if user is None and not repo_is_public:
# We can not generate a private torrent cluster without a user uuid (e.g. token auth)
abort(403)
if repo is not None and repo.kind != 'image':
abort(405)
blob = model.get_repo_blob_by_digest(namespace_name, repo_name, digest)
if blob is None:
abort(404)
metric_queue.repository_pull.Inc(labelvalues=[namespace_name, repo_name, 'torrent', True])
return _torrent_for_blob(blob, repo_is_public)