diff --git a/buildman/server.py b/buildman/server.py index 80776f6e2..aa0ea827d 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -163,7 +163,8 @@ class BuilderServer(object): logger.debug('Checking for more work for %d active workers', self._lifecycle_manager.num_workers()) - job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) + job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time(), + order_required=True) if job_item is None: logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT) continue diff --git a/buildman/templates/cloudconfig.yaml b/buildman/templates/cloudconfig.yaml index 03487a038..6cc298382 100644 --- a/buildman/templates/cloudconfig.yaml +++ b/buildman/templates/cloudconfig.yaml @@ -1,12 +1,11 @@ #cloud-config ssh_authorized_keys: -- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCC0m+hVmyR3vn/xoxJe9+atRWBxSK+YXgyufNVDMcb7H00Jfnc341QH3kDVYZamUbhVh/nyc2RP7YbnZR5zORFtgOaNSdkMYrPozzBvxjnvSUokkCCWbLqXDHvIKiR12r+UTSijPJE/Yk702Mb2ejAFuae1C3Ec+qKAoOCagDjpQ3THyb5oaKE7VPHdwCWjWIQLRhC+plu77ObhoXIFJLD13gCi01L/rp4mYVCxIc2lX5A8rkK+bZHnIZwWUQ4t8SIjWxIaUo0FE7oZ83nKuNkYj5ngmLHQLY23Nx2WhE9H6NBthUpik9SmqQPtVYbhIG+bISPoH9Xs8CLrFb0VRjz Joey's Mac -- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCo6FhAP7mFFOAzM91gtaKW7saahtaN4lur42FMMztz6aqUycIltCmvxo+3FmrXgCG30maMNU36Vm1+9QRtVQEd+eRuoIWP28t+8MT01Fh4zPuE2Wca3pOHSNo3X81FfWJLzmwEHiQKs9HPQqUhezR9PcVWVkbMyAzw85c0UycGmHGFNb0UiRd9HFY6XbgbxhZv/mvKLZ99xE3xkOzS1PNsdSNvjUKwZR7pSUPqNS5S/1NXyR4GhFTU24VPH/bTATOv2ATH+PSzsZ7Qyz9UHj38tKC+ALJHEDJ4HXGzobyOUP78cHGZOfCB5FYubq0zmOudAjKIAhwI8XTFvJ2DX1P3 jimmyzelinskie -- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDNvw8qo9m8np7yQ/Smv/oklM8bo8VyNRZriGYBDuolWDL/mZpYCQnZJXphQo7RFdNABYistikjJlBuuwUohLf2uSq0iKoFa2TgwI43wViWzvuzU4nA02/ITD5BZdmWAFNyIoqeB50Ol4qUgDwLAZ+7Kv7uCi6chcgr9gTi99jY3GHyZjrMiXMHGVGi+FExFuzhVC2drKjbz5q6oRfQeLtNfG4psl5GU3MQU6FkX4fgoCx0r9R48/b7l4+TT7pWblJQiRfeldixu6308vyoTUEHasdkU3/X0OTaGz/h5XqTKnGQc6stvvoED3w+L3QFp0H5Z8sZ9stSsitmCBrmbcKZ jakemoshenko -- ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAgEAo/JkbGO6R7g1ZxARi0xWVM7FOfN02snRAcIO6vT9M7xMUkWVLgD+hM/o91lk+UFiYdql0CATobpFWncRL36KaUqsbw9/1BlI40wg296XHXSSnxhxZ4L7ytf6G1tyN319HXlI2kh9vAf/fy++yDvkH8dI3k1oLoW+mZPET6Pff04/6AXXrRlS5mhmGv9irGwiDHtVKpj6lU8DN/UtOrv1tiQ0pgwEJq05fLGoQfgPNaBCnW2z4Ubpn2gyMcMBMpSwo4hCqJePd349e4bLmFcT+gXYg7Mnup1DoTDlowFFN56wpxQbdp96IxWzU+jYPaIAuRo+BJzCyOS8qBv0Z4RZrgop0qp2JYiVwmViO6TZhIDz6loQJXUOIleQmNgTbiZx8Bwv5GY2jMYoVwlBp7yy5bRjxfbFsJ0vU7TVzNAG7oEJy/74HmHmWzRQlSlQjesr8gRbm9zgR8wqc/L107UOWFg7Cgh8ZNjKuADbXqYuda1Y9m2upcfS26UPz5l5PW5uFRMHZSi8pb1XV6/0Z8H8vwsh37Ur6aLi/5jruRmKhdlsNrB1IiDicBsPW3yg7HHSIdPU4oBNPC77yDCT3l4CKr4el81RrZt7FbJPfY+Ig9Q5O+05f6I8+ZOlJGyZ/Qfyl2aVm1HnlJKuBqPxeic8tMng/9B5N7uZL6Y3k5jFU8c= quentin -- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDI7LtxLItapmUbt3Gs+4Oxa1i22fkx1+aJDkAjiRWPSX3+cxOzuPfHX9uFzr+qj5hy4J7ErrPp8q9alu+il9lE26GQuUxOZiaUrXu4dRCXXdCqTHARWBxGUXjkxdMp2HIzFpBxmVqcRubrgM36LBzKapdDOqQdz7XnNm5Jmf0tH/N0+TgV60P0WVY1CxmTya+JHNFVgazhd+oIGEhTyW/eszMGcFUgZet7DQFytYIQXYSwwGpGdJ+0InKAJ2SzCt/yuUlSrhrVM8vSGeami1XYmgQiyth1zjteMd8uTrc9NREH7bZTNcMFBqVYE3BYQWGRrv8pMMgP9gxgLbxtVsUl barakmich-titania -- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDiNawWSZL2MF99zwG9cFjGmML6agsKwaacQEoTsjcjHGixyUnqHXaLdrGma5i/uphZPkI5XRBKiuIROACY/aRoIxJUpV7AQ1Zx87cILx6fDVePvU5lW2DdhlCDUdwjuzDb/WO/c/qMWjOPqRG4q8XvB7nhuORMMgdpDXWVH4LXPmFez1iIBCKNk04l6Se7wiEOQjaBnTDiBDYlWD78r6RdiAU5eIxpq+lKBDTcET0vegwcA/WE4YOlYBbOrgtHrgwWqG/pXxUu77aapDOmfjtDrgim6XP5kEnytg5gCaN9iLvIpT8b1wD/1Z+LoNSZg6m9gkcC2yTRI0apOBa2G8lz silas@pro.local +- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCC0m+hVmyR3vn/xoxJe9+atRWBxSK+YXgyufNVDMcb7H00Jfnc341QH3kDVYZamUbhVh/nyc2RP7YbnZR5zORFtgOaNSdkMYrPozzBvxjnvSUokkCCWbLqXDHvIKiR12r+UTSijPJE/Yk702Mb2ejAFuae1C3Ec+qKAoOCagDjpQ3THyb5oaKE7VPHdwCWjWIQLRhC+plu77ObhoXIFJLD13gCi01L/rp4mYVCxIc2lX5A8rkK+bZHnIZwWUQ4t8SIjWxIaUo0FE7oZ83nKuNkYj5ngmLHQLY23Nx2WhE9H6NBthUpik9SmqQPtVYbhIG+bISPoH9Xs8CLrFb0VRjz Joey Schorr +- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCo6FhAP7mFFOAzM91gtaKW7saahtaN4lur42FMMztz6aqUycIltCmvxo+3FmrXgCG30maMNU36Vm1+9QRtVQEd+eRuoIWP28t+8MT01Fh4zPuE2Wca3pOHSNo3X81FfWJLzmwEHiQKs9HPQqUhezR9PcVWVkbMyAzw85c0UycGmHGFNb0UiRd9HFY6XbgbxhZv/mvKLZ99xE3xkOzS1PNsdSNvjUKwZR7pSUPqNS5S/1NXyR4GhFTU24VPH/bTATOv2ATH+PSzsZ7Qyz9UHj38tKC+ALJHEDJ4HXGzobyOUP78cHGZOfCB5FYubq0zmOudAjKIAhwI8XTFvJ2DX1P3 Jimmy Zelinskie +- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDNvw8qo9m8np7yQ/Smv/oklM8bo8VyNRZriGYBDuolWDL/mZpYCQnZJXphQo7RFdNABYistikjJlBuuwUohLf2uSq0iKoFa2TgwI43wViWzvuzU4nA02/ITD5BZdmWAFNyIoqeB50Ol4qUgDwLAZ+7Kv7uCi6chcgr9gTi99jY3GHyZjrMiXMHGVGi+FExFuzhVC2drKjbz5q6oRfQeLtNfG4psl5GU3MQU6FkX4fgoCx0r9R48/b7l4+TT7pWblJQiRfeldixu6308vyoTUEHasdkU3/X0OTaGz/h5XqTKnGQc6stvvoED3w+L3QFp0H5Z8sZ9stSsitmCBrmbcKZ Jake Moshenko +- ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAgEAo/JkbGO6R7g1ZxARi0xWVM7FOfN02snRAcIO6vT9M7xMUkWVLgD+hM/o91lk+UFiYdql0CATobpFWncRL36KaUqsbw9/1BlI40wg296XHXSSnxhxZ4L7ytf6G1tyN319HXlI2kh9vAf/fy++yDvkH8dI3k1oLoW+mZPET6Pff04/6AXXrRlS5mhmGv9irGwiDHtVKpj6lU8DN/UtOrv1tiQ0pgwEJq05fLGoQfgPNaBCnW2z4Ubpn2gyMcMBMpSwo4hCqJePd349e4bLmFcT+gXYg7Mnup1DoTDlowFFN56wpxQbdp96IxWzU+jYPaIAuRo+BJzCyOS8qBv0Z4RZrgop0qp2JYiVwmViO6TZhIDz6loQJXUOIleQmNgTbiZx8Bwv5GY2jMYoVwlBp7yy5bRjxfbFsJ0vU7TVzNAG7oEJy/74HmHmWzRQlSlQjesr8gRbm9zgR8wqc/L107UOWFg7Cgh8ZNjKuADbXqYuda1Y9m2upcfS26UPz5l5PW5uFRMHZSi8pb1XV6/0Z8H8vwsh37Ur6aLi/5jruRmKhdlsNrB1IiDicBsPW3yg7HHSIdPU4oBNPC77yDCT3l4CKr4el81RrZt7FbJPfY+Ig9Q5O+05f6I8+ZOlJGyZ/Qfyl2aVm1HnlJKuBqPxeic8tMng/9B5N7uZL6Y3k5jFU8c= Quentin Machu +- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC964SY8ojXZVfWknF+Pz+pTHpyb66VBH7OLYnGP+Tm452YKJVFb/rXCpZYHFlzSQtzz9hko8qBoEFXuD2humojx0P7nEtTy8wUClnKcifIqD5b/V1r7ZDa/5hL9Xog11gOXZ17TW1qjN+00qgXwoSh+jM8mAxD7V2ZLnanIDqmpYamT3ZlICz1k4bwYj35gnpSFpijAXeF9LXOEUfDtzNBjeaCvyniYlQyKzpKr8x+oIHumPlxwkFOzGhBMRGrCQ1Kzija8vVZQ6/Tjvxl19jwfgcNT0Zd9vLbHNowJPWQZhLYXdGIb3NxEfAqkGPvGCsaLfsfETYhcFwxr2g+zvf4xvyKgK35PHA/5t7TQryDSKDrQ1qTDUp3dAjzwsBFwEoQ0x68shGC661n/+APMNtj8qR5M9ueIH5WEqdRW10kKzlEm/ESvjyjEVRhXiwWyKkPch/OIUPKexKaEeOBdKocSnNx1+5ntk8OXWRQgjfwtQvm1NE/qD7fViBVUlTRk0c1SVpZaybIZkiMWmA1hzsdUbDP2mzPek1ydsVffw0I8z/dRo5gXQSPq06WfNIKpsiQF8LqP+KU+462A2tbHxFzq9VozI9PeFV+xO59wlJogv6q2yA0Jfv9BFgVgNzItIsUMvStrfkUBTYgaG9djp/vAm+SwMdnLSXILJtMO/3eRQ== Evan Cordell write_files: - path: /root/overrides.list diff --git a/data/queue.py b/data/queue.py index 5b4408a4c..720f84beb 100644 --- a/data/queue.py +++ b/data/queue.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta -from data.database import QueueItem, db, db_for_update +from data.database import QueueItem, db, db_for_update, db_random_func from util.morecollections import AttrDict @@ -42,30 +42,37 @@ class WorkQueue(object): def _canonical_name(name_list): return '/'.join(name_list) + '/' - def _running_jobs(self, now, name_match_query): + @classmethod + def _running_jobs(cls, now, name_match_query): return (QueueItem .select(QueueItem.queue_name) .where(QueueItem.available == False, QueueItem.processing_expires > now, QueueItem.queue_name ** name_match_query)) - def _available_jobs(self, now, name_match_query): - return self._available_jobs_where(QueueItem.select(), now, name_match_query) + @classmethod + def _available_jobs(cls, now, name_match_query): + return (cls + ._available_jobs_where(QueueItem.select(), now) + .where(QueueItem.queue_name ** name_match_query)) - def _available_jobs_where(self, query, now, name_match_query): - return query.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now, - ((QueueItem.available == True) | (QueueItem.processing_expires <= now)), - QueueItem.retries_remaining > 0) + @staticmethod + def _available_jobs_where(query, now): + return query.where(QueueItem.available_after <= now, + ((QueueItem.available == True) | (QueueItem.processing_expires <= now)), + QueueItem.retries_remaining > 0) - def _available_jobs_not_running(self, now, name_match_query, running_query): - return (self + @classmethod + def _available_jobs_not_running(cls, now, name_match_query, running_query): + return (cls ._available_jobs(now, name_match_query) .where(~(QueueItem.queue_name << running_query))) def _name_match_query(self): return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list) - def _item_by_id_for_update(self, queue_id): + @staticmethod + def _item_by_id_for_update(queue_id): return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get() def get_metrics(self): @@ -80,8 +87,10 @@ class WorkQueue(object): available_not_running_query = self._available_jobs_not_running(now, name_match_query, running_query) - available_not_running_count = (available_not_running_query.select(QueueItem.queue_name) - .distinct().count()) + available_not_running_count = (available_not_running_query + .select(QueueItem.queue_name) + .distinct() + .count()) return (running_count, available_not_running_count, available_count) @@ -131,38 +140,44 @@ class WorkQueue(object): self._metric_queue.put('Added', 1, dimensions={'queue': self._queue_name}) return r - def get(self, processing_time=300): + def get(self, processing_time=300, ordering_required=False): """ Get an available item and mark it as unavailable for the default of five minutes. The result of this method must always be composed of simple python objects which are JSON serializable for network portability reasons. """ now = datetime.utcnow() - name_match_query = self._name_match_query() - - running = self._running_jobs(now, name_match_query) - avail = self._available_jobs_not_running(now, name_match_query, running) - item = None - try: - # The previous solution to this used a select for update in a - # transaction to prevent multiple instances from processing the - # same queue item. This suffered performance problems. This solution - # instead has instances attempt to update the potential queue item to be - # unavailable. However, since their update clause is restricted to items - # that are available=False, only one instance's update will succeed, and - # it will have a changed row count of 1. Instances that have 0 changed - # rows know that another instance is already handling that item. - db_item = avail.order_by(QueueItem.id).get() - changed_query = (QueueItem.update( - available=False, - processing_expires=now + timedelta(seconds=processing_time), - retries_remaining=QueueItem.retries_remaining-1, - ) - .where(QueueItem.id == db_item.id)) - changed_query = self._available_jobs_where(changed_query, now, name_match_query) + try: + if ordering_required: + # The previous solution to this used a select for update in a + # transaction to prevent multiple instances from processing the + # same queue item. This suffered performance problems. This solution + # instead has instances attempt to update the potential queue item to be + # unavailable. However, since their update clause is restricted to items + # that are available=False, only one instance's update will succeed, and + # it will have a changed row count of 1. Instances that have 0 changed + # rows know that another instance is already handling that item. + running = self._running_jobs(now, name_match_query) + avail = self._available_jobs_not_running(now, name_match_query, running) + db_item = avail.order_by(QueueItem.id).get() + else: + # If we don't require ordering, we grab a random item from any of the first 50 available. + subquery = self._available_jobs(now, name_match_query).limit(50).alias('j1') + db_item = (QueueItem + .select() + .join(subquery, on=QueueItem.id == subquery.c.id) + .order_by(db_random_func()) + .get()) + + set_unavailable_query = (QueueItem + .update(available=False, + processing_expires=now + timedelta(seconds=processing_time), + retries_remaining=QueueItem.retries_remaining-1) + .where(QueueItem.id == db_item.id)) + changed_query = self._available_jobs_where(set_unavailable_query, now) changed = changed_query.execute() if changed == 1: item = AttrDict({ @@ -187,7 +202,7 @@ class WorkQueue(object): # Load the build queue item for update. try: queue_item = db_for_update(QueueItem.select() - .where(QueueItem.id == item_id)).get() + .where(QueueItem.id == item_id)).get() except QueueItem.DoesNotExist: return False diff --git a/requirements-dev.txt b/requirements-dev.txt index d1e4fccab..916966dbc 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,4 @@ ipython pylint +ipdb tqdm diff --git a/test/test_queue.py b/test/test_queue.py index 3d31978c8..4703ba8b9 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -47,6 +47,7 @@ class AutoUpdatingQueue(object): class QueueTestCase(unittest.TestCase): TEST_MESSAGE_1 = json.dumps({'data': 1}) TEST_MESSAGE_2 = json.dumps({'data': 2}) + TEST_MESSAGES = [json.dumps({'data': str(i)}) for i in range(1, 101)] def setUp(self): self.reporter = SaveLastCountReporter() @@ -71,14 +72,14 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) - one = self.queue.get() + one = self.queue.get(ordering_required=True) self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) self.assertEqual(self.reporter.currently_processing, True) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) - two_fail = self.queue.get() + two_fail = self.queue.get(ordering_required=True) self.assertEqual(None, two_fail) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) @@ -88,7 +89,7 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) - two = self.queue.get() + two = self.queue.get(ordering_required=True) self.assertNotEqual(None, two) self.assertEqual(self.reporter.currently_processing, True) self.assertEqual(self.TEST_MESSAGE_2, two.body) @@ -101,13 +102,13 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 2) - one = self.queue.get() + one = self.queue.get(ordering_required=True) self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 2) - two = self.queue.get() + two = self.queue.get(ordering_required=True) self.assertNotEqual(None, two) self.assertEqual(self.TEST_MESSAGE_2, two.body) self.assertEqual(self.reporter.running_count, 2) @@ -117,10 +118,10 @@ class TestQueue(QueueTestCase): self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1) self.queue.put(['abc', 'def', 'ghi'], self.TEST_MESSAGE_1) - one = self.queue.get() + one = self.queue.get(ordering_required=True) self.assertNotEqual(QUEUE_NAME + '/abc/def/', one) - two = self.queue.get() + two = self.queue.get(ordering_required=True) self.assertNotEqual(QUEUE_NAME + '/abc/def/ghi/', two) def test_expiration(self): @@ -128,12 +129,12 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) - one = self.queue.get(processing_time=0.5) + one = self.queue.get(processing_time=0.5, ordering_required=True) self.assertNotEqual(None, one) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) - one_fail = self.queue.get() + one_fail = self.queue.get(ordering_required=True) self.assertEqual(None, one_fail) time.sleep(1) @@ -141,7 +142,7 @@ class TestQueue(QueueTestCase): self.assertEqual(self.reporter.running_count, 0) self.assertEqual(self.reporter.total, 1) - one_again = self.queue.get() + one_again = self.queue.get(ordering_required=True) self.assertNotEqual(None, one_again) self.assertEqual(self.reporter.running_count, 1) self.assertEqual(self.reporter.total, 1) @@ -152,17 +153,34 @@ class TestQueue(QueueTestCase): my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def'])) - two = my_queue.get() + two = my_queue.get(ordering_required=True) self.assertNotEqual(None, two) self.assertEqual(self.TEST_MESSAGE_2, two.body) - one_fail = my_queue.get() + one_fail = my_queue.get(ordering_required=True) self.assertEqual(None, one_fail) - one = self.queue.get() + one = self.queue.get(ordering_required=True) self.assertNotEqual(None, one) self.assertEqual(self.TEST_MESSAGE_1, one.body) + def test_random_queue_no_duplicates(self): + for msg in self.TEST_MESSAGES: + self.queue.put(['abc', 'def'], msg) + seen = set() + + for _ in range(1, 101): + item = self.queue.get() + json_body = json.loads(item.body) + msg = str(json_body['data']) + self.assertTrue(msg not in seen) + seen.add(msg) + + for body in self.TEST_MESSAGES: + json_body = json.loads(body) + msg = str(json_body['data']) + self.assertIn(msg, seen) + if __name__ == '__main__': unittest.main()