"""Flask blueprint serving derived ("verb") images and their torrents.

A derived image is produced by running a formatter (squashed Docker image or
ACI) over a tagged repository image. The result is streamed to the client
while being written back to storage and, optionally, signed.
"""

import logging
import json
import hashlib

from flask import redirect, Blueprint, abort, send_file, make_response, request

import features

from app import app, signer, storage, metric_queue
from auth.auth import process_auth
from auth.auth_context import get_authenticated_user
from auth.permissions import ReadRepositoryPermission
from data import model, database
from endpoints.trackhelper import track_and_log
from endpoints.decorators import anon_protect
from util.registry.queuefile import QueueFile
from util.registry.queueprocess import QueueProcess
from util.registry.torrent import (make_torrent, per_user_torrent_filename,
                                   public_torrent_filename, PieceHasher)
from util.registry.filelike import wrap_with_handler
from formats.squashed import SquashedDockerImage
from formats.aci import ACIImage
from storage import Storage
from endpoints.v2.blob import BLOB_DIGEST_ROUTE
from endpoints.common import route_show_if, parse_repository_name


verbs = Blueprint('verbs', __name__)
logger = logging.getLogger(__name__)


def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json,
                 repo_image, handlers):
    """ This method generates a stream of data which will be replicated and read from the
        queue files. This method runs in a separate process.

        Returns the bound `read` method of the (handler-wrapped) stream built by
        `formatter.build_stream`.
    """
    # For performance reasons, we load the full image list here, cache it, then disconnect
    # from the database.
    with database.UseThenDisconnect(app.config):
        image_list = list(model.image.get_parent_images(namespace, repository, repo_image))

    # The requested image itself is always the first entry handed to the formatter.
    image_list.insert(0, repo_image)

    def get_image_json(image):
        return json.loads(image.v1_json_metadata)

    def get_next_image():
        for current_image in image_list:
            yield current_image

    def get_next_layer():
        # Re-Initialize the storage engine because some may not respond well to forking
        # (e.g. S3)
        store = Storage(app, metric_queue)
        for current_image_entry in image_list:
            current_image_path = model.storage.get_layer_path(current_image_entry.storage)
            current_image_stream = store.stream_read_file(
                current_image_entry.storage.locations, current_image_path)

            logger.debug('Returning image layer %s (%s): %s', current_image_entry.id,
                         current_image_entry.docker_image_id, current_image_path)
            yield current_image_stream

    stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id,
                                    image_json, get_next_image, get_next_layer,
                                    get_image_json)

    # Each handler observes the bytes as they are read (e.g. the torrent piece hasher).
    for handler_fn in handlers:
        stream = wrap_with_handler(stream, handler_fn)

    return stream.read


def _sign_synthetic_image(verb, linked_storage_uuid, queue_file):
    """ Read from the queue file and sign the contents which are generated. This method
        runs in a separate process.

        Signing failures are logged and swallowed; the signature is only persisted when the
        queue file did not report an exception and the derived storage still exists.
    """
    try:
        signature = signer.detached_sign(queue_file)
    except Exception:
        # Best effort: a failed signature must not take down the worker process, but we do
        # not want to trap SystemExit/KeyboardInterrupt the way a bare `except:` would.
        logger.exception('Exception when signing %s image %s', verb, linked_storage_uuid)
        return

    if queue_file.raised_exception:
        return

    # Setup the database (since this is a new process) and then disconnect immediately
    # once the operation completes.
    with database.UseThenDisconnect(app.config):
        try:
            derived = model.storage.get_storage_by_uuid(linked_storage_uuid)
        except model.storage.InvalidImageException:
            return

        signature_entry = model.storage.find_or_create_storage_signature(derived,
                                                                         signer.name)
        signature_entry.signature = signature
        signature_entry.uploading = False
        signature_entry.save()


def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations,
                                      queue_file):
    """ Read from the generated stream and write it back to the storage engine. This method
        runs in a separate process.

        If the queue file reports an exception, the derived storage row is deleted;
        otherwise it is marked as no longer uploading.
    """
    def handle_exception(ex):
        logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid,
                     ex)

        with database.UseThenDisconnect(app.config):
            model.image.delete_derived_storage_by_uuid(linked_storage_uuid)

    queue_file.add_exception_handler(handle_exception)

    # Re-Initialize the storage engine because some may not respond well to forking
    # (e.g. S3)
    store = Storage(app, metric_queue)
    image_path = store.v1_image_layer_path(linked_storage_uuid)
    store.stream_write(linked_locations, image_path, queue_file)
    queue_file.close()

    if not queue_file.raised_exception:
        # Setup the database (since this is a new process) and then disconnect immediately
        # once the operation completes.
        with database.UseThenDisconnect(app.config):
            done_uploading = model.storage.get_storage_by_uuid(linked_storage_uuid)
            done_uploading.uploading = False
            done_uploading.save()


def _torrent_for_storage(storage_ref, is_public):
    """ Returns a response containing the torrent file contents for the given storage. May
        abort with an error if the state is not valid (e.g. non-public, non-user request).

        Aborts with 404 when the storage has no size or no torrent info, 501 when the
        storage engine cannot produce a direct download URL, and 403 when a non-public
        torrent is requested without an authenticated user.
    """
    # Make sure the storage has a size.
    if not storage_ref.image_size:
        abort(404)

    # Lookup the torrent information for the storage.
    try:
        torrent_info = model.storage.get_torrent_info(storage_ref)
    except model.TorrentInfoDoesNotExist:
        abort(404)

    # Lookup the webseed path for the storage.
    path = model.storage.get_layer_path(storage_ref)
    webseed = storage.get_direct_download_url(
        storage_ref.locations, path, expires_in=app.config['BITTORRENT_WEBSEED_LIFETIME'])
    if webseed is None:
        # We cannot support webseeds for storages that cannot provide direct downloads.
        abort(make_response('Storage engine does not support seeding.', 501))

    # Build the filename for the torrent.
    if is_public:
        name = public_torrent_filename(storage_ref.uuid)
    else:
        user = get_authenticated_user()
        if not user:
            abort(403)

        name = per_user_torrent_filename(user.uuid, storage_ref.uuid)

    # Return the torrent file.
    torrent_file = make_torrent(name, webseed, storage_ref.image_size,
                                torrent_info.piece_length, torrent_info.pieces)

    headers = {
        'Content-Type': 'application/x-bittorrent',
        'Content-Disposition': 'attachment; filename={0}.torrent'.format(name),
    }

    return make_response(torrent_file, 200, headers)


def _torrent_repo_verb(repo_image, tag, verb, **kwargs):
    """ Handles returning a torrent for the given verb on the given image and tag.

        Aborts with 406 when the torrent feature is disabled or when no derived storage
        exists yet for the verb. Extra keyword arguments are forwarded to `track_and_log`.
    """
    if not features.BITTORRENT:
        # Torrent feature is not enabled.
        abort(406)

    # Lookup an *existing* derived storage for the verb. If the verb's image storage doesn't
    # exist, we cannot create it here, so we 406.
    derived = model.image.find_derived_storage_for_image(repo_image, verb)
    if not derived:
        abort(406)

    # Return the torrent.
    public_repo = model.repository.is_repository_public(repo_image.repository)
    torrent = _torrent_for_storage(derived, public_repo)

    # Log the action.
    track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, torrent=True,
                  **kwargs)

    return torrent


def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None):
    """ Verifies read access to the repository and resolves the tag to its image.

        Aborts with 403 when the caller lacks read permission on a non-public repository,
        and with 404 when the tag or image cannot be found or when `checker` rejects the
        image's JSON metadata.

        Returns a tuple of (repo_image, tag_image, image_json); image_json is None when no
        checker was supplied. The `store` argument is accepted but not used here.
    """
    permission = ReadRepositoryPermission(namespace, repository)

    if (not permission.can() and
            not model.repository.repository_is_public(namespace, repository)):
        abort(403)

    # Lookup the requested tag.
    try:
        tag_image = model.tag.get_tag_image(namespace, repository, tag)
    except model.DataModelException:
        abort(404)

    # Lookup the tag's image and storage.
    repo_image = model.image.get_repo_image_extended(namespace, repository,
                                                     tag_image.docker_image_id)
    if not repo_image:
        abort(404)

    # If there is a data checker, call it first.
    image_json = None

    if checker is not None:
        image_json = json.loads(repo_image.v1_json_metadata)

        if not checker(image_json):
            logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repository, tag,
                         verb)
            abort(404)

    return (repo_image, tag_image, image_json)


def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs):
    """ Returns the stored signature for the verb's derived image.

        Responds with an empty 202 while the derived image does not exist or is still
        uploading, and aborts with 404 when no signer is configured or no signature has
        been stored. Extra keyword arguments are accepted but not used here.
    """
    # Verify that the image exists and that we have access to it.
    repo_image, _, _ = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)

    # Lookup the derived image storage for the verb.
    derived = model.image.find_derived_storage_for_image(repo_image, verb)
    if derived is None or derived.uploading:
        return make_response('', 202)

    # Check if we have a valid signer configured.
    if not signer.name:
        abort(404)

    # Lookup the signature for the verb.
    signature_entry = model.storage.lookup_storage_signature(derived, signer.name)
    if signature_entry is None:
        abort(404)

    # Return the signature.
    return make_response(signature_entry.signature)


def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None,
               **kwargs):
    """ Serves the derived image for `verb`, building it on the fly if necessary.

        When the client's best accepted mimetype is `application/x-bittorrent`, a torrent
        for the (already existing) derived image is returned instead. When the derived
        image is already stored, the client is redirected to a direct download URL if the
        storage engine provides one, otherwise the stored file is streamed. Otherwise the
        image is generated by `formatter` and simultaneously streamed to the client,
        written to storage and, when `sign` is set and a signer is configured, signed.

        Extra keyword arguments are forwarded to `track_and_log`.
    """
    # Verify that the image exists and that we have access to it.
    repo_image, tag_image, image_json = _verify_repo_verb(storage, namespace, repository,
                                                          tag, verb, checker)

    # Check for torrent. If found, we return a torrent for the repo verb image (if the
    # derived image already exists).
    if request.accept_mimetypes.best == 'application/x-bittorrent':
        return _torrent_repo_verb(repo_image, tag, verb, **kwargs)

    # Log the action.
    track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)

    # Lookup/create the derived image storage for the verb and repo image.
    derived = model.image.find_or_create_derived_storage(repo_image, verb,
                                                         storage.preferred_locations[0])

    if not derived.uploading:
        logger.debug('Derived %s image %s exists in storage', verb, derived.uuid)
        derived_layer_path = model.storage.get_layer_path(derived)
        is_head_request = request.method == 'HEAD'
        download_url = storage.get_direct_download_url(derived.locations,
                                                       derived_layer_path,
                                                       head=is_head_request)
        if download_url:
            logger.debug('Redirecting to download URL for derived %s image %s', verb,
                         derived.uuid)
            return redirect(download_url)

        # Close the database handle here for this process before we send the long
        # download.
        database.close_db_filter(None)

        logger.debug('Sending cached derived %s image %s', verb, derived.uuid)
        return send_file(storage.stream_read_file(derived.locations, derived_layer_path))

    logger.debug('Building and returning derived %s image %s', verb, derived.uuid)

    # Load the image's JSON layer.
    if not image_json:
        image_json = json.loads(repo_image.v1_json_metadata)

    # Calculate a synthetic image ID.
    synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':' + verb).hexdigest()

    def _cleanup():
        # Close any existing DB connection once the process has exited.
        database.close_db_filter(None)

    hasher = PieceHasher(app.config['BITTORRENT_PIECE_SIZE'])

    def _store_metadata_and_cleanup():
        with database.UseThenDisconnect(app.config):
            model.storage.save_torrent_info(derived, app.config['BITTORRENT_PIECE_SIZE'],
                                            hasher.final_piece_hashes())
            derived.image_size = hasher.hashed_bytes
            derived.save()

    # Create a queue process to generate the data. The queue files will read from the
    # process and send the results to the client and storage.
    handlers = [hasher.update]
    args = (formatter, namespace, repository, tag, synthetic_image_id, image_json,
            repo_image, handlers)
    queue_process = QueueProcess(_open_stream,
                                 8 * 1024, 10 * 1024 * 1024,  # 8K/10M chunk/max
                                 args, finished=_store_metadata_and_cleanup)

    client_queue_file = QueueFile(queue_process.create_queue(), 'client')
    storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')

    # If signing is required, add a QueueFile for signing the image as we stream it out.
    # All queues must be created before the queue process is started below.
    should_sign = bool(sign and signer.name)
    signing_queue_file = None
    if should_sign:
        signing_queue_file = QueueFile(queue_process.create_queue(), 'signing')

    # Start building.
    queue_process.run()

    # Start the storage saving.
    storage_args = (verb, derived.uuid, derived.locations, storage_queue_file)
    QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args,
                             finished=_cleanup)

    if should_sign:
        signing_args = (verb, derived.uuid, signing_queue_file)
        QueueProcess.run_process(_sign_synthetic_image, signing_args, finished=_cleanup)

    # Close the database handle here for this process before we send the long download.
    database.close_db_filter(None)

    # Return the client's data.
    return send_file(client_queue_file)


def os_arch_checker(os, arch):
    """ Returns a checker that accepts image JSON only if it matches the given os and arch.

        Missing fields default to 'linux' and 'amd64' respectively.
    """
    def checker(image_json):
        # Verify the architecture and os.
        operating_system = image_json.get('os', 'linux')
        if operating_system != os:
            return False

        architecture = image_json.get('architecture', 'amd64')

        # Note: Some older Docker images have 'x86_64' rather than 'amd64'.
        # We allow the conversion here.
        if architecture == 'x86_64' and operating_system == 'linux':
            architecture = 'amd64'

        return architecture == arch

    return checker


# NOTE(review): in the views below, decorators listed above `@verbs.route` wrap the function
# only *after* it has been registered with the blueprint, so they may not run for routed
# requests. The order is kept as-is here; confirm against the implementations of
# `anon_protect` and `route_show_if` before moving them.
@anon_protect
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/sig/<os>/<arch>/',
             methods=['GET'])
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci.asc/<os>/<arch>/',
             methods=['GET'])
@process_auth
def get_aci_signature(server, namespace, repository, tag, os, arch):
    """ Returns the signature of the ACI image derived from the given tag. """
    return _repo_verb_signature(namespace, repository, tag, 'aci',
                                checker=os_arch_checker(os, arch), os=os, arch=arch)


@anon_protect
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/',
             methods=['GET', 'HEAD'])
@process_auth
def get_aci_image(server, namespace, repository, tag, os, arch):
    """ Returns (building and signing it if needed) the ACI image derived from the tag. """
    return _repo_verb(namespace, repository, tag, 'aci', ACIImage(), sign=True,
                      checker=os_arch_checker(os, arch), os=os, arch=arch)


@anon_protect
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@process_auth
def get_squashed_tag(namespace, repository, tag):
    """ Returns (building it if needed) the squashed Docker image derived from the tag. """
    return _repo_verb(namespace, repository, tag, 'squash', SquashedDockerImage())


@route_show_if(features.BITTORRENT)
@anon_protect
@verbs.route('/torrent{0}'.format(BLOB_DIGEST_ROUTE), methods=['GET'])
@process_auth
@parse_repository_name
def get_tag_torrent(namespace, repo_name, digest):
    """ Returns the torrent file for the blob with the given digest in the repository.

        Aborts with 403 when the caller cannot read a non-public repository or when a
        non-public repository is requested without an authenticated user, and with 404 when
        the blob does not exist.
    """
    permission = ReadRepositoryPermission(namespace, repo_name)
    public_repo = model.repository.repository_is_public(namespace, repo_name)
    if not permission.can() and not public_repo:
        abort(403)

    user = get_authenticated_user()
    if user is None and not public_repo:
        # We can not generate a private torrent cluster without a user uuid (e.g. token
        # auth)
        abort(403)

    try:
        blob = model.blob.get_repo_blob_by_digest(namespace, repo_name, digest)
    except model.BlobDoesNotExist:
        abort(404)

    return _torrent_for_storage(blob, public_repo)