278 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			278 lines
		
	
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import json
 | |
| import hashlib
 | |
| 
 | |
| from flask import redirect, Blueprint, abort, send_file, make_response
 | |
| 
 | |
| from app import app, signer
 | |
| from auth.auth import process_auth
 | |
| from auth.permissions import ReadRepositoryPermission
 | |
| from data import model
 | |
| from data import database
 | |
| from endpoints.trackhelper import track_and_log
 | |
| from storage import Storage
 | |
| 
 | |
| from util.queuefile import QueueFile
 | |
| from util.queueprocess import QueueProcess
 | |
| from formats.squashed import SquashedDockerImage
 | |
| from formats.aci import ACIImage
 | |
| 
 | |
| 
 | |
| verbs = Blueprint('verbs', __name__)  # Blueprint that the verb routes in this file attach to.
 | |
| logger = logging.getLogger(__name__)  # Module-level logger named after this module.
 | |
| 
 | |
def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json,
                 image_id_list):
  """Build the formatter's stream for the given tag and return its bound `read` method.

  The repository images matching `image_id_list` are loaded inside a single
  `database.UseThenDisconnect` block, ordered to follow `image_id_list`, and exposed to
  `formatter.build_stream` through two generator callables: one yielding the image rows and
  one yielding an opened layer stream per image.
  """
  storage = Storage(app)

  # Load and cache the complete image list up front so the database connection can be
  # dropped before any streaming starts.
  with database.UseThenDisconnect(app.config):
    images = list(model.get_matching_repository_images(namespace, repository, image_id_list))

  def _position_in_id_list(image):
    return image_id_list.index(image.docker_image_id)

  ordered_images = sorted(images, key=_position_in_id_list)

  def iter_images():
    for image in ordered_images:
      yield image

  def iter_layers():
    for image in ordered_images:
      layer_path = storage.image_layer_path(image.storage.uuid)
      layer_stream = storage.stream_read_file(image.storage.locations, layer_path)
      logger.debug('Returning image layer %s: %s', image.id, layer_path)
      yield layer_stream

  built_stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id,
                                        image_json, iter_images, iter_layers)
  return built_stream.read
 | |
| 
 | |
| 
 | |
def _sign_sythentic_image(verb, linked_storage_uuid, queue_file):
  """Sign the image data read from `queue_file` and save the signature on its storage.

  `signer.detached_sign` is run over `queue_file`. When signing succeeds and the queue file
  did not record an exception, the signature is written to the storage-signature entry
  (found or created for `signer.name`) of the storage identified by `linked_storage_uuid`,
  and that entry is marked as no longer uploading.

  Args:
    verb: Name of the verb that produced the image; only used in log messages.
    linked_storage_uuid: UUID passed to `model.get_storage_by_uuid` to find the storage.
    queue_file: File-like object handed to the signer; its `raised_exception` attribute is
      checked after signing.

  Returns:
    None. Failures are logged and swallowed rather than raised.
  """
  try:
    signature = signer.detached_sign(queue_file)
  except Exception:
    # Deliberately best-effort, but only for ordinary errors: a bare `except:` would also
    # swallow KeyboardInterrupt and SystemExit.
    logger.exception('Exception when signing %s image %s', verb, linked_storage_uuid)
    return

  # Nothing to record if the data being signed was cut short by an exception.
  if queue_file.raised_exception:
    return

  # Setup the database (since this is a new process) and then disconnect immediately
  # once the operation completes.
  with database.UseThenDisconnect(app.config):
    try:
      derived = model.get_storage_by_uuid(linked_storage_uuid)
    except model.InvalidImageException:
      logger.debug('Could not load storage for %s image %s; not saving signature', verb,
                   linked_storage_uuid)
      return

    signature_entry = model.find_or_create_storage_signature(derived, signer.name)
    signature_entry.signature = signature
    signature_entry.uploading = False
    signature_entry.save()
 | |
| 
 | |
| 
 | |
def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations, queue_file):
  """Write the data read from `queue_file` to storage as the layer of the given storage UUID.

  An exception handler is registered on `queue_file` that logs the failure and deletes the
  derived storage. After the write, if the queue file recorded no exception, the storage
  row is loaded and its `uploading` flag is cleared.
  """
  def _discard_derived_storage(ex):
    logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex)

    with database.UseThenDisconnect(app.config):
      model.delete_derived_storage_by_uuid(linked_storage_uuid)

  storage = Storage(app)
  queue_file.add_exception_handler(_discard_derived_storage)

  layer_path = storage.image_layer_path(linked_storage_uuid)
  storage.stream_write(linked_locations, layer_path, queue_file)
  queue_file.close()

  if queue_file.raised_exception:
    return

  # This runs in a new process: set up the database, flip the flag, then disconnect
  # as soon as the operation completes.
  with database.UseThenDisconnect(app.config):
    finished_storage = model.get_storage_by_uuid(linked_storage_uuid)
    finished_storage.uploading = False
    finished_storage.save()
 | |
| 
 | |
| 
 | |
def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None):
  """Check access to the repository and resolve the tag to its image.

  Aborts with 403 when the caller lacks read permission and the repository is not public,
  and with 404 when the tag or its image cannot be found or when `checker` rejects the
  image's JSON.

  Returns a `(repo_image, tag_image, image_json)` tuple; `image_json` is None when no
  `checker` was supplied.
  """
  can_read = ReadRepositoryPermission(namespace, repository).can()
  if not (can_read or model.repository_is_public(namespace, repository)):
    abort(403)

  # Resolve the tag.
  try:
    tag_image = model.get_tag_image(namespace, repository, tag)
  except model.DataModelException:
    abort(404)

  # Resolve the image (and its storage) that the tag points at.
  repo_image = model.get_repo_image_extended(namespace, repository, tag_image.docker_image_id)
  if not repo_image:
    abort(404)

  storage_uuid = repo_image.storage.uuid
  if checker is None:
    return (repo_image, tag_image, None)

  # A checker was given: load the image JSON and let it decide.
  raw_json = store.get_content(repo_image.storage.locations, store.image_json_path(storage_uuid))
  image_json = json.loads(raw_json)
  if not checker(image_json):
    logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repository, tag, verb)
    abort(404)

  return (repo_image, tag_image, image_json)
 | |
| 
 | |
| 
 | |
def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs):
  """Return a response containing the stored signature of the tag's derived image.

  Aborts with 404 when the derived storage for `verb` is missing or still uploading, when
  no signer name is configured, or when no signature entry exists. Extra keyword arguments
  are accepted but not used here.
  """
  # Verify that the image exists and that we have access to it.
  repo_image, _, _ = _verify_repo_verb(Storage(app), namespace, repository, tag, verb, checker)

  # The derived image for this verb must exist and be fully uploaded.
  derived = model.find_derived_storage(repo_image.storage, verb)
  if derived is None or derived.uploading:
    abort(404)

  # Only look up a signature when a signer is configured; otherwise treat it as missing.
  signature_entry = None
  if signer.name:
    signature_entry = model.lookup_storage_signature(derived, signer.name)

  if signature_entry is None:
    abort(404)

  return make_response(signature_entry.signature)
 | |
| 
 | |
| 
 | |
def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs):
  """Serve the derived image for `verb`, building it on the fly when it is not yet stored.

  If the derived storage is no longer marked as uploading, the caller is redirected to a
  direct download URL when one is available, or sent the stored file otherwise. If it is
  still uploading, a queue process is started with `_open_stream`, and its output is
  fanned out to the client response, to a storage writer and, when `sign` is set and a
  signer name is configured, to a signer.
  """
  # Verify that the image exists and that we have access to it.
  store = Storage(app)
  repo_image, tag_image, image_json = _verify_repo_verb(store, namespace, repository, tag, verb,
                                                        checker)

  # Log the action.
  track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)

  # Lookup/create the derived image storage for the verb.
  derived = model.find_or_create_derived_storage(repo_image.storage, verb,
                                                 store.preferred_locations[0])

  if not derived.uploading:
    logger.debug('Derived %s image %s exists in storage', verb, derived.uuid)
    cached_layer_path = store.image_layer_path(derived.uuid)
    direct_url = store.get_direct_download_url(derived.locations, cached_layer_path)
    if direct_url:
      logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid)
      return redirect(direct_url)

    # Close the database handle here for this process before we send the long download.
    database.close_db_filter(None)

    logger.debug('Sending cached derived %s image %s', verb, derived.uuid)
    return send_file(store.stream_read_file(derived.locations, cached_layer_path))

  # Load the ancestry for the image.
  source_uuid = repo_image.storage.uuid

  logger.debug('Building and returning derived %s image %s', verb, derived.uuid)
  raw_ancestry = store.get_content(repo_image.storage.locations,
                                   store.image_ancestry_path(source_uuid))
  full_image_list = json.loads(raw_ancestry)

  # Load the image's JSON layer unless the verification step already did.
  if not image_json:
    raw_image_json = store.get_content(repo_image.storage.locations,
                                       store.image_json_path(source_uuid))
    image_json = json.loads(raw_image_json)

  # Calculate a synthetic image ID.
  synthetic_image_id = hashlib.sha256(':'.join([tag_image.docker_image_id, verb])).hexdigest()

  def _close_db():
    # Close any existing DB connection once the process has exited.
    database.close_db_filter(None)

  # Create a queue process to generate the data. The queue files will read from the process
  # and send the results to the client and storage.
  stream_args = (formatter, namespace, repository, tag, synthetic_image_id, image_json,
                 full_image_list)
  chunk_size = 8 * 1024
  max_size = 10 * 1024 * 1024
  queue_process = QueueProcess(_open_stream, chunk_size, max_size, stream_args,
                               finished=_close_db)

  client_queue_file = QueueFile(queue_process.create_queue(), 'client')
  storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')

  # If signing is required, add a QueueFile for signing the image as we stream it out.
  wants_signature = bool(sign and signer.name)
  signing_queue_file = None
  if wants_signature:
    signing_queue_file = QueueFile(queue_process.create_queue(), 'signing')

  # Start building.
  queue_process.run()

  # Start the storage saving.
  QueueProcess.run_process(_write_synthetic_image_to_storage,
                           (verb, derived.uuid, derived.locations, storage_queue_file),
                           finished=_close_db)

  if wants_signature:
    QueueProcess.run_process(_sign_sythentic_image,
                             (verb, derived.uuid, signing_queue_file),
                             finished=_close_db)

  # Close the database handle here for this process before we send the long download.
  database.close_db_filter(None)

  # Return the client's data.
  return send_file(client_queue_file)
 | |
| 
 | |
| 
 | |
def os_arch_checker(os, arch):
  """Return a predicate that checks an image JSON dict against the given OS and architecture.

  The returned `checker(image_json)` reads the 'os' key (default 'linux') and the
  'architecture' key (default 'amd64') and returns True only when both match `os` and
  `arch`.
  """
  def checker(image_json):
    image_os = image_json.get('os', 'linux')
    image_arch = image_json.get('architecture', 'amd64')

    # Some older Docker images report 'x86_64' instead of 'amd64'; treat them as the same
    # architecture on linux.
    if image_os == 'linux' and image_arch == 'x86_64':
      image_arch = 'amd64'

    return image_os == os and image_arch == arch

  return checker
 | |
| 
 | |
| 
 | |
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/sig/<os>/<arch>/', methods=['GET'])
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci.asc/<os>/<arch>/', methods=['GET'])
@process_auth
def get_aci_signature(server, namespace, repository, tag, os, arch):
  """Route handler returning the signature of the 'aci' image for a tag, OS and architecture.

  `server` is taken from the URL but not used.
  """
  matches_platform = os_arch_checker(os, arch)
  return _repo_verb_signature(namespace, repository, tag, 'aci', checker=matches_platform,
                              os=os, arch=arch)
 | |
| 
 | |
| 
 | |
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/', methods=['GET'])
@process_auth
def get_aci_image(server, namespace, repository, tag, os, arch):
  """Route handler returning the 'aci' image for a tag, OS and architecture, with signing on.

  `server` is taken from the URL but not used.
  """
  matches_platform = os_arch_checker(os, arch)
  aci_formatter = ACIImage()
  return _repo_verb(namespace, repository, tag, 'aci', aci_formatter, sign=True,
                    checker=matches_platform, os=os, arch=arch)
 | |
| 
 | |
| 
 | |
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@process_auth
def get_squashed_tag(namespace, repository, tag):
  """Route handler returning the 'squash' image for the given tag."""
  squash_formatter = SquashedDockerImage()
  return _repo_verb(namespace, repository, tag, 'squash', squash_formatter)
 | |
| 
 |