From dcfd379b176f5bcd3238d3e03cabfb3c81b259e4 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 12 Jan 2017 13:10:52 -0500 Subject: [PATCH 1/5] Queue cancelation test --- test/test_queue.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/test/test_queue.py b/test/test_queue.py index 44041779a..ca7012263 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -244,6 +244,31 @@ class TestQueue(QueueTestCase): count = self.queue.num_available_jobs_between(now, now, 'abc') self.assertEqual(0, count) + def test_cancel(self): + # Add an item. + self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) + self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_2, available_after=-5) + + now = datetime.utcnow() + count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, + ['/somenamespace']) + self.assertEqual(2, count) + + # Retrieve it. + item = self.queue.get() + self.assertIsNotNone(item) + + # Make sure we can cancel it. + self.assertTrue(self.queue.cancel(item.id)) + + now = datetime.utcnow() + count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, + ['/somenamespace']) + self.assertEqual(1, count) + + # Make sure it is gone. 
+ self.assertFalse(self.queue.cancel(item.id)) + def test_deleted_namespaced_items(self): self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, reporter=self.reporter, From 939c122f701d1a3e6ccbaa592790f8ee17da483f Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 12 Jan 2017 13:21:59 -0500 Subject: [PATCH 2/5] Complete item queue test --- test/test_queue.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/test_queue.py b/test/test_queue.py index ca7012263..b066c2e2f 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -244,6 +244,24 @@ class TestQueue(QueueTestCase): count = self.queue.num_available_jobs_between(now, now, 'abc') self.assertEqual(0, count) + def test_complete(self): + # Add some items. + self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) + + now = datetime.utcnow() + count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, + ['/somenamespace']) + self.assertEqual(1, count) + + # Retrieve it. + item = self.queue.get() + self.assertIsNotNone(item) + self.assertTrue(self.reporter.currently_processing) + + # Mark it as complete. + self.queue.complete(item) + self.assertFalse(self.reporter.currently_processing) + def test_cancel(self): # Add an item. 
self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) From 7f63cbd14fb00355fb21433da06127e01d1c124c Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 12 Jan 2017 13:22:42 -0500 Subject: [PATCH 3/5] Remove `FOR UPDATE` in Queue cancel and complete We have no need for them anymore and it should reduce lock contention a bit Fixes #776 --- data/queue.py | 24 +++--------------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/data/queue.py b/data/queue.py index dbdaadb7d..303a2706a 100644 --- a/data/queue.py +++ b/data/queue.py @@ -276,29 +276,11 @@ class WorkQueue(object): """ Attempts to cancel the queue item with the given ID from the queue. Returns true on success and false if the queue item could not be canceled. """ - - with self._transaction_factory(db): - # Load the build queue item for update. - try: - queue_item = db_for_update(QueueItem.select() - .where(QueueItem.id == item_id)).get() - except QueueItem.DoesNotExist: - return False - - # Delete the queue item. 
- queue_item.delete_instance(recursive=True) - return True + count_removed = QueueItem.delete().where(QueueItem.id == item_id).execute() + return count_removed > 0 def complete(self, completed_item): - with self._transaction_factory(db): - try: - completed_item_obj = self._item_by_id_for_update(completed_item.id) - except QueueItem.DoesNotExist: - self._currently_processing = False - return - - completed_item_obj.delete_instance(recursive=True) - self._currently_processing = False + self._currently_processing = not self.cancel(completed_item.id) def incomplete(self, incomplete_item, retry_after=300, restore_retry=False): with self._transaction_factory(db): From 19cb64df5d9548596ddb4875f6682e11ac436517 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 12 Jan 2017 13:43:49 -0500 Subject: [PATCH 4/5] Remove unused class --- data/queue.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/data/queue.py b/data/queue.py index 303a2706a..7cdd4e13a 100644 --- a/data/queue.py +++ b/data/queue.py @@ -9,14 +9,6 @@ MINIMUM_EXTENSION = timedelta(seconds=20) DEFAULT_BATCH_SIZE = 1000 -class NoopWith: - def __enter__(self): - pass - - def __exit__(self, type, value, traceback): - pass - - class BuildMetricQueueReporter(object): """ Metric queue reporter for the build system. """ def __init__(self, metric_queue): From 8c4e86f48b1f32db5064a8f9800156d765de93b2 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 12 Jan 2017 16:13:27 -0500 Subject: [PATCH 5/5] Change queue to use state-field for claiming items Before this change, the queue code would check that none of the fields on the item to be claimed had changed between the time when the item was selected and the item is claimed. 
While this is a safe approach, it also causes quite a bit of lock contention in MySQL, because InnoDB will take a lock on *any* rows examined by the `where` clause of the `update`, even if they will ultimately be thrown out due to other clauses (See: http://dev.mysql.com/doc/refman/5.7/en/innodb-locks-set.html: "A ..., an UPDATE, ... generally set record locks on every index record that is scanned in the processing of the SQL statement. It does not matter whether there are WHERE conditions in the statement that would exclude the row. InnoDB does not remember the exact WHERE condition, but only knows which index ranges were scanned"). As a result, we want to minimize the number of fields accessed in the `where` clause on an update to the QueueItem row. To do so, we introduce a new `state_id` column, which is updated on *every change* to the QueueItem rows with a unique, random value. We can then have the queue item claiming code simply check that the `state_id` column has not changed between the retrieval and claiming steps. This minimizes the number of columns being checked to two (`id` and `state_id`), and thus, should significantly reduce lock contention. Note that we cannot (yet) reduce to just a single `state_id` column (which should work in theory), because we need to maintain backwards compatibility with existing items in the QueueItem table, which will be given empty `state_id` values when the migration in this change runs. Also adds a number of tests for other queue operations that we want to make sure operate correctly following this change. 
[Delivers #133632501] --- data/database.py | 6 + ...1ec019f_add_state_id_field_to_queueitem.py | 28 +++++ data/queue.py | 111 ++++++++++++------ test/test_queue.py | 97 ++++++++++++++- 4 files changed, 206 insertions(+), 36 deletions(-) create mode 100644 data/migrations/versions/fc47c1ec019f_add_state_id_field_to_queueitem.py diff --git a/data/database.py b/data/database.py index d6df63504..85db74142 100644 --- a/data/database.py +++ b/data/database.py @@ -762,6 +762,7 @@ class QueueItem(BaseModel): available = BooleanField(default=True) processing_expires = DateTimeField(null=True) retries_remaining = IntegerField(default=5) + state_id = CharField(default=uuid_generator, index=True, unique=True) class Meta: database = db @@ -774,6 +775,11 @@ class QueueItem(BaseModel): (('processing_expires', 'available_after', 'queue_name', 'retries_remaining', 'available'), False), ) + def save(self, *args, **kwargs): + # Always change the queue item's state ID when we update it. + self.state_id = str(uuid.uuid4()) + super(QueueItem, self).save(*args, **kwargs) + class RepositoryBuild(BaseModel): uuid = CharField(default=uuid_generator, index=True) diff --git a/data/migrations/versions/fc47c1ec019f_add_state_id_field_to_queueitem.py b/data/migrations/versions/fc47c1ec019f_add_state_id_field_to_queueitem.py new file mode 100644 index 000000000..c5d5dcb2c --- /dev/null +++ b/data/migrations/versions/fc47c1ec019f_add_state_id_field_to_queueitem.py @@ -0,0 +1,28 @@ +"""Add state_id field to QueueItem + +Revision ID: fc47c1ec019f +Revises: f5167870dd66 +Create Date: 2017-01-12 15:44:23.643016 + +""" + +# revision identifiers, used by Alembic. +revision = 'fc47c1ec019f' +down_revision = 'f5167870dd66' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +def upgrade(tables): + # ### commands auto generated by Alembic - please adjust! 
### + op.add_column('queueitem', sa.Column('state_id', sa.String(length=255), nullable=False, server_default='')) + op.create_index('queueitem_state_id', 'queueitem', ['state_id'], unique=True) + # ### end Alembic commands ### + + +def downgrade(tables): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('queueitem_state_id', table_name='queueitem') + op.drop_column('queueitem', 'state_id') + # ### end Alembic commands ### diff --git a/data/queue.py b/data/queue.py index 7cdd4e13a..197e45a26 100644 --- a/data/queue.py +++ b/data/queue.py @@ -1,8 +1,11 @@ +import uuid + from datetime import datetime, timedelta from contextlib import contextmanager from data.database import QueueItem, db, db_for_update, db_random_func from util.morecollections import AttrDict +from hashlib import sha256 MINIMUM_EXTENSION = timedelta(seconds=20) @@ -167,7 +170,7 @@ class WorkQueue(object): except QueueItem.DoesNotExist: return False - def _queue_body(self, canonical_name_list, message, available_after, retries_remaining): + def _queue_dict(self, canonical_name_list, message, available_after, retries_remaining): return dict( queue_name=self._canonical_name([self._queue_name] + canonical_name_list), body=message, @@ -183,7 +186,7 @@ class WorkQueue(object): Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. Returns the ID of the queue item added. """ - items_to_insert.append(self._queue_body(canonical_name_list, message, available_after, + items_to_insert.append(self._queue_dict(canonical_name_list, message, available_after, retries_remaining)) yield batch_put @@ -203,7 +206,7 @@ class WorkQueue(object): Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. Returns the ID of the queue item added. 
""" - item = QueueItem.create(**self._queue_body(canonical_name_list, message, available_after, + item = QueueItem.create(**self._queue_dict(canonical_name_list, message, available_after, retries_remaining)) if self._metric_queue: @@ -211,15 +214,11 @@ class WorkQueue(object): return str(item.id) - def get(self, processing_time=300, ordering_required=False): + def _select_available_item(self, ordering_required, now): + """ Selects an available queue item from the queue table and returns it, if any. If none, + return None. """ - Get an available item and mark it as unavailable for the default of five - minutes. The result of this method must always be composed of simple - python objects which are JSON serializable for network portability reasons. - """ - now = datetime.utcnow() name_match_query = self._name_match_query() - item = None try: if ordering_required: @@ -233,36 +232,81 @@ class WorkQueue(object): # rows know that another instance is already handling that item. running = self._running_jobs(now, name_match_query) avail = self._available_jobs_not_running(now, name_match_query, running) - db_item = avail.order_by(QueueItem.id).get() + return avail.order_by(QueueItem.id).get() else: # If we don't require ordering, we grab a random item from any of the first 50 available. 
subquery = self._available_jobs(now, name_match_query).limit(50).alias('j1') - db_item = (QueueItem - .select() - .join(subquery, on=QueueItem.id == subquery.c.id) - .order_by(db_random_func()) - .get()) + return (QueueItem + .select() + .join(subquery, on=QueueItem.id == subquery.c.id) + .order_by(db_random_func()) + .get()) - set_unavailable_query = (QueueItem - .update(available=False, - processing_expires=now + timedelta(seconds=processing_time), - retries_remaining=QueueItem.retries_remaining-1) - .where(QueueItem.id == db_item.id)) - changed_query = (self._available_jobs_where(set_unavailable_query, now) - .where(QueueItem.processing_expires == db_item.processing_expires)) - changed = changed_query.execute() - if changed == 1: - item = AttrDict({ - 'id': db_item.id, - 'body': db_item.body, - 'retries_remaining': db_item.retries_remaining - 1, - }) - self._currently_processing = True except QueueItem.DoesNotExist: + # No available queue item was found. + return None + + def _attempt_to_claim_item(self, db_item, now, processing_time): + """ Attempts to claim the specified queue item for this instance. Returns True on success and + False on failure. + + Note that the underlying QueueItem row in the database will be changed on success, but + the db_item object given as a parameter will *not* have its fields updated. + """ + + # Try to claim the item. We do so by updating the item's information only if its current + # state ID matches that returned in the previous query. Since all updates to the QueueItem + # must change the state ID, this is guarenteed to only succeed if the item has not yet been + # claimed by another caller. + # + # Note that we use this method because InnoDB takes locks on *every* clause in the WHERE when + # performing the update. Previously, we would check all these columns, resulting in a bunch + # of lock contention. 
This change mitigates the problem significantly by only checking two + # columns (id and state_id), both of which should be absolutely unique at all times. + # + # TODO(jschorr): Remove the extra `processing_expires` check once this has been pushed to + # production and every box is updating state_id. + set_unavailable_query = (QueueItem + .update(available=False, + processing_expires=now + timedelta(seconds=processing_time), + retries_remaining=QueueItem.retries_remaining - 1, + state_id=str(uuid.uuid4())) + .where(QueueItem.id == db_item.id, + QueueItem.state_id == db_item.state_id, + QueueItem.processing_expires == db_item.processing_expires)) + + changed = set_unavailable_query.execute() + return changed == 1 + + + def get(self, processing_time=300, ordering_required=False): + """ + Get an available item and mark it as unavailable for the default of five + minutes. The result of this method must always be composed of simple + python objects which are JSON serializable for network portability reasons. + """ + now = datetime.utcnow() + + # Select an available queue item. + db_item = self._select_available_item(ordering_required, now) + if db_item is None: self._currently_processing = False + return None + + # Attempt to claim the item for this instance. + was_claimed = self._attempt_to_claim_item(db_item, now, processing_time) + if not was_claimed: + self._currently_processing = False + return None + + self._currently_processing = True # Return a view of the queue item rather than an active db object - return item + return AttrDict({ + 'id': db_item.id, + 'body': db_item.body, + 'retries_remaining': db_item.retries_remaining - 1, + }) def cancel(self, item_id): """ Attempts to cancel the queue item with the given ID from the queue. 
Returns true on success @@ -312,8 +356,9 @@ class WorkQueue(object): if has_change: queue_item.save() + return has_change except QueueItem.DoesNotExist: - return + return False def delete_expired(expiration_threshold, deletion_threshold, batch_size): diff --git a/test/test_queue.py b/test/test_queue.py index b066c2e2f..63e00c18a 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -7,8 +7,8 @@ from functools import wraps from app import app from initdb import setup_database_for_testing, finished_database_for_testing -from data.queue import WorkQueue -from datetime import timedelta +from data.database import QueueItem +from data.queue import WorkQueue, MINIMUM_EXTENSION QUEUE_NAME = 'testqueuename' @@ -63,6 +63,72 @@ class QueueTestCase(unittest.TestCase): class TestQueue(QueueTestCase): + def test_get_single_item(self): + # Add a single item to the queue. + self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) + + # Have two "instances" retrieve an item to claim. Since there is only one, both calls should + # return the same item. + now = datetime.utcnow() + first_item = self.queue._select_available_item(False, now) + second_item = self.queue._select_available_item(False, now) + + self.assertEquals(first_item.id, second_item.id) + self.assertEquals(first_item.state_id, second_item.state_id) + + # Have both "instances" now try to claim the item. Only one should succeed. + first_claimed = self.queue._attempt_to_claim_item(first_item, now, 300) + second_claimed = self.queue._attempt_to_claim_item(first_item, now, 300) + + self.assertTrue(first_claimed) + self.assertFalse(second_claimed) + + # Ensure the item is no longer available. + self.assertIsNone(self.queue.get()) + + # Ensure the item's state ID has changed. + self.assertNotEqual(first_item.state_id, QueueItem.get().state_id) + + def test_extend_processing(self): + # Add and retrieve a queue item. 
+ self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) + queue_item = self.queue.get(processing_time=10) + self.assertIsNotNone(queue_item) + + existing_db_item = QueueItem.get(id=queue_item.id) + + # Call extend processing with a timedelta less than the minimum and ensure its + # processing_expires and state_id do not change. + changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1) + self.assertFalse(changed) + + updated_db_item = QueueItem.get(id=queue_item.id) + + self.assertEquals(existing_db_item.processing_expires, updated_db_item.processing_expires) + self.assertEquals(existing_db_item.state_id, updated_db_item.state_id) + + # Call extend processing with a timedelta greater than the minimum and ensure its + # processing_expires and state_id are changed. + changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() + 1) + self.assertTrue(changed) + + updated_db_item = QueueItem.get(id=queue_item.id) + + self.assertNotEqual(existing_db_item.processing_expires, updated_db_item.processing_expires) + self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id) + + # Call extend processing with a timedelta less than the minimum but also with new data and + # ensure its processing_expires and state_id are changed. 
+ changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1, + updated_data='newbody') + self.assertTrue(changed) + + updated_db_item = QueueItem.get(id=queue_item.id) + + self.assertNotEqual(existing_db_item.processing_expires, updated_db_item.processing_expires) + self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id) + self.assertEquals('newbody', updated_db_item.body) + def test_same_canonical_names(self): self.assertEqual(self.reporter.currently_processing, None) self.assertEqual(self.reporter.running_count, None) @@ -244,8 +310,33 @@ class TestQueue(QueueTestCase): count = self.queue.num_available_jobs_between(now, now, 'abc') self.assertEqual(0, count) + def test_incomplete(self): + # Add an item. + self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) + + now = datetime.utcnow() + count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, + ['/somenamespace']) + self.assertEqual(1, count) + + # Retrieve it. + item = self.queue.get() + self.assertIsNotNone(item) + self.assertTrue(self.reporter.currently_processing) + + # Mark it as incomplete. + self.queue.incomplete(item, retry_after=-1) + self.assertFalse(self.reporter.currently_processing) + + # Retrieve again to ensure it is once again available. + same_item = self.queue.get() + self.assertIsNotNone(same_item) + self.assertTrue(self.reporter.currently_processing) + + self.assertEquals(item.id, same_item.id) + def test_complete(self): - # Add some items. + # Add an item. self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) now = datetime.utcnow()