From 609f4fccd8dbbcb939e94173d627c17e527e6df7 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Thu, 14 Jul 2016 14:52:59 -0400 Subject: [PATCH 1/2] data.queue: simplify put method --- data/queue.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/data/queue.py b/data/queue.py index dcb031fa5..601092982 100644 --- a/data/queue.py +++ b/data/queue.py @@ -111,7 +111,7 @@ class WorkQueue(object): dim = {'queue': self._queue_name} self._metric_queue.put_deprecated('Running', running_count, dimensions=dim) self._metric_queue.put_deprecated('AvailableNotRunning', available_not_running_count, - dimensions=dim) + dimensions=dim) self._metric_queue.put_deprecated('Available', available_count, dimensions=dim) self._metric_queue.work_queue_running.set(running_count, labelvalues=[self._queue_name]) @@ -135,23 +135,17 @@ class WorkQueue(object): Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. Returns the ID of the queue item added. """ - - params = { - 'queue_name': self._canonical_name([self._queue_name] + canonical_name_list), - 'body': message, - 'retries_remaining': retries_remaining, - } - - available_date = datetime.utcnow() + timedelta(seconds=available_after or 0) - params['available_after'] = available_date - - with self._transaction_factory(db): - r = str(QueueItem.create(**params).id) + item = QueueItem.create( + queue_name=self._canonical_name([self._queue_name] + canonical_name_list), + body=message, + retries_remaining=retries_remaining, + available_after=datetime.utcnow() + timedelta(seconds=available_after or 0), + ) if self._metric_queue: self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name}) - return r + return str(item.id) def get(self, processing_time=300, ordering_required=False): """ From 64d0c5b675e44942cc137173952c1569a6682aa6 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Thu, 14 Jul 2016 14:53:16 -0400 Subject: [PATCH 2/2] data.queue: fix race condition It's possible that multiple consumers will acquire a queue item if they race on an expired item. To mitigate this, we check that the processing_expires time hasn't been changed since we last read. --- data/queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data/queue.py b/data/queue.py index 601092982..f05ee9c8f 100644 --- a/data/queue.py +++ b/data/queue.py @@ -184,7 +184,8 @@ class WorkQueue(object): processing_expires=now + timedelta(seconds=processing_time), retries_remaining=QueueItem.retries_remaining-1) .where(QueueItem.id == db_item.id)) - changed_query = self._available_jobs_where(set_unavailable_query, now) + changed_query = (self._available_jobs_where(set_unavailable_query, now) + .where(QueueItem.processing_expires == db_item.processing_expires)) changed = changed_query.execute() if changed == 1: item = AttrDict({