From 44b56ae2cfeedf7a01d881283b124c29b9659952 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 24 May 2016 17:42:11 -0400 Subject: [PATCH] queue: explicitly declare ordering requirement This change defaults the ordering requirement of queue items to be off and only enables it for the build manager. This should make the queries for getting queueitems significantly faster for every other use case. --- buildman/server.py | 3 ++- data/queue.py | 55 ++++++++++++++++++++++++++-------------------- test/test_queue.py | 44 ++++++++++++++++++++++++++----------- 3 files changed, 64 insertions(+), 38 deletions(-) diff --git a/buildman/server.py b/buildman/server.py index 80776f6e2..aa0ea827d 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -163,7 +163,8 @@ class BuilderServer(object): logger.debug('Checking for more work for %d active workers', self._lifecycle_manager.num_workers()) - job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) + job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time(), + order_required=True) if job_item is None: logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT) continue diff --git a/data/queue.py b/data/queue.py index 5b4408a4c..3f4a30f50 100644 --- a/data/queue.py +++ b/data/queue.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta -from data.database import QueueItem, db, db_for_update +from data.database import QueueItem, db, db_for_update, db_random_func from util.morecollections import AttrDict @@ -131,38 +131,45 @@ class WorkQueue(object): self._metric_queue.put('Added', 1, dimensions={'queue': self._queue_name}) return r - def get(self, processing_time=300): + def get(self, processing_time=300, ordering_required=False): """ Get an available item and mark it as unavailable for the default of five minutes. The result of this method must always be composed of simple python objects which are JSON serializable for network portability reasons. """ now = datetime.utcnow() - name_match_query = self._name_match_query() - - running = self._running_jobs(now, name_match_query) - avail = self._available_jobs_not_running(now, name_match_query, running) - item = None - try: - # The previous solution to this used a select for update in a - # transaction to prevent multiple instances from processing the - # same queue item. This suffered performance problems. This solution - # instead has instances attempt to update the potential queue item to be - # unavailable. However, since their update clause is restricted to items - # that are available=False, only one instance's update will succeed, and - # it will have a changed row count of 1. Instances that have 0 changed - # rows know that another instance is already handling that item. - db_item = avail.order_by(QueueItem.id).get() - changed_query = (QueueItem.update( - available=False, - processing_expires=now + timedelta(seconds=processing_time), - retries_remaining=QueueItem.retries_remaining-1, - ) - .where(QueueItem.id == db_item.id)) - changed_query = self._available_jobs_where(changed_query, now, name_match_query) + try: + if ordering_required: + # The previous solution to this used a select for update in a + # transaction to prevent multiple instances from processing the + # same queue item. This suffered performance problems. This solution + # instead has instances attempt to update the potential queue item to be + # unavailable. However, since their update clause is restricted to items + # that are available=False, only one instance's update will succeed, and + # it will have a changed row count of 1. Instances that have 0 changed + # rows know that another instance is already handling that item. + running = self._running_jobs(now, name_match_query) + avail = self._available_jobs_not_running(now, name_match_query, running) + db_item = avail.order_by(QueueItem.id).get() + else: + # If we don't require ordering, we can forego this whole subquery and + # instead try and grab any available item. We ORDER BY randomly in order + # to prevent races. + db_item = (self + ._available_jobs(now, name_match_query) + .limit(50) + .order_by(db_random_func()) + .get()) + + set_unavailable_query = (QueueItem + .update(available=False, + processing_expires=now + timedelta(seconds=processing_time), + retries_remaining=QueueItem.retries_remaining-1) + .where(QueueItem.id == db_item.id)) + changed_query = self._available_jobs_where(set_unavailable_query, now, name_match_query) changed = changed_query.execute() if changed == 1: item = AttrDict({ diff --git a/test/test_queue.py b/test/test_queue.py index 3d31978c8..4703ba8b9 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -47,6 +47,7 @@ class AutoUpdatingQueue(object): class QueueTestCase(unittest.TestCase): TEST_MESSAGE_1 = json.dumps({'data': 1}) TEST_MESSAGE_2 = json.dumps({'data': 2}) + TEST_MESSAGES = [json.dumps({'data': str(i)}) for i in range(1, 101)] def setUp(self): self.reporter = SaveLastCountReporter() @@ -71,14 +72,14 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) - one = self.queue.get() + one = self.queue.get(ordering_required=True) self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) self.assertEqual(self.reporter.currently_processing, True) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) - two_fail = self.queue.get() + two_fail = self.queue.get(ordering_required=True) self.assertEqual(None, two_fail) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) @@ -88,7 +89,7 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) - two = self.queue.get() + two = self.queue.get(ordering_required=True) self.assertNotEqual(None, two) self.assertEqual(self.reporter.currently_processing, True) self.assertEqual(self.TEST_MESSAGE_2, two.body) @@ -101,13 +102,13 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 2) - one = self.queue.get() + one = self.queue.get(ordering_required=True) self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 2) - two = self.queue.get() + two = self.queue.get(ordering_required=True) self.assertNotEqual(None, two) self.assertEqual(self.TEST_MESSAGE_2, two.body) self.assertEqual(self.reporter.running_count, 2) @@ -117,10 +118,10 @@ class TestQueue(QueueTestCase): self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1) self.queue.put(['abc', 'def', 'ghi'], self.TEST_MESSAGE_1) - one = self.queue.get() + one = self.queue.get(ordering_required=True) self.assertNotEqual(QUEUE_NAME + '/abc/def/', one) - two = self.queue.get() + two = self.queue.get(ordering_required=True) self.assertNotEqual(QUEUE_NAME + '/abc/def/ghi/', two) def test_expiration(self): @@ -128,12 +129,12 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) - one = self.queue.get(processing_time=0.5) + one = self.queue.get(processing_time=0.5, ordering_required=True) self.assertNotEqual(None, one) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) - one_fail = self.queue.get() + one_fail = self.queue.get(ordering_required=True) self.assertEqual(None, one_fail) time.sleep(1) @@ -141,7 +142,7 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) - one_again = self.queue.get() + one_again = self.queue.get(ordering_required=True) self.assertNotEqual(None, one_again) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) @@ -152,17 +153,34 @@ class TestQueue(QueueTestCase): my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def'])) - two = my_queue.get() + two = my_queue.get(ordering_required=True) self.assertNotEqual(None, two) self.assertEqual(self.TEST_MESSAGE_2, two.body) - one_fail = my_queue.get() + one_fail = my_queue.get(ordering_required=True) self.assertEqual(None, one_fail) - one = self.queue.get() + one = self.queue.get(ordering_required=True) self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) + def test_random_queue_no_duplicates(self): + for msg in self.TEST_MESSAGES: + self.queue.put(['abc', 'def'], msg) + seen = set() + + for _ in range(1, 101): + item = self.queue.get() + json_body = json.loads(item.body) + msg = str(json_body['data']) + self.assertTrue(msg not in seen) + seen.add(msg) + + for body in self.TEST_MESSAGES: + json_body = json.loads(body) + msg = str(json_body['data']) + self.assertIn(msg, seen) + if __name__ == '__main__': unittest.main()