Make sure to disconnect from the database when finished with the processes
This commit is contained in:
parent
efeb0dc655
commit
0ef17b082b
2 changed files with 20 additions and 6 deletions
|
@ -109,8 +109,14 @@ def get_squashed_tag(namespace, repository, tag):
|
||||||
|
|
||||||
# Create a queue process to generate the data. The queue files will read from the process
|
# Create a queue process to generate the data. The queue files will read from the process
|
||||||
# and send the results to the client and storage.
|
# and send the results to the client and storage.
|
||||||
|
def _cleanup():
|
||||||
|
# Close any existing DB connection once the process has exited.
|
||||||
|
database.close_db_filter(None)
|
||||||
|
|
||||||
args = (namespace, repository, tag, synthetic_image_id, image_json, full_image_list)
|
args = (namespace, repository, tag, synthetic_image_id, image_json, full_image_list)
|
||||||
queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, args) # 8K/10M chunk/max
|
queue_process = QueueProcess(_open_stream,
|
||||||
|
8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max
|
||||||
|
args, finished=_cleanup)
|
||||||
|
|
||||||
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
|
client_queue_file = QueueFile(queue_process.create_queue(), 'client')
|
||||||
storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
|
storage_queue_file = QueueFile(queue_process.create_queue(), 'storage')
|
||||||
|
@ -120,7 +126,7 @@ def get_squashed_tag(namespace, repository, tag):
|
||||||
|
|
||||||
# Start the storage saving.
|
# Start the storage saving.
|
||||||
storage_args = (derived.uuid, derived.locations, storage_queue_file)
|
storage_args = (derived.uuid, derived.locations, storage_queue_file)
|
||||||
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args)
|
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
|
||||||
|
|
||||||
# Return the client's data.
|
# Return the client's data.
|
||||||
return send_file(client_queue_file)
|
return send_file(client_queue_file)
|
||||||
|
|
|
@ -13,12 +13,13 @@ class QueueProcess(object):
|
||||||
""" Helper class which invokes a worker in a process to produce
|
""" Helper class which invokes a worker in a process to produce
|
||||||
data for one (or more) queues.
|
data for one (or more) queues.
|
||||||
"""
|
"""
|
||||||
def __init__(self, get_producer, chunk_size, max_size, args):
|
def __init__(self, get_producer, chunk_size, max_size, args, finished=None):
|
||||||
self._get_producer = get_producer
|
self._get_producer = get_producer
|
||||||
self._queues = []
|
self._queues = []
|
||||||
self._chunk_size = chunk_size
|
self._chunk_size = chunk_size
|
||||||
self._max_size = max_size
|
self._max_size = max_size
|
||||||
self._args = args or []
|
self._args = args or []
|
||||||
|
self._finished = finished
|
||||||
|
|
||||||
def create_queue(self):
|
def create_queue(self):
|
||||||
""" Adds a multiprocessing queue to the list of queues. Any queues added
|
""" Adds a multiprocessing queue to the list of queues. Any queues added
|
||||||
|
@ -29,14 +30,21 @@ class QueueProcess(object):
|
||||||
return queue
|
return queue
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def run_process(target, args):
|
def run_process(target, args, finished=None):
|
||||||
Process(target=target, args=args).start()
|
def _target(tar, arg, fin):
|
||||||
|
try:
|
||||||
|
tar(*args)
|
||||||
|
finally:
|
||||||
|
if fin:
|
||||||
|
fin()
|
||||||
|
|
||||||
|
Process(target=_target, args=(target, args, finished)).start()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
# Important! gipc is used here because normal multiprocessing does not work
|
# Important! gipc is used here because normal multiprocessing does not work
|
||||||
# correctly with gevent when we sleep.
|
# correctly with gevent when we sleep.
|
||||||
args = (self._get_producer, self._queues, self._chunk_size, self._args)
|
args = (self._get_producer, self._queues, self._chunk_size, self._args)
|
||||||
QueueProcess.run_process(_run, args)
|
QueueProcess.run_process(_run, args, finished=self._finished)
|
||||||
|
|
||||||
def _run(get_producer, queues, chunk_size, args):
|
def _run(get_producer, queues, chunk_size, args):
|
||||||
producer = get_producer(*args)
|
producer = get_producer(*args)
|
||||||
|
|
Reference in a new issue