diff --git a/endpoints/verbs.py b/endpoints/verbs.py
index 159f4777f..5f4ffcfd8 100644
--- a/endpoints/verbs.py
+++ b/endpoints/verbs.py
@@ -11,6 +11,8 @@ from auth.permissions import ReadRepositoryPermission
 from data import model
 from endpoints.registry import set_cache_headers
+from util.queuefile import QueueFile
+from util.queueprocess import QueueProcess
 from util.gzipwrap import GzipWrap
 from util.streamlayerformat import StreamLayerMerger
@@ -19,6 +21,24 @@ from werkzeug.wsgi import wrap_file
 verbs = Blueprint('verbs', __name__)
 logger = logging.getLogger(__name__)
 
+def _open_stream(namespace, repository, image_list):
+  def get_next_layer():
+    for current_image_id in image_list:
+      current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
+      current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
+      current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
+                                                    current_image_path)
+
+      logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
+      yield current_image_stream
+
+  stream = GzipWrap(StreamLayerMerger(get_next_layer).get_generator())
+  return stream.read
+
+def _write_image_to_storage(namespace, repository, locations, image_id, queue_file):
+  image_path = store.image_layer_path(image_id)
+  store.stream_write(locations, image_path, queue_file)
+  queue_file.close()
 
 @verbs.route('/<namespace>/<repository>/<tag>/squash', methods=['GET'])
 @process_auth
@@ -42,22 +62,43 @@ def get_squashed_tag(namespace, repository, tag, headers):
     unhashed = str(repo_image.storage.id) + ':' + app.config['SECRET_KEY']
     synthetic_image_id = hashlib.sha256(unhashed).hexdigest()
 
+    # Check to see if the synthetic image ID exists in storage. If so, we just return a 302.
+    logger.debug('Looking up synthetic image %s', synthetic_image_id)
+
+    locations = repo_image.storage.locations
+    saved_image_path = store.image_layer_path(synthetic_image_id)
+    if store.exists(locations, saved_image_path):
+      logger.debug('Synthetic image %s exists in storage', synthetic_image_id)
+      download_url = store.get_direct_download_url(locations, saved_image_path)
+      if download_url:
+        logger.debug('Redirecting to download URL for synthetic image %s', synthetic_image_id)
+        return redirect(download_url, code=302)
+
+      logger.debug('Sending cached synthetic image %s', synthetic_image_id)
+      return send_file(store.stream_read_file(locations, saved_image_path))
+
     # Load the ancestry for the image.
+    logger.debug('Building and returning synthetic image %s', synthetic_image_id)
     uuid = repo_image.storage.uuid
     ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
     full_image_list = json.loads(ancestry_data)
 
-
-    def get_next_layer():
-      for current_image_id in full_image_list:
-        current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
-        current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
-        current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
-                                                      current_image_path)
-        logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
-        yield current_image_stream
+    # Create a queue process to generate the data. The queue files will read from the process
+    # and send the results to the client and storage.
+    args = (namespace, repository, full_image_list)
+    queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, args)  # 8K/10M chunk/max
 
-    stream = GzipWrap(StreamLayerMerger(get_next_layer).get_generator())
-    return send_file(stream)
+    client_queue_file = QueueFile(queue_process.create_queue(), 'client')
+    storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
+
+    # Start building.
+    queue_process.run()
+
+    # Start the storage saving.
+    storage_args = (namespace, repository, locations, synthetic_image_id, storage_queue_file)
+    QueueProcess.run_process(_write_image_to_storage, storage_args)
+
+    # Return the client's data.
+    return send_file(client_queue_file)
 
   abort(403)
diff --git a/util/queuefile.py b/util/queuefile.py
new file mode 100644
index 000000000..9c64c26fb
--- /dev/null
+++ b/util/queuefile.py
@@ -0,0 +1,37 @@
+from multiprocessing import Queue
+import os
+
+class QueueFile(object):
+  """ Class which implements a file-like interface and reads from a blocking
+      multiprocessing queue.
+  """
+  def __init__(self, queue, name=None):
+    self._queue = queue
+    self._closed = False
+    self._done = False
+    self._buffer = ''
+    self._total_size = 0
+    self._name = name
+
+  def read(self, size=8192):
+    if self._closed or self._done:
+      return None
+
+    while len(self._buffer) < size:
+      result = self._queue.get(block=True)
+      if result is None:
+        self._done = True
+        break
+
+      self._buffer += result
+      self._total_size += len(result)
+
+    buf = self._buffer[0:size]
+    self._buffer = self._buffer[size:]
+    return buf
+
+  def flush(self):
+    pass
+
+  def close(self):
+    self._closed = True
diff --git a/util/queueprocess.py b/util/queueprocess.py
new file mode 100644
index 000000000..bf8ecb280
--- /dev/null
+++ b/util/queueprocess.py
@@ -0,0 +1,57 @@
+from multiprocessing import Process, Queue
+import logging
+import multiprocessing
+import os
+import time
+import gipc
+
+logger = multiprocessing.log_to_stderr()
+logger.setLevel(logging.INFO)
+
+class QueueProcess(object):
+  """ Helper class which invokes a worker in a process to produce
+      data for one (or more) queues.
+  """
+  def __init__(self, get_producer, chunk_size, max_size, args):
+    self._get_producer = get_producer
+    self._queues = []
+    self._chunk_size = chunk_size
+    self._max_size = max_size
+    self._args = args or []
+
+  def create_queue(self):
+    """ Adds a multiprocessing queue to the list of queues. Any queues added
+        will have the data produced appended.
+    """
+    queue = Queue(self._max_size / self._chunk_size)
+    self._queues.append(queue)
+    return queue
+
+  @staticmethod
+  def run_process(target, args):
+    gipc.start_process(target=target, args=args)
+
+  def run(self):
+    # Important! gipc is used here because normal multiprocessing does not work
+    # correctly with gevent when we sleep.
+    args = (self._get_producer, self._queues, self._chunk_size, self._args)
+    QueueProcess.run_process(_run, args)
+
+
+def _run(get_producer, queues, chunk_size, args):
+  producer = get_producer(*args)
+  while True:
+    data = producer(chunk_size) or None
+    for queue in queues:
+      try:
+        queue.put(data, block=True, timeout=10)
+      except Exception as ex:
+        # One of the listeners stopped listening.
+        return
+
+    if data is None:
+      break
+
+    # Important! This allows the thread that writes the queue data to the pipe
+    # to do so. Otherwise, this hangs.
+    time.sleep(0)
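
Reviewer note, not part of the patch: a minimal sketch of how the two new utilities compose, using an in-memory producer in place of the registry storage. The names (_open_fake_stream, _count_bytes) and the sizes are invented for illustration, and it assumes gipc (and therefore gevent) is installed, since QueueProcess.run_process depends on it. QueueProcess forks a producer that fans each chunk out to every registered queue; each QueueFile turns one of those queues back into a blocking, file-like reader, mirroring the client/storage split in verbs.py.

# Illustrative sketch only -- not part of this change.
from io import BytesIO

from util.queuefile import QueueFile
from util.queueprocess import QueueProcess

def _open_fake_stream(size):
  # Stands in for _open_stream: returns a read(size)-style callable. When the
  # stream is exhausted it returns '' (falsy), which _run converts into the
  # None sentinel that tells every QueueFile the stream is complete.
  return BytesIO('x' * size).read

def _count_bytes(name, queue_file):
  # Stands in for _write_image_to_storage: drains a single QueueFile.
  total = 0
  while True:
    data = queue_file.read()
    if not data:
      break
    total += len(data)
  queue_file.close()

if __name__ == '__main__':
  # 8K chunks, at most 10M buffered per queue, producing 64K of data.
  queue_process = QueueProcess(_open_fake_stream, 8 * 1024, 10 * 1024 * 1024,
                               (64 * 1024,))

  # Queues must be registered before run(); each consumer gets its own copy
  # of every chunk.
  reader_a = QueueFile(queue_process.create_queue(), 'a')
  reader_b = QueueFile(queue_process.create_queue(), 'b')

  queue_process.run()
  QueueProcess.run_process(_count_bytes, ('b', reader_b))  # background consumer
  _count_bytes('a', reader_a)                              # foreground consumer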
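
On the sizing: create_queue bounds each queue at max_size / chunk_size entries, and _run puts at most chunk_size bytes per entry, so a stalled consumer can hold at most max_size bytes in flight. With the endpoint's values that is (10 * 1024 * 1024) / (8 * 1024) = 1280 queued chunks, roughly 10 MB, before queue.put blocks; the 10-second timeout on put then tears the producer down if a listener has stopped reading altogether.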