import logging import json import hashlib from flask import redirect, Blueprint, abort, send_file, request from app import app from auth.auth import process_auth from auth.auth_context import get_authenticated_user from auth.permissions import ReadRepositoryPermission from data import model from data import database from endpoints.trackhelper import track_and_log from storage import Storage from util.queuefile import QueueFile from util.queueprocess import QueueProcess from util.gzipwrap import GzipWrap from formats.squashed import SquashedDockerImage from formats.aci import ACIImage verbs = Blueprint('verbs', __name__) logger = logging.getLogger(__name__) def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, image_id_list): store = Storage(app) # For performance reasons, we load the full image list here, cache it, then disconnect from # the database. with database.UseThenDisconnect(app.config): image_list = list(model.get_matching_repository_images(namespace, repository, image_id_list)) image_list.sort(key=lambda image: image_id_list.index(image.docker_image_id)) def get_next_image(): for current_image in image_list: yield current_image def get_next_layer(): for current_image_entry in image_list: current_image_path = store.image_layer_path(current_image_entry.storage.uuid) current_image_stream = store.stream_read_file(current_image_entry.storage.locations, current_image_path) current_image_id = current_image_entry.id logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path)) yield current_image_stream stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id, image_json, get_next_image, get_next_layer) return stream.read def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations, queue_file): store = Storage(app) def handle_exception(ex): logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex) with database.UseThenDisconnect(app.config): model.delete_derived_storage_by_uuid(linked_storage_uuid) queue_file.add_exception_handler(handle_exception) image_path = store.image_layer_path(linked_storage_uuid) store.stream_write(linked_locations, image_path, queue_file) queue_file.close() if not queue_file.raised_exception: with database.UseThenDisconnect(app.config): done_uploading = model.get_storage_by_uuid(linked_storage_uuid) done_uploading.uploading = False done_uploading.save() def _repo_verb(namespace, repository, tag, verb, formatter, checker=None, **kwargs): permission = ReadRepositoryPermission(namespace, repository) # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! # TODO: renable auth! # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! if True or permission.can() or model.repository_is_public(namespace, repository): # Lookup the requested tag. try: tag_image = model.get_tag_image(namespace, repository, tag) except model.DataModelException: abort(404) # Lookup the tag's image and storage. repo_image = model.get_repo_image_extended(namespace, repository, tag_image.docker_image_id) if not repo_image: abort(404) # If there is a data checker, call it first. store = Storage(app) uuid = repo_image.storage.uuid image_json = None if checker is not None: image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid)) image_json = json.loads(image_json_data) if not checker(image_json): logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repository, tag, verb) abort(404) # Log the action. track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs) derived = model.find_or_create_derived_storage(repo_image.storage, verb, store.preferred_locations[0]) # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! # TODO: renable caching! # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! if False and not derived.uploading: logger.debug('Derived %s image %s exists in storage', verb, derived.uuid) derived_layer_path = store.image_layer_path(derived.uuid) download_url = store.get_direct_download_url(derived.locations, derived_layer_path) if download_url: logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid) return redirect(download_url) # Close the database handle here for this process before we send the long download. database.close_db_filter(None) logger.debug('Sending cached derived %s image %s', verb, derived.uuid) return send_file(store.stream_read_file(derived.locations, derived_layer_path)) # Load the ancestry for the image. logger.debug('Building and returning derived %s image %s', verb, derived.uuid) ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid)) full_image_list = json.loads(ancestry_data) # Load the image's JSON layer. if not image_json: image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid)) image_json = json.loads(image_json_data) # Calculate a synthetic image ID. synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':' + verb).hexdigest() # Create a queue process to generate the data. The queue files will read from the process # and send the results to the client and storage. def _cleanup(): # Close any existing DB connection once the process has exited. database.close_db_filter(None) args = (formatter, namespace, repository, tag, synthetic_image_id, image_json, full_image_list) queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max args, finished=_cleanup) client_queue_file = QueueFile(queue_process.create_queue(), 'client') storage_queue_file = QueueFile(queue_process.create_queue(), 'storage') # Start building. queue_process.run() # Start the storage saving. storage_args = (verb, derived.uuid, derived.locations, storage_queue_file) QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup) # Close the database handle here for this process before we send the long download. database.close_db_filter(None) # Return the client's data. return send_file(client_queue_file) abort(403) @verbs.route('/aci/////aci///', methods=['GET']) @process_auth def get_rocket_image(server, namespace, repository, tag, os, arch): def checker(image_json): # Verify the architecture and os. operating_system = image_json.get('os', 'linux') if operating_system != os: return False architecture = image_json.get('architecture', 'amd64') if architecture != arch: return False return True return _repo_verb(namespace, repository, tag, 'aci', ACIImage(), checker=checker, os=os, arch=arch) @verbs.route('/squash///', methods=['GET']) @process_auth def get_squashed_tag(namespace, repository, tag): return _repo_verb(namespace, repository, tag, 'squash', SquashedDockerImage())