This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/verbs.py

280 lines
10 KiB
Python
Raw Normal View History

import logging
import json
import hashlib
import uuid
2015-02-04 20:29:24 +00:00
from flask import redirect, Blueprint, abort, send_file, make_response
2015-02-04 20:29:24 +00:00
from app import app, signer
from auth.auth import process_auth
from auth.permissions import ReadRepositoryPermission
from data import model, database
from endpoints.trackhelper import track_and_log
from endpoints.decorators import anon_protect
from storage import Storage
from util.registry.queuefile import QueueFile
from util.registry.queueprocess import QueueProcess
from formats.squashed import SquashedDockerImage
from formats.aci import ACIImage
verbs = Blueprint('verbs', __name__)
logger = logging.getLogger(__name__)
def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image):
store = Storage(app)
# For performance reasons, we load the full image list here, cache it, then disconnect from
# the database.
with database.UseThenDisconnect(app.config):
image_list = list(model.image.get_parent_images(namespace, repository, repo_image))
2015-11-06 22:47:08 +00:00
image_list.insert(0, repo_image)
2015-11-03 19:43:47 +00:00
2015-11-17 23:25:43 +00:00
def get_image_json(image):
return json.loads(image.v1_json_metadata)
def get_next_image():
for current_image in image_list:
yield current_image
def get_next_layer():
for current_image_entry in image_list:
2015-08-18 15:53:48 +00:00
current_image_path = model.storage.get_layer_path(current_image_entry.storage)
current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
current_image_path)
2014-11-24 21:07:38 +00:00
current_image_id = current_image_entry.id
2015-11-03 19:43:47 +00:00
logger.debug('Returning image layer %s (%s): %s', current_image_id,
current_image_entry.docker_image_id, current_image_path)
yield current_image_stream
stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id, image_json,
2015-08-18 15:53:48 +00:00
get_next_image, get_next_layer, get_image_json)
return stream.read
2015-02-04 20:29:24 +00:00
def _sign_sythentic_image(verb, linked_storage_uuid, queue_file):
signature = None
try:
signature = signer.detached_sign(queue_file)
except:
logger.exception('Exception when signing %s image %s', verb, linked_storage_uuid)
return
# Setup the database (since this is a new process) and then disconnect immediately
# once the operation completes.
if not queue_file.raised_exception:
with database.UseThenDisconnect(app.config):
try:
derived = model.storage.get_storage_by_uuid(linked_storage_uuid)
except model.storage.InvalidImageException:
return
2015-02-04 20:29:24 +00:00
signature_entry = model.storage.find_or_create_storage_signature(derived, signer.name)
signature_entry.signature = signature
signature_entry.uploading = False
signature_entry.save()
2015-02-04 20:29:24 +00:00
def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations, queue_file):
store = Storage(app)
def handle_exception(ex):
logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex)
2014-11-24 21:07:38 +00:00
with database.UseThenDisconnect(app.config):
model.storage.delete_derived_storage_by_uuid(linked_storage_uuid)
queue_file.add_exception_handler(handle_exception)
2015-08-18 15:53:48 +00:00
image_path = store.v1_image_layer_path(linked_storage_uuid)
store.stream_write(linked_locations, image_path, queue_file)
queue_file.close()
if not queue_file.raised_exception:
# Setup the database (since this is a new process) and then disconnect immediately
# once the operation completes.
with database.UseThenDisconnect(app.config):
done_uploading = model.storage.get_storage_by_uuid(linked_storage_uuid)
done_uploading.uploading = False
done_uploading.save()
2015-02-04 20:29:24 +00:00
def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None):
permission = ReadRepositoryPermission(namespace, repository)
2015-02-04 20:29:24 +00:00
if not permission.can() and not model.repository.repository_is_public(namespace, repository):
2015-02-04 20:29:24 +00:00
abort(403)
# Lookup the requested tag.
try:
tag_image = model.tag.get_tag_image(namespace, repository, tag)
2015-02-04 20:29:24 +00:00
except model.DataModelException:
abort(404)
# Lookup the tag's image and storage.
repo_image = model.image.get_repo_image_extended(namespace, repository, tag_image.docker_image_id)
2015-02-04 20:29:24 +00:00
if not repo_image:
abort(404)
# If there is a data checker, call it first.
image_json = None
if checker is not None:
image_json = json.loads(repo_image.v1_json_metadata)
2015-02-04 20:29:24 +00:00
if not checker(image_json):
logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repository, tag, verb)
abort(404)
2015-02-04 20:29:24 +00:00
return (repo_image, tag_image, image_json)
2015-02-04 20:29:24 +00:00
def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs):
# Verify that the image exists and that we have access to it.
store = Storage(app)
result = _verify_repo_verb(store, namespace, repository, tag, verb, checker)
(repo_image, tag_image, image_json) = result
2015-02-04 20:29:24 +00:00
# Lookup the derived image storage for the verb.
derived = model.storage.find_derived_storage(repo_image.storage, verb)
2015-02-04 20:29:24 +00:00
if derived is None or derived.uploading:
return make_response('', 202)
2015-02-04 20:29:24 +00:00
# Check if we have a valid signer configured.
if not signer.name:
abort(404)
2015-02-04 20:29:24 +00:00
# Lookup the signature for the verb.
signature_entry = model.storage.lookup_storage_signature(derived, signer.name)
2015-02-04 20:29:24 +00:00
if signature_entry is None:
abort(404)
2015-02-04 20:29:24 +00:00
# Return the signature.
return make_response(signature_entry.signature)
2015-02-04 20:29:24 +00:00
def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs):
# Verify that the image exists and that we have access to it.
store = Storage(app)
result = _verify_repo_verb(store, namespace, repository, tag, verb, checker)
(repo_image, tag_image, image_json) = result
2015-02-04 20:29:24 +00:00
# Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)
2015-02-04 20:29:24 +00:00
# Lookup/create the derived image storage for the verb.
#derived = model.storage.find_or_create_derived_storage(repo_image.storage, verb,
# store.preferred_locations[0])
#if not derived.uploading:
# logger.debug('Derived %s image %s exists in storage', verb, derived.uuid)
# derived_layer_path = model.storage.get_layer_path(derived)
# download_url = store.get_direct_download_url(derived.locations, derived_layer_path)
# if download_url:
# logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid)
# return redirect(download_url)
# # Close the database handle here for this process before we send the long download.
# database.close_db_filter(None)
# logger.debug('Sending cached derived %s image %s', verb, derived.uuid)
# return send_file(store.stream_read_file(derived.locations, derived_layer_path))
derived_uuid = str(uuid.uuid4())
logger.debug('Building and returning derived %s image %s', verb, derived_uuid)
2015-02-04 20:29:24 +00:00
# Load the image's JSON layer.
if not image_json:
image_json = json.loads(repo_image.v1_json_metadata)
2015-02-04 20:29:24 +00:00
# Calculate a synthetic image ID.
synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':' + verb).hexdigest()
def _cleanup():
# Close any existing DB connection once the process has exited.
database.close_db_filter(None)
2014-11-05 17:27:38 +00:00
2015-02-04 20:29:24 +00:00
# Create a queue process to generate the data. The queue files will read from the process
# and send the results to the client and storage.
args = (formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image)
2015-02-04 20:29:24 +00:00
queue_process = QueueProcess(_open_stream,
8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max
args, finished=_cleanup)
2015-02-04 20:29:24 +00:00
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
#storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
2015-02-04 20:29:24 +00:00
# If signing is required, add a QueueFile for signing the image as we stream it out.
#signing_queue_file = None
#if sign and signer.name:
# signing_queue_file = QueueFile(queue_process.create_queue(), 'signing')
2015-02-04 20:29:24 +00:00
# Start building.
queue_process.run()
# Start the storage saving.
#storage_args = (verb, derived_uuid, derived.locations, storage_queue_file)
#QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
2015-02-04 20:29:24 +00:00
#if sign and signer.name:
# signing_args = (verb, derived_uuid, signing_queue_file)
# QueueProcess.run_process(_sign_sythentic_image, signing_args, finished=_cleanup)
2015-02-04 20:29:24 +00:00
# Close the database handle here for this process before we send the long download.
database.close_db_filter(None)
# Return the client's data.
return send_file(client_queue_file)
def os_arch_checker(os, arch):
def checker(image_json):
# Verify the architecture and os.
operating_system = image_json.get('os', 'linux')
if operating_system != os:
return False
architecture = image_json.get('architecture', 'amd64')
2015-02-06 17:22:27 +00:00
# Note: Some older Docker images have 'x86_64' rather than 'amd64'.
# We allow the conversion here.
if architecture == 'x86_64' and operating_system == 'linux':
architecture = 'amd64'
if architecture != arch:
return False
return True
2015-02-04 20:29:24 +00:00
return checker
@anon_protect
2015-02-04 20:29:24 +00:00
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/sig/<os>/<arch>/', methods=['GET'])
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci.asc/<os>/<arch>/', methods=['GET'])
2015-02-04 20:29:24 +00:00
@process_auth
def get_aci_signature(server, namespace, repository, tag, os, arch):
return _repo_verb_signature(namespace, repository, tag, 'aci', checker=os_arch_checker(os, arch),
os=os, arch=arch)
@anon_protect
2015-02-04 20:29:24 +00:00
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/', methods=['GET'])
@process_auth
def get_aci_image(server, namespace, repository, tag, os, arch):
return _repo_verb(namespace, repository, tag, 'aci', ACIImage(),
2015-02-04 20:29:24 +00:00
sign=True, checker=os_arch_checker(os, arch), os=os, arch=arch)
@anon_protect
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@process_auth
def get_squashed_tag(namespace, repository, tag):
return _repo_verb(namespace, repository, tag, 'squash', SquashedDockerImage())