From 0ef17b082b543b37e98a9b4702eaba52ecdae6fc Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 21 Oct 2014 17:40:57 -0400 Subject: [PATCH] Make sure to disconnect from the database when finished with the processes --- endpoints/verbs.py | 10 ++++++++-- util/queueprocess.py | 16 ++++++++++++---- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/endpoints/verbs.py b/endpoints/verbs.py index 7a808ef8e..581da0a17 100644 --- a/endpoints/verbs.py +++ b/endpoints/verbs.py @@ -109,8 +109,14 @@ def get_squashed_tag(namespace, repository, tag): # Create a queue process to generate the data. The queue files will read from the process # and send the results to the client and storage. + def _cleanup(): + # Close any existing DB connection once the process has exited. + database.close_db_filter(None) + args = (namespace, repository, tag, synthetic_image_id, image_json, full_image_list) - queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, args) # 8K/10M chunk/max + queue_process = QueueProcess(_open_stream, + 8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max + args, finished=_cleanup) client_queue_file = QueueFile(queue_process.create_queue(), 'client') storage_queue_file = QueueFile(queue_process.create_queue(), 'storage') @@ -120,7 +126,7 @@ def get_squashed_tag(namespace, repository, tag): # Start the storage saving. storage_args = (derived.uuid, derived.locations, storage_queue_file) - QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args) + QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup) # Return the client's data. return send_file(client_queue_file) diff --git a/util/queueprocess.py b/util/queueprocess.py index f6522589a..f39fc26f6 100644 --- a/util/queueprocess.py +++ b/util/queueprocess.py @@ -13,12 +13,13 @@ class QueueProcess(object): """ Helper class which invokes a worker in a process to produce data for one (or more) queues. """ - def __init__(self, get_producer, chunk_size, max_size, args): + def __init__(self, get_producer, chunk_size, max_size, args, finished=None): self._get_producer = get_producer self._queues = [] self._chunk_size = chunk_size self._max_size = max_size self._args = args or [] + self._finished = finished def create_queue(self): """ Adds a multiprocessing queue to the list of queues. Any queues added @@ -29,14 +30,21 @@ class QueueProcess(object): return queue @staticmethod - def run_process(target, args): - Process(target=target, args=args).start() + def run_process(target, args, finished=None): + def _target(tar, arg, fin): + try: + tar(*args) + finally: + if fin: + fin() + + Process(target=_target, args=(target, args, finished)).start() def run(self): # Important! gipc is used here because normal multiprocessing does not work # correctly with gevent when we sleep. args = (self._get_producer, self._queues, self._chunk_size, self._args) - QueueProcess.run_process(_run, args) + QueueProcess.run_process(_run, args, finished=self._finished) def _run(get_producer, queues, chunk_size, args): producer = get_producer(*args)