This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/verbs.py

134 lines
5.1 KiB
Python

import logging
import json
import hashlib
from flask import redirect, Blueprint, abort, send_file
from app import app
from auth.auth import process_auth
from auth.permissions import ReadRepositoryPermission
from data import model
from data import database
from storage import Storage
from util.queuefile import QueueFile
from util.queueprocess import QueueProcess
from util.gzipwrap import GzipWrap
from util.dockerloadformat import build_docker_load_stream
verbs = Blueprint('verbs', __name__)
logger = logging.getLogger(__name__)
def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_list):
store = Storage(app)
def get_next_image():
for current_image_id in image_list:
yield model.get_repo_image(namespace, repository, current_image_id)
def get_next_layer():
for current_image_id in image_list:
current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
current_image_path)
logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
yield current_image_stream
database.configure(app.config)
stream = build_docker_load_stream(namespace, repository, tag, synthetic_image_id, image_json,
get_next_image, get_next_layer)
return stream.read
def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, queue_file):
database.configure(app.config)
store = Storage(app)
def handle_exception(ex):
logger.debug('Exception when building squashed image %s: %s', linked_storage_uuid, ex)
model.delete_derived_storage_by_uuid(linked_storage_uuid)
queue_file.add_exception_handler(handle_exception)
image_path = store.image_layer_path(linked_storage_uuid)
store.stream_write(linked_locations, image_path, queue_file)
queue_file.close()
if not queue_file.raised_exception:
done_uploading = model.get_storage_by_uuid(linked_storage_uuid)
done_uploading.uploading = False
done_uploading.save()
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@process_auth
def get_squashed_tag(namespace, repository, tag):
permission = ReadRepositoryPermission(namespace, repository)
if permission.can() or model.repository_is_public(namespace, repository):
# Lookup the requested tag.
try:
tag_image = model.get_tag_image(namespace, repository, tag)
except model.DataModelException:
abort(404)
# Lookup the tag's image and storage.
repo_image = model.get_repo_image(namespace, repository, tag_image.docker_image_id)
if not repo_image:
abort(404)
store = Storage(app)
derived = model.find_or_create_derived_storage(repo_image.storage, 'squash',
store.preferred_locations[0])
if not derived.uploading:
logger.debug('Derived image %s exists in storage', derived.uuid)
derived_layer_path = store.image_layer_path(derived.uuid)
download_url = store.get_direct_download_url(derived.locations, derived_layer_path)
if download_url:
logger.debug('Redirecting to download URL for derived image %s', derived.uuid)
return redirect(download_url)
logger.debug('Sending cached derived image %s', derived.uuid)
return send_file(store.stream_read_file(derived.locations, derived_layer_path))
# Load the ancestry for the image.
logger.debug('Building and returning derived image %s', derived.uuid)
uuid = repo_image.storage.uuid
ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
full_image_list = json.loads(ancestry_data)
# Load the image's JSON layer.
image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
image_json = json.loads(image_json_data)
# Calculate a synthetic image ID.
synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':squash').hexdigest()
# Create a queue process to generate the data. The queue files will read from the process
# and send the results to the client and storage.
def _cleanup():
# Close any existing DB connection once the process has exited.
database.close_db_filter(None)
args = (namespace, repository, tag, synthetic_image_id, image_json, full_image_list)
queue_process = QueueProcess(_open_stream,
8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max
args, finished=_cleanup)
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
# Start building.
queue_process.run()
# Start the storage saving.
storage_args = (derived.uuid, derived.locations, storage_queue_file)
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
# Return the client's data.
return send_file(client_queue_file)
abort(403)