Have the squashing system write the data (via a double queue system and multiprocessing) to both the client and the stack's storage. On subsequent calls, if the synthetic image already exists in storage, it is returned directly instead of being recomputed.

Joseph Schorr 2014-09-16 22:43:19 -04:00
parent 5cca609c55
commit 1cfb6fc353
3 changed files with 146 additions and 11 deletions
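
The commit message describes a fan-out: the squashed image stream is produced once, in its own process, and every chunk is pushed onto two queues so the same bytes can be streamed to the HTTP client and written to storage simultaneously. Before reading the diff, the following standalone sketch shows the shape of that pattern with plain multiprocessing; it is illustrative only, and the names produce, drain, CHUNK_SIZE, and the dummy payload are not from this commit.

# Illustrative sketch only (not code from this commit): one producer process
# generates the stream in fixed-size chunks and fans each chunk out to two
# queues, one drained for the client and one for storage. A None sentinel
# marks the end of the stream on every queue.
from multiprocessing import Process, Queue

CHUNK_SIZE = 8 * 1024  # same 8K chunk size the commit uses


def produce(payload, queues):
  # Push the payload in fixed-size chunks, then a terminating None sentinel.
  for start in range(0, len(payload), CHUNK_SIZE):
    chunk = payload[start:start + CHUNK_SIZE]
    for queue in queues:
      queue.put(chunk)
  for queue in queues:
    queue.put(None)


def drain(queue, label):
  # Stand-in consumer: in the commit, one of these is the HTTP response and
  # the other is the storage writer.
  total = 0
  while True:
    data = queue.get()
    if data is None:
      break
    total += len(data)
  print('%s consumed %d bytes' % (label, total))


if __name__ == '__main__':
  payload = b'x' * (3 * CHUNK_SIZE + 123)  # stand-in for the gzipped squashed tar
  client_queue, storage_queue = Queue(), Queue()

  workers = [Process(target=produce, args=(payload, [client_queue, storage_queue])),
             Process(target=drain, args=(client_queue, 'client')),
             Process(target=drain, args=(storage_queue, 'storage'))]
  for worker in workers:
    worker.start()
  for worker in workers:
    worker.join()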

endpoints/verbs.py (52 additions, 11 deletions)

@@ -11,6 +11,8 @@ from auth.permissions import ReadRepositoryPermission
 from data import model
 from endpoints.registry import set_cache_headers
+from util.queuefile import QueueFile
+from util.queueprocess import QueueProcess
 from util.gzipwrap import GzipWrap
 from util.streamlayerformat import StreamLayerMerger
@@ -19,6 +21,24 @@ from werkzeug.wsgi import wrap_file
 verbs = Blueprint('verbs', __name__)
 logger = logging.getLogger(__name__)
+
+def _open_stream(namespace, repository, image_list):
+  def get_next_layer():
+    for current_image_id in image_list:
+      current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
+      current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
+      current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
+                                                    current_image_path)
+      logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
+      yield current_image_stream
+
+  stream = GzipWrap(StreamLayerMerger(get_next_layer).get_generator())
+  return stream.read
+
+def _write_image_to_storage(namespace, repository, locations, image_id, queue_file):
+  image_path = store.image_layer_path(image_id)
+  store.stream_write(locations, image_path, queue_file)
+  queue_file.close()
+
 @verbs.route('/<namespace>/<repository>/<tag>/squash', methods=['GET'])
 @process_auth
@@ -42,22 +62,43 @@ def get_squashed_tag(namespace, repository, tag, headers):
     unhashed = str(repo_image.storage.id) + ':' + app.config['SECRET_KEY']
     synthetic_image_id = hashlib.sha256(unhashed).hexdigest()
+
+    # Check to see if the synthetic image ID exists in storage. If so, we just return a 302.
+    logger.debug('Looking up synthetic image %s', synthetic_image_id)
+    locations = repo_image.storage.locations
+    saved_image_path = store.image_layer_path(synthetic_image_id)
+    if store.exists(locations, saved_image_path):
+      logger.debug('Synthetic image %s exists in storage', synthetic_image_id)
+      download_url = store.get_direct_download_url(locations, saved_image_path)
+      if download_url:
+        logger.debug('Redirecting to download URL for synthetic image %s', synthetic_image_id)
+        return redirect(download_url, code=302)
+
+      logger.debug('Sending cached synthetic image %s', synthetic_image_id)
+      return send_file(store.stream_read_file(locations, saved_image_path))
+
     # Load the ancestry for the image.
+    logger.debug('Building and returning synthetic image %s', synthetic_image_id)
     uuid = repo_image.storage.uuid
     ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
     full_image_list = json.loads(ancestry_data)

-    def get_next_layer():
-      for current_image_id in full_image_list:
-        current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
-        current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
-        current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
-                                                      current_image_path)
-        logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
-        yield current_image_stream
-
-    stream = GzipWrap(StreamLayerMerger(get_next_layer).get_generator())
-    return send_file(stream)
+    # Create a queue process to generate the data. The queue files will read from the process
+    # and send the results to the client and storage.
+    args = (namespace, repository, full_image_list)
+    queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, args)  # 8K/10M chunk/max
+
+    client_queue_file = QueueFile(queue_process.create_queue(), 'client')
+    storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
+
+    # Start building.
+    queue_process.run()
+
+    # Start the storage saving.
+    storage_args = (namespace, repository, locations, synthetic_image_id, storage_queue_file)
+    QueueProcess.run_process(_write_image_to_storage, storage_args)
+
+    # Return the client's data.
+    return send_file(client_queue_file)

   abort(403)
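
The caching half of the change relies on a deterministic synthetic image ID, a SHA-256 of the image storage row ID and the application secret, so a repeat request can probe storage for the finished artifact before rebuilding it. A hypothetical standalone version of that derivation (the storage ID and secret below are made up):

import hashlib

storage_id = 42                 # stands in for repo_image.storage.id
secret_key = 'app-secret-key'   # stands in for app.config['SECRET_KEY']

unhashed = str(storage_id) + ':' + secret_key
# The endpoint above (Python 2) hashes the str directly; on Python 3 the input
# must be encoded to bytes first.
synthetic_image_id = hashlib.sha256(unhashed.encode('utf-8')).hexdigest()
print(synthetic_image_id)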

util/queuefile.py Normal file (37 additions)

@@ -0,0 +1,37 @@
from multiprocessing import Queue
import os


class QueueFile(object):
  """ Class which implements a file-like interface and reads from a blocking
      multiprocessing queue.
  """
  def __init__(self, queue, name=None):
    self._queue = queue
    self._closed = False
    self._done = False
    self._buffer = ''
    self._total_size = 0
    self._name = name

  def read(self, size=8192):
    if self._closed or self._done:
      return None

    while len(self._buffer) < size:
      result = self._queue.get(block=True)
      if result is None:
        self._done = True
        break

      self._buffer += result
      self._total_size += len(result)

    buf = self._buffer[0:size]
    self._buffer = self._buffer[size:]
    return buf

  def flush(self):
    pass

  def close(self):
    self._closed = True
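
As a usage illustration (not part of the commit), a QueueFile can be fed from any process that writes chunks to its queue and terminates the stream with a None sentinel; the module path util.queuefile is the one added above, while the producer and its chunks are hypothetical.

# Hypothetical usage sketch for QueueFile; the producer and chunk contents are
# made up, only the QueueFile behaviour is taken from the module above.
from multiprocessing import Process, Queue

from util.queuefile import QueueFile


def producer(queue):
  # Feed a few chunks, then the None sentinel that tells QueueFile.read()
  # the stream is finished.
  for index in range(5):
    queue.put('chunk-%d ' % index)
  queue.put(None)


if __name__ == '__main__':
  queue = Queue()
  feeder = Process(target=producer, args=(queue,))
  feeder.start()

  reader = QueueFile(queue, name='example')
  while True:
    data = reader.read(16)
    if not data:  # read() returns None once the sentinel has been consumed
      break
    print(data)

  feeder.join()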

util/queueprocess.py Normal file (57 additions)

@@ -0,0 +1,57 @@
from multiprocessing import Process, Queue
import logging
import multiprocessing
import os
import time
import gipc

logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)


class QueueProcess(object):
  """ Helper class which invokes a worker in a process to produce
      data for one (or more) queues.
  """
  def __init__(self, get_producer, chunk_size, max_size, args):
    self._get_producer = get_producer
    self._queues = []
    self._chunk_size = chunk_size
    self._max_size = max_size
    self._args = args or []

  def create_queue(self):
    """ Adds a multiprocessing queue to the list of queues. Any queues added
        will have the data produced appended.
    """
    queue = Queue(self._max_size / self._chunk_size)
    self._queues.append(queue)
    return queue

  @staticmethod
  def run_process(target, args):
    gipc.start_process(target=target, args=args)

  def run(self):
    # Important! gipc is used here because normal multiprocessing does not work
    # correctly with gevent when we sleep.
    args = (self._get_producer, self._queues, self._chunk_size, self._args)
    QueueProcess.run_process(_run, args)


def _run(get_producer, queues, chunk_size, args):
  producer = get_producer(*args)

  while True:
    data = producer(chunk_size) or None
    for queue in queues:
      try:
        queue.put(data, block=True, timeout=10)
      except Exception as ex:
        # One of the listeners stopped listening.
        return

    if data is None:
      break

    # Important! This allows the thread that writes the queue data to the pipe
    # to do so. Otherwise, this hangs.
    time.sleep(0)
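
Finally, a hypothetical sketch of how the two helpers are wired together, mirroring the get_squashed_tag() changes above: a producer factory is handed to QueueProcess, two queues are created and wrapped in QueueFiles, one copy is drained by a separate writer process (as _write_image_to_storage is) and the other by the caller. It assumes Python 2 and a gipc/gevent environment, as in the application; the payload, output path, and helper names are invented.

# Hypothetical wiring sketch (Python 2, gipc installed); only the QueueProcess /
# QueueFile calls mirror the commit, the producer and sink below are invented.
from util.queuefile import QueueFile
from util.queueprocess import QueueProcess


def _open_stream(payload):
  # Producer factory: QueueProcess calls this inside the child process and then
  # repeatedly invokes the returned callable to pull chunks; an empty result
  # ends the stream (it is turned into the None sentinel by _run).
  state = {'offset': 0}

  def read(size):
    start = state['offset']
    state['offset'] = start + size
    return payload[start:start + size]

  return read


def _write_to_disk(path, queue_file):
  # Storage-side consumer, analogous to _write_image_to_storage() above.
  with open(path, 'w') as handle:
    while True:
      data = queue_file.read()
      if not data:
        break
      handle.write(data)
  queue_file.close()


if __name__ == '__main__':
  args = ('example payload ' * 1000,)
  queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, args)

  client_file = QueueFile(queue_process.create_queue(), 'client')
  storage_file = QueueFile(queue_process.create_queue(), 'storage')

  # Start producing, then hand one copy to a separate writer process and read
  # the other copy here, just as the verbs endpoint streams it to the client.
  queue_process.run()
  QueueProcess.run_process(_write_to_disk, ('/tmp/example-copy.txt', storage_file))

  while True:
    data = client_file.read()
    if not data:
      break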