Merge pull request #1621 from coreos-inc/queuefix
Fix race condition in Queue
This commit is contained in:
commit
7c727dfc9d
1 changed files with 10 additions and 15 deletions
|
@ -111,7 +111,7 @@ class WorkQueue(object):
|
||||||
dim = {'queue': self._queue_name}
|
dim = {'queue': self._queue_name}
|
||||||
self._metric_queue.put_deprecated('Running', running_count, dimensions=dim)
|
self._metric_queue.put_deprecated('Running', running_count, dimensions=dim)
|
||||||
self._metric_queue.put_deprecated('AvailableNotRunning', available_not_running_count,
|
self._metric_queue.put_deprecated('AvailableNotRunning', available_not_running_count,
|
||||||
dimensions=dim)
|
dimensions=dim)
|
||||||
self._metric_queue.put_deprecated('Available', available_count, dimensions=dim)
|
self._metric_queue.put_deprecated('Available', available_count, dimensions=dim)
|
||||||
|
|
||||||
self._metric_queue.work_queue_running.set(running_count, labelvalues=[self._queue_name])
|
self._metric_queue.work_queue_running.set(running_count, labelvalues=[self._queue_name])
|
||||||
|
@ -135,23 +135,17 @@ class WorkQueue(object):
|
||||||
Put an item, if it shouldn't be processed for some number of seconds,
|
Put an item, if it shouldn't be processed for some number of seconds,
|
||||||
specify that amount as available_after. Returns the ID of the queue item added.
|
specify that amount as available_after. Returns the ID of the queue item added.
|
||||||
"""
|
"""
|
||||||
|
item = QueueItem.create(
|
||||||
params = {
|
queue_name=self._canonical_name([self._queue_name] + canonical_name_list),
|
||||||
'queue_name': self._canonical_name([self._queue_name] + canonical_name_list),
|
body=message,
|
||||||
'body': message,
|
retries_remaining=retries_remaining,
|
||||||
'retries_remaining': retries_remaining,
|
available_after=datetime.utcnow() + timedelta(seconds=available_after or 0),
|
||||||
}
|
)
|
||||||
|
|
||||||
available_date = datetime.utcnow() + timedelta(seconds=available_after or 0)
|
|
||||||
params['available_after'] = available_date
|
|
||||||
|
|
||||||
with self._transaction_factory(db):
|
|
||||||
r = str(QueueItem.create(**params).id)
|
|
||||||
|
|
||||||
if self._metric_queue:
|
if self._metric_queue:
|
||||||
self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})
|
self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})
|
||||||
|
|
||||||
return r
|
return str(item.id)
|
||||||
|
|
||||||
def get(self, processing_time=300, ordering_required=False):
|
def get(self, processing_time=300, ordering_required=False):
|
||||||
"""
|
"""
|
||||||
|
@ -190,7 +184,8 @@ class WorkQueue(object):
|
||||||
processing_expires=now + timedelta(seconds=processing_time),
|
processing_expires=now + timedelta(seconds=processing_time),
|
||||||
retries_remaining=QueueItem.retries_remaining-1)
|
retries_remaining=QueueItem.retries_remaining-1)
|
||||||
.where(QueueItem.id == db_item.id))
|
.where(QueueItem.id == db_item.id))
|
||||||
changed_query = self._available_jobs_where(set_unavailable_query, now)
|
changed_query = (self._available_jobs_where(set_unavailable_query, now)
|
||||||
|
.where(QueueItem.processing_expires == db_item.processing_expires))
|
||||||
changed = changed_query.execute()
|
changed = changed_query.execute()
|
||||||
if changed == 1:
|
if changed == 1:
|
||||||
item = AttrDict({
|
item = AttrDict({
|
||||||
|
|
Reference in a new issue