diff --git a/data/database.py b/data/database.py index d6df63504..85db74142 100644 --- a/data/database.py +++ b/data/database.py @@ -762,6 +762,7 @@ class QueueItem(BaseModel): available = BooleanField(default=True) processing_expires = DateTimeField(null=True) retries_remaining = IntegerField(default=5) + state_id = CharField(default=uuid_generator, index=True, unique=True) class Meta: database = db @@ -774,6 +775,11 @@ class QueueItem(BaseModel): (('processing_expires', 'available_after', 'queue_name', 'retries_remaining', 'available'), False), ) + def save(self, *args, **kwargs): + # Always change the queue item's state ID when we update it. + self.state_id = str(uuid.uuid4()) + super(QueueItem, self).save(*args, **kwargs) + class RepositoryBuild(BaseModel): uuid = CharField(default=uuid_generator, index=True) diff --git a/data/migrations/versions/fc47c1ec019f_add_state_id_field_to_queueitem.py b/data/migrations/versions/fc47c1ec019f_add_state_id_field_to_queueitem.py new file mode 100644 index 000000000..c5d5dcb2c --- /dev/null +++ b/data/migrations/versions/fc47c1ec019f_add_state_id_field_to_queueitem.py @@ -0,0 +1,28 @@ +"""Add state_id field to QueueItem + +Revision ID: fc47c1ec019f +Revises: f5167870dd66 +Create Date: 2017-01-12 15:44:23.643016 + +""" + +# revision identifiers, used by Alembic. +revision = 'fc47c1ec019f' +down_revision = 'f5167870dd66' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +def upgrade(tables): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('queueitem', sa.Column('state_id', sa.String(length=255), nullable=False, server_default='')) + op.create_index('queueitem_state_id', 'queueitem', ['state_id'], unique=True) + # ### end Alembic commands ### + + +def downgrade(tables): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_index('queueitem_state_id', table_name='queueitem') + op.drop_column('queueitem', 'state_id') + # ### end Alembic commands ### diff --git a/data/queue.py b/data/queue.py index 7cdd4e13a..197e45a26 100644 --- a/data/queue.py +++ b/data/queue.py @@ -1,8 +1,11 @@ +import uuid + from datetime import datetime, timedelta from contextlib import contextmanager from data.database import QueueItem, db, db_for_update, db_random_func from util.morecollections import AttrDict +from hashlib import sha256 MINIMUM_EXTENSION = timedelta(seconds=20) @@ -167,7 +170,7 @@ class WorkQueue(object): except QueueItem.DoesNotExist: return False - def _queue_body(self, canonical_name_list, message, available_after, retries_remaining): + def _queue_dict(self, canonical_name_list, message, available_after, retries_remaining): return dict( queue_name=self._canonical_name([self._queue_name] + canonical_name_list), body=message, @@ -183,7 +186,7 @@ class WorkQueue(object): Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. Returns the ID of the queue item added. """ - items_to_insert.append(self._queue_body(canonical_name_list, message, available_after, + items_to_insert.append(self._queue_dict(canonical_name_list, message, available_after, retries_remaining)) yield batch_put @@ -203,7 +206,7 @@ class WorkQueue(object): Put an item, if it shouldn't be processed for some number of seconds, specify that amount as available_after. Returns the ID of the queue item added. 
""" - item = QueueItem.create(**self._queue_body(canonical_name_list, message, available_after, + item = QueueItem.create(**self._queue_dict(canonical_name_list, message, available_after, retries_remaining)) if self._metric_queue: @@ -211,15 +214,11 @@ class WorkQueue(object): return str(item.id) - def get(self, processing_time=300, ordering_required=False): + def _select_available_item(self, ordering_required, now): + """ Selects an available queue item from the queue table and returns it, if any. If none, + return None. """ - Get an available item and mark it as unavailable for the default of five - minutes. The result of this method must always be composed of simple - python objects which are JSON serializable for network portability reasons. - """ - now = datetime.utcnow() name_match_query = self._name_match_query() - item = None try: if ordering_required: @@ -233,36 +232,81 @@ class WorkQueue(object): # rows know that another instance is already handling that item. running = self._running_jobs(now, name_match_query) avail = self._available_jobs_not_running(now, name_match_query, running) - db_item = avail.order_by(QueueItem.id).get() + return avail.order_by(QueueItem.id).get() else: # If we don't require ordering, we grab a random item from any of the first 50 available. 
subquery = self._available_jobs(now, name_match_query).limit(50).alias('j1') - db_item = (QueueItem - .select() - .join(subquery, on=QueueItem.id == subquery.c.id) - .order_by(db_random_func()) - .get()) + return (QueueItem + .select() + .join(subquery, on=QueueItem.id == subquery.c.id) + .order_by(db_random_func()) + .get()) - set_unavailable_query = (QueueItem - .update(available=False, - processing_expires=now + timedelta(seconds=processing_time), - retries_remaining=QueueItem.retries_remaining-1) - .where(QueueItem.id == db_item.id)) - changed_query = (self._available_jobs_where(set_unavailable_query, now) - .where(QueueItem.processing_expires == db_item.processing_expires)) - changed = changed_query.execute() - if changed == 1: - item = AttrDict({ - 'id': db_item.id, - 'body': db_item.body, - 'retries_remaining': db_item.retries_remaining - 1, - }) - self._currently_processing = True except QueueItem.DoesNotExist: + # No available queue item was found. + return None + + def _attempt_to_claim_item(self, db_item, now, processing_time): + """ Attempts to claim the specified queue item for this instance. Returns True on success and + False on failure. + + Note that the underlying QueueItem row in the database will be changed on success, but + the db_item object given as a parameter will *not* have its fields updated. + """ + + # Try to claim the item. We do so by updating the item's information only if its current + # state ID matches that returned in the previous query. Since all updates to the QueueItem + # must change the state ID, this is guaranteed to only succeed if the item has not yet been + # claimed by another caller. + # + # Note that we use this method because InnoDB takes locks on *every* clause in the WHERE when + # performing the update. Previously, we would check all these columns, resulting in a bunch + # of lock contention.
This change mitigates the problem significantly by only checking two + # columns (id and state_id), both of which should be absolutely unique at all times. + # + # TODO(jschorr): Remove the extra `processing_expires` check once this has been pushed to + # production and every box is updating state_id. + set_unavailable_query = (QueueItem + .update(available=False, + processing_expires=now + timedelta(seconds=processing_time), + retries_remaining=QueueItem.retries_remaining - 1, + state_id=str(uuid.uuid4())) + .where(QueueItem.id == db_item.id, + QueueItem.state_id == db_item.state_id, + QueueItem.processing_expires == db_item.processing_expires)) + + changed = set_unavailable_query.execute() + return changed == 1 + + + def get(self, processing_time=300, ordering_required=False): + """ + Get an available item and mark it as unavailable for the default of five + minutes. The result of this method must always be composed of simple + python objects which are JSON serializable for network portability reasons. + """ + now = datetime.utcnow() + + # Select an available queue item. + db_item = self._select_available_item(ordering_required, now) + if db_item is None: self._currently_processing = False + return None + + # Attempt to claim the item for this instance. + was_claimed = self._attempt_to_claim_item(db_item, now, processing_time) + if not was_claimed: + self._currently_processing = False + return None + + self._currently_processing = True # Return a view of the queue item rather than an active db object - return item + return AttrDict({ + 'id': db_item.id, + 'body': db_item.body, + 'retries_remaining': db_item.retries_remaining - 1, + }) def cancel(self, item_id): """ Attempts to cancel the queue item with the given ID from the queue. 
Returns true on success @@ -312,8 +356,9 @@ class WorkQueue(object): if has_change: queue_item.save() + return has_change except QueueItem.DoesNotExist: - return + return False def delete_expired(expiration_threshold, deletion_threshold, batch_size): diff --git a/test/test_queue.py b/test/test_queue.py index b066c2e2f..63e00c18a 100644 --- a/test/test_queue.py +++ b/test/test_queue.py @@ -7,8 +7,8 @@ from functools import wraps from app import app from initdb import setup_database_for_testing, finished_database_for_testing -from data.queue import WorkQueue -from datetime import timedelta +from data.database import QueueItem +from data.queue import WorkQueue, MINIMUM_EXTENSION QUEUE_NAME = 'testqueuename' @@ -63,6 +63,72 @@ class QueueTestCase(unittest.TestCase): class TestQueue(QueueTestCase): + def test_get_single_item(self): + # Add a single item to the queue. + self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) + + # Have two "instances" retrieve an item to claim. Since there is only one, both calls should + # return the same item. + now = datetime.utcnow() + first_item = self.queue._select_available_item(False, now) + second_item = self.queue._select_available_item(False, now) + + self.assertEquals(first_item.id, second_item.id) + self.assertEquals(first_item.state_id, second_item.state_id) + + # Have both "instances" now try to claim the item. Only one should succeed. + first_claimed = self.queue._attempt_to_claim_item(first_item, now, 300) + second_claimed = self.queue._attempt_to_claim_item(first_item, now, 300) + + self.assertTrue(first_claimed) + self.assertFalse(second_claimed) + + # Ensure the item is no longer available. + self.assertIsNone(self.queue.get()) + + # Ensure the item's state ID has changed. + self.assertNotEqual(first_item.state_id, QueueItem.get().state_id) + + def test_extend_processing(self): + # Add and retrieve a queue item. 
+ self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) + queue_item = self.queue.get(processing_time=10) + self.assertIsNotNone(queue_item) + + existing_db_item = QueueItem.get(id=queue_item.id) + + # Call extend processing with a timedelta less than the minimum and ensure its + # processing_expires and state_id do not change. + changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1) + self.assertFalse(changed) + + updated_db_item = QueueItem.get(id=queue_item.id) + + self.assertEquals(existing_db_item.processing_expires, updated_db_item.processing_expires) + self.assertEquals(existing_db_item.state_id, updated_db_item.state_id) + + # Call extend processing with a timedelta greater than the minimum and ensure its + # processing_expires and state_id are changed. + changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() + 1) + self.assertTrue(changed) + + updated_db_item = QueueItem.get(id=queue_item.id) + + self.assertNotEqual(existing_db_item.processing_expires, updated_db_item.processing_expires) + self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id) + + # Call extend processing with a timedelta less than the minimum but also with new data and + # ensure its processing_expires and state_id are changed. 
+ changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1, + updated_data='newbody') + self.assertTrue(changed) + + updated_db_item = QueueItem.get(id=queue_item.id) + + self.assertNotEqual(existing_db_item.processing_expires, updated_db_item.processing_expires) + self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id) + self.assertEquals('newbody', updated_db_item.body) + def test_same_canonical_names(self): self.assertEqual(self.reporter.currently_processing, None) self.assertEqual(self.reporter.running_count, None) @@ -244,8 +310,33 @@ class TestQueue(QueueTestCase): count = self.queue.num_available_jobs_between(now, now, 'abc') self.assertEqual(0, count) + def test_incomplete(self): + # Add an item. + self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) + + now = datetime.utcnow() + count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, + ['/somenamespace']) + self.assertEqual(1, count) + + # Retrieve it. + item = self.queue.get() + self.assertIsNotNone(item) + self.assertTrue(self.reporter.currently_processing) + + # Mark it as incomplete. + self.queue.incomplete(item, retry_after=-1) + self.assertFalse(self.reporter.currently_processing) + + # Retrieve again to ensure it is once again available. + same_item = self.queue.get() + self.assertIsNotNone(same_item) + self.assertTrue(self.reporter.currently_processing) + + self.assertEquals(item.id, same_item.id) + def test_complete(self): - # Add some items. + # Add an item. self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) now = datetime.utcnow()