From 612098b64599f0cae9cfeb12bae5784a26f5a091 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Fri, 15 Jan 2016 11:32:29 -0500 Subject: [PATCH] Use the centrally initialized storage engine for verbs in master process --- endpoints/verbs.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/endpoints/verbs.py b/endpoints/verbs.py index 166d8cd25..85c46f495 100644 --- a/endpoints/verbs.py +++ b/endpoints/verbs.py @@ -4,28 +4,26 @@ import hashlib from flask import redirect, Blueprint, abort, send_file, make_response -from app import app, signer, storage +import features + +from app import app, signer, storage, metric_queue from auth.auth import process_auth from auth.auth_context import get_authenticated_user from auth.permissions import ReadRepositoryPermission from data import model, database from endpoints.trackhelper import track_and_log from endpoints.decorators import anon_protect -from storage import Storage - from util.registry.queuefile import QueueFile from util.registry.queueprocess import QueueProcess from util.registry.torrent import (make_torrent, per_user_torrent_filename, public_torrent_filename, PieceHasher) from util.registry.filelike import wrap_with_handler -from util.registry.gzipstream import SizeInfo from formats.squashed import SquashedDockerImage from formats.aci import ACIImage +from storage import Storage from endpoints.v2.blob import BLOB_DIGEST_ROUTE from endpoints.common import route_show_if -import features - verbs = Blueprint('verbs', __name__) logger = logging.getLogger(__name__) @@ -33,8 +31,9 @@ logger = logging.getLogger(__name__) def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image, handlers): - store = Storage(app) - + """ This method generates a stream of data which will be replicated and read from the queue files. + This method runs in a separate process. + """ # For performance reasons, we load the full image list here, cache it, then disconnect from # the database. with database.UseThenDisconnect(app.config): @@ -49,6 +48,8 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag yield current_image def get_next_layer(): + # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) + store = Storage(app, metric_queue) for current_image_entry in image_list: current_image_path = model.storage.get_layer_path(current_image_entry.storage) current_image_stream = store.stream_read_file(current_image_entry.storage.locations, @@ -69,6 +70,8 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag def _sign_synthetic_image(verb, linked_storage_uuid, queue_file): + """ Read from the queue file and sign the contents which are generated. This method runs in a + separate process. """ signature = None try: signature = signer.detached_sign(queue_file) @@ -92,8 +95,9 @@ def _sign_synthetic_image(verb, linked_storage_uuid, queue_file): def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations, queue_file): - store = Storage(app) - + """ Read from the generated stream and write it back to the storage engine. This method runs in a + separate process. + """ def handle_exception(ex): logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex) @@ -102,6 +106,8 @@ def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_location queue_file.add_exception_handler(handle_exception) + # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) + store = Storage(app, metric_queue) image_path = store.v1_image_layer_path(linked_storage_uuid) store.stream_write(linked_locations, image_path, queue_file) queue_file.close() @@ -147,8 +153,7 @@ def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None): def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs): # Verify that the image exists and that we have access to it. - store = Storage(app) - result = _verify_repo_verb(store, namespace, repository, tag, verb, checker) + result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker) (repo_image, _, _) = result # Lookup the derived image storage for the verb. @@ -171,8 +176,7 @@ def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwarg def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs): # Verify that the image exists and that we have access to it. - store = Storage(app) - result = _verify_repo_verb(store, namespace, repository, tag, verb, checker) + result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker) (repo_image, tag_image, image_json) = result # Log the action. @@ -180,12 +184,12 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= # Lookup/create the derived image storage for the verb and repo image. derived = model.image.find_or_create_derived_storage(repo_image, verb, - store.preferred_locations[0]) + storage.preferred_locations[0]) if not derived.uploading: logger.debug('Derived %s image %s exists in storage', verb, derived.uuid) derived_layer_path = model.storage.get_layer_path(derived) - download_url = store.get_direct_download_url(derived.locations, derived_layer_path) + download_url = storage.get_direct_download_url(derived.locations, derived_layer_path) if download_url: logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid) return redirect(download_url) @@ -194,7 +198,7 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= database.close_db_filter(None) logger.debug('Sending cached derived %s image %s', verb, derived.uuid) - return send_file(store.stream_read_file(derived.locations, derived_layer_path)) + return send_file(storage.stream_read_file(derived.locations, derived_layer_path)) logger.debug('Building and returning derived %s image %s', verb, derived.uuid)