Use the centrally initialized storage engine for verbs in master process
This commit is contained in:
		
							parent
							
								
									dc23ccce89
								
							
						
					
					
						commit
						612098b645
					
				
					 1 changed files with 21 additions and 17 deletions
				
			
		|  | @ -4,28 +4,26 @@ import hashlib | |||
| 
 | ||||
| from flask import redirect, Blueprint, abort, send_file, make_response | ||||
| 
 | ||||
| from app import app, signer, storage | ||||
| import features | ||||
| 
 | ||||
| from app import app, signer, storage, metric_queue | ||||
| from auth.auth import process_auth | ||||
| from auth.auth_context import get_authenticated_user | ||||
| from auth.permissions import ReadRepositoryPermission | ||||
| from data import model, database | ||||
| from endpoints.trackhelper import track_and_log | ||||
| from endpoints.decorators import anon_protect | ||||
| from storage import Storage | ||||
| 
 | ||||
| from util.registry.queuefile import QueueFile | ||||
| from util.registry.queueprocess import QueueProcess | ||||
| from util.registry.torrent import (make_torrent, per_user_torrent_filename, public_torrent_filename, | ||||
|                                    PieceHasher) | ||||
| from util.registry.filelike import wrap_with_handler | ||||
| from util.registry.gzipstream import SizeInfo | ||||
| from formats.squashed import SquashedDockerImage | ||||
| from formats.aci import ACIImage | ||||
| from storage import Storage | ||||
| from endpoints.v2.blob import BLOB_DIGEST_ROUTE | ||||
| from endpoints.common import route_show_if | ||||
| 
 | ||||
| import features | ||||
| 
 | ||||
| 
 | ||||
| verbs = Blueprint('verbs', __name__) | ||||
| logger = logging.getLogger(__name__) | ||||
|  | @ -33,8 +31,9 @@ logger = logging.getLogger(__name__) | |||
| 
 | ||||
| def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image, | ||||
|                  handlers): | ||||
|   store = Storage(app) | ||||
| 
 | ||||
|   """ This method generates a stream of data which will be replicated and read from the queue files. | ||||
|       This method runs in a separate process. | ||||
|   """ | ||||
|   # For performance reasons, we load the full image list here, cache it, then disconnect from | ||||
|   # the database. | ||||
|   with database.UseThenDisconnect(app.config): | ||||
|  | @ -49,6 +48,8 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag | |||
|       yield current_image | ||||
| 
 | ||||
|   def get_next_layer(): | ||||
|     # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) | ||||
|     store = Storage(app, metric_queue) | ||||
|     for current_image_entry in image_list: | ||||
|       current_image_path = model.storage.get_layer_path(current_image_entry.storage) | ||||
|       current_image_stream = store.stream_read_file(current_image_entry.storage.locations, | ||||
|  | @ -69,6 +70,8 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag | |||
| 
 | ||||
| 
 | ||||
| def _sign_synthetic_image(verb, linked_storage_uuid, queue_file): | ||||
|   """ Read from the queue file and sign the contents which are generated. This method runs in a | ||||
|       separate process. """ | ||||
|   signature = None | ||||
|   try: | ||||
|     signature = signer.detached_sign(queue_file) | ||||
|  | @ -92,8 +95,9 @@ def _sign_synthetic_image(verb, linked_storage_uuid, queue_file): | |||
| 
 | ||||
| 
 | ||||
| def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations, queue_file): | ||||
|   store = Storage(app) | ||||
| 
 | ||||
|   """ Read from the generated stream and write it back to the storage engine. This method runs in a | ||||
|       separate process. | ||||
|   """ | ||||
|   def handle_exception(ex): | ||||
|     logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex) | ||||
| 
 | ||||
|  | @ -102,6 +106,8 @@ def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_location | |||
| 
 | ||||
|   queue_file.add_exception_handler(handle_exception) | ||||
| 
 | ||||
|   # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) | ||||
|   store = Storage(app, metric_queue) | ||||
|   image_path = store.v1_image_layer_path(linked_storage_uuid) | ||||
|   store.stream_write(linked_locations, image_path, queue_file) | ||||
|   queue_file.close() | ||||
|  | @ -147,8 +153,7 @@ def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None): | |||
| 
 | ||||
| def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs): | ||||
|   # Verify that the image exists and that we have access to it. | ||||
|   store = Storage(app) | ||||
|   result = _verify_repo_verb(store, namespace, repository, tag, verb, checker) | ||||
|   result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker) | ||||
|   (repo_image, _, _) = result | ||||
| 
 | ||||
|   # Lookup the derived image storage for the verb. | ||||
|  | @ -171,8 +176,7 @@ def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwarg | |||
| 
 | ||||
| def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs): | ||||
|   # Verify that the image exists and that we have access to it. | ||||
|   store = Storage(app) | ||||
|   result = _verify_repo_verb(store, namespace, repository, tag, verb, checker) | ||||
|   result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker) | ||||
|   (repo_image, tag_image, image_json) = result | ||||
| 
 | ||||
|   # Log the action. | ||||
|  | @ -180,12 +184,12 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= | |||
| 
 | ||||
|   # Lookup/create the derived image storage for the verb and repo image. | ||||
|   derived = model.image.find_or_create_derived_storage(repo_image, verb, | ||||
|                                                        store.preferred_locations[0]) | ||||
|                                                        storage.preferred_locations[0]) | ||||
| 
 | ||||
|   if not derived.uploading: | ||||
|     logger.debug('Derived %s image %s exists in storage', verb, derived.uuid) | ||||
|     derived_layer_path = model.storage.get_layer_path(derived) | ||||
|     download_url = store.get_direct_download_url(derived.locations, derived_layer_path) | ||||
|     download_url = storage.get_direct_download_url(derived.locations, derived_layer_path) | ||||
|     if download_url: | ||||
|       logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid) | ||||
|       return redirect(download_url) | ||||
|  | @ -194,7 +198,7 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= | |||
|     database.close_db_filter(None) | ||||
| 
 | ||||
|     logger.debug('Sending cached derived %s image %s', verb, derived.uuid) | ||||
|     return send_file(store.stream_read_file(derived.locations, derived_layer_path)) | ||||
|     return send_file(storage.stream_read_file(derived.locations, derived_layer_path)) | ||||
| 
 | ||||
| 
 | ||||
|   logger.debug('Building and returning derived %s image %s', verb, derived.uuid) | ||||
|  |  | |||
		Reference in a new issue