queue: explicitly declare ordering requirement

This change defaults the ordering requirement of queue items to be off
and only enables it for the build manager. This should make the queries
for getting queueitems significantly faster for every other use case.
This commit is contained in:
Jimmy Zelinskie 2016-05-24 17:42:11 -04:00
parent 79aa78906a
commit 44b56ae2cf
3 changed files with 64 additions and 38 deletions

View file

@ -163,7 +163,8 @@ class BuilderServer(object):
logger.debug('Checking for more work for %d active workers',
self._lifecycle_manager.num_workers())
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time(),
order_required=True)
if job_item is None:
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
continue

View file

@ -1,6 +1,6 @@
from datetime import datetime, timedelta
from data.database import QueueItem, db, db_for_update
from data.database import QueueItem, db, db_for_update, db_random_func
from util.morecollections import AttrDict
@ -131,38 +131,45 @@ class WorkQueue(object):
self._metric_queue.put('Added', 1, dimensions={'queue': self._queue_name})
return r
def get(self, processing_time=300):
def get(self, processing_time=300, ordering_required=False):
"""
Get an available item and mark it as unavailable for the default of five
minutes. The result of this method must always be composed of simple
python objects which are JSON serializable for network portability reasons.
"""
now = datetime.utcnow()
name_match_query = self._name_match_query()
running = self._running_jobs(now, name_match_query)
avail = self._available_jobs_not_running(now, name_match_query, running)
item = None
try:
# The previous solution to this used a select for update in a
# transaction to prevent multiple instances from processing the
# same queue item. This suffered performance problems. This solution
# instead has instances attempt to update the potential queue item to be
# unavailable. However, since their update clause is restricted to items
# that are available=False, only one instance's update will succeed, and
# it will have a changed row count of 1. Instances that have 0 changed
# rows know that another instance is already handling that item.
db_item = avail.order_by(QueueItem.id).get()
changed_query = (QueueItem.update(
available=False,
processing_expires=now + timedelta(seconds=processing_time),
retries_remaining=QueueItem.retries_remaining-1,
)
.where(QueueItem.id == db_item.id))
changed_query = self._available_jobs_where(changed_query, now, name_match_query)
try:
if ordering_required:
# The previous solution to this used a select for update in a
# transaction to prevent multiple instances from processing the
# same queue item. This suffered performance problems. This solution
# instead has instances attempt to update the potential queue item to be
# unavailable. However, since their update clause is restricted to items
# that are available=False, only one instance's update will succeed, and
# it will have a changed row count of 1. Instances that have 0 changed
# rows know that another instance is already handling that item.
running = self._running_jobs(now, name_match_query)
avail = self._available_jobs_not_running(now, name_match_query, running)
db_item = avail.order_by(QueueItem.id).get()
else:
# If we don't require ordering, we can forego this whole subquery and
# instead try and grab any available item. We ORDER BY randomly in order
# to prevent races.
db_item = (self
._available_jobs(now, name_match_query)
.limit(50)
.order_by(db_random_func())
.get())
set_unavailable_query = (QueueItem
.update(available=False,
processing_expires=now + timedelta(seconds=processing_time),
retries_remaining=QueueItem.retries_remaining-1)
.where(QueueItem.id == db_item.id))
changed_query = self._available_jobs_where(set_unavailable_query, now, name_match_query)
changed = changed_query.execute()
if changed == 1:
item = AttrDict({

View file

@ -47,6 +47,7 @@ class AutoUpdatingQueue(object):
class QueueTestCase(unittest.TestCase):
TEST_MESSAGE_1 = json.dumps({'data': 1})
TEST_MESSAGE_2 = json.dumps({'data': 2})
TEST_MESSAGES = [json.dumps({'data': str(i)}) for i in range(1, 101)]
def setUp(self):
self.reporter = SaveLastCountReporter()
@ -71,14 +72,14 @@ class TestQueue(QueueTestCase):
self.assertEqual(self.reporter.running_count, 0)
self.assertEqual(self.reporter.total, 1)
one = self.queue.get()
one = self.queue.get(ordering_required=True)
self.assertNotEqual(None, one)
self.assertEqual(self.TEST_MESSAGE_1, one.body)
self.assertEqual(self.reporter.currently_processing, True)
self.assertEqual(self.reporter.running_count, 1)
self.assertEqual(self.reporter.total, 1)
two_fail = self.queue.get()
two_fail = self.queue.get(ordering_required=True)
self.assertEqual(None, two_fail)
self.assertEqual(self.reporter.running_count, 1)
self.assertEqual(self.reporter.total, 1)
@ -88,7 +89,7 @@ class TestQueue(QueueTestCase):
self.assertEqual(self.reporter.running_count, 0)
self.assertEqual(self.reporter.total, 1)
two = self.queue.get()
two = self.queue.get(ordering_required=True)
self.assertNotEqual(None, two)
self.assertEqual(self.reporter.currently_processing, True)
self.assertEqual(self.TEST_MESSAGE_2, two.body)
@ -101,13 +102,13 @@ class TestQueue(QueueTestCase):
self.assertEqual(self.reporter.running_count, 0)
self.assertEqual(self.reporter.total, 2)
one = self.queue.get()
one = self.queue.get(ordering_required=True)
self.assertNotEqual(None, one)
self.assertEqual(self.TEST_MESSAGE_1, one.body)
self.assertEqual(self.reporter.running_count, 1)
self.assertEqual(self.reporter.total, 2)
two = self.queue.get()
two = self.queue.get(ordering_required=True)
self.assertNotEqual(None, two)
self.assertEqual(self.TEST_MESSAGE_2, two.body)
self.assertEqual(self.reporter.running_count, 2)
@ -117,10 +118,10 @@ class TestQueue(QueueTestCase):
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
self.queue.put(['abc', 'def', 'ghi'], self.TEST_MESSAGE_1)
one = self.queue.get()
one = self.queue.get(ordering_required=True)
self.assertNotEqual(QUEUE_NAME + '/abc/def/', one)
two = self.queue.get()
two = self.queue.get(ordering_required=True)
self.assertNotEqual(QUEUE_NAME + '/abc/def/ghi/', two)
def test_expiration(self):
@ -128,12 +129,12 @@ class TestQueue(QueueTestCase):
self.assertEqual(self.reporter.running_count, 0)
self.assertEqual(self.reporter.total, 1)
one = self.queue.get(processing_time=0.5)
one = self.queue.get(processing_time=0.5, ordering_required=True)
self.assertNotEqual(None, one)
self.assertEqual(self.reporter.running_count, 1)
self.assertEqual(self.reporter.total, 1)
one_fail = self.queue.get()
one_fail = self.queue.get(ordering_required=True)
self.assertEqual(None, one_fail)
time.sleep(1)
@ -141,7 +142,7 @@ class TestQueue(QueueTestCase):
self.assertEqual(self.reporter.running_count, 0)
self.assertEqual(self.reporter.total, 1)
one_again = self.queue.get()
one_again = self.queue.get(ordering_required=True)
self.assertNotEqual(None, one_again)
self.assertEqual(self.reporter.running_count, 1)
self.assertEqual(self.reporter.total, 1)
@ -152,17 +153,34 @@ class TestQueue(QueueTestCase):
my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def']))
two = my_queue.get()
two = my_queue.get(ordering_required=True)
self.assertNotEqual(None, two)
self.assertEqual(self.TEST_MESSAGE_2, two.body)
one_fail = my_queue.get()
one_fail = my_queue.get(ordering_required=True)
self.assertEqual(None, one_fail)
one = self.queue.get()
one = self.queue.get(ordering_required=True)
self.assertNotEqual(None, one)
self.assertEqual(self.TEST_MESSAGE_1, one.body)
def test_random_queue_no_duplicates(self):
for msg in self.TEST_MESSAGES:
self.queue.put(['abc', 'def'], msg)
seen = set()
for _ in range(1, 101):
item = self.queue.get()
json_body = json.loads(item.body)
msg = str(json_body['data'])
self.assertTrue(msg not in seen)
seen.add(msg)
for body in self.TEST_MESSAGES:
json_body = json.loads(body)
msg = str(json_body['data'])
self.assertIn(msg, seen)
if __name__ == '__main__':
unittest.main()