8c4e86f48b
Before this change, the queue code would check that none of the fields on the item to be claimed had changed between the time the item was selected and the time it was claimed. While this is a safe approach, it also causes quite a bit of lock contention in MySQL, because InnoDB will take a lock on *any* rows examined by the `where` clause of the `update`, even if they will ultimately be thrown out due to other clauses (See: http://dev.mysql.com/doc/refman/5.7/en/innodb-locks-set.html: "A ..., an UPDATE, ... generally set record locks on every index record that is scanned in the processing of the SQL statement. It does not matter whether there are WHERE conditions in the statement that would exclude the row. InnoDB does not remember the exact WHERE condition, but only knows which index ranges were scanned"). As a result, we want to minimize the number of fields accessed in the `where` clause of an update to a QueueItem row. To do so, we introduce a new `state_id` column, which is set to a new unique, random value on *every change* to a QueueItem row. The queue item claiming code can then simply check that the `state_id` column has not changed between the retrieval and claiming steps. This reduces the number of columns being checked to two (`id` and `state_id`), and thus should significantly reduce lock contention. Note that we cannot (yet) reduce this to just the single `state_id` column (which should work in theory), because we need to maintain backwards compatibility with existing items in the QueueItem table, which will be given empty `state_id` values when the migration in this change runs. This change also adds a number of tests for other queue operations, to make sure they continue to operate correctly after this change. [Delivers #133632501]
426 lines
16 KiB
Python
426 lines
16 KiB
Python
import unittest
|
|
import json
|
|
import time
|
|
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
|
|
from app import app
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from data.database import QueueItem
|
|
from data.queue import WorkQueue, MINIMUM_EXTENSION
|
|
|
|
|
|
# Name of the WorkQueue that every test in this module constructs and exercises.
QUEUE_NAME = 'testqueuename'
|
|
|
|
|
|
class SaveLastCountReporter(object):
    """Reporter test double that remembers only the most recent report it received.

    All three attributes start out as ``None`` (nothing reported yet). Each call
    overwrites them with the values passed in, discarding earlier reports.
    """

    def __init__(self):
        self.currently_processing, self.running_count, self.total = None, None, None

    def __call__(self, currently_processing, running_count, total_jobs):
        # Keep only the latest snapshot; ``total_jobs`` is exposed as ``total``.
        self.currently_processing, self.running_count, self.total = (
            currently_processing,
            running_count,
            total_jobs,
        )
|
|
|
|
|
|
class AutoUpdatingQueue(object):
    """Proxy that forwards attribute access to a wrapped queue and refreshes its metrics.

    Non-callable attributes are returned from the wrapped queue unchanged. Callable
    attributes are returned wrapped, so that ``update_metrics()`` is invoked on the
    wrapped queue after the call returns normally. The call's result is passed back
    untouched.
    """

    def __init__(self, queue_to_wrap):
        self._queue = queue_to_wrap

    def _wrapper(self, func):
        """Wrap ``func`` so the wrapped queue's metrics are updated after each call."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            outcome = func(*args, **kwargs)
            # Only reached if ``func`` did not raise.
            self._queue.update_metrics()
            return outcome

        return wrapper

    def __getattr__(self, attr_name):
        # Python only calls __getattr__ for names not found on the proxy itself,
        # so ``_queue`` and ``_wrapper`` are never routed through here.
        target = getattr(self._queue, attr_name)
        return self._wrapper(target) if callable(target) else target
|
|
|
|
|
|
class QueueTestCase(unittest.TestCase):
    """Base fixture: a fresh test database and an auto-updating WorkQueue per test.

    Each test gets ``self.reporter`` (records the last metrics report),
    ``self.transaction_factory`` (from the app config) and ``self.queue``.
    ``self.queue`` is a WorkQueue named ``QUEUE_NAME`` that refreshes its metrics
    after every method call.
    """

    # Canned JSON message bodies shared by the tests.
    TEST_MESSAGE_1 = json.dumps({'data': 1})
    TEST_MESSAGE_2 = json.dumps({'data': 2})
    TEST_MESSAGES = [json.dumps({'data': str(i)}) for i in range(1, 101)]

    def setUp(self):
        self.reporter = SaveLastCountReporter()
        self.transaction_factory = app.config['DB_TRANSACTION_FACTORY']
        work_queue = WorkQueue(QUEUE_NAME, self.transaction_factory, reporter=self.reporter)
        self.queue = AutoUpdatingQueue(work_queue)
        setup_database_for_testing(self)

    def tearDown(self):
        finished_database_for_testing(self)
|
|
|
|
|
|
class TestQueue(QueueTestCase):
    """Functional tests for WorkQueue, run against the test database."""

    def test_get_single_item(self):
        # Add a single item to the queue.
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)

        # Have two "instances" retrieve an item to claim. Since there is only one, both calls
        # should return the same item.
        now = datetime.utcnow()
        first_item = self.queue._select_available_item(False, now)
        second_item = self.queue._select_available_item(False, now)

        self.assertEqual(first_item.id, second_item.id)
        self.assertEqual(first_item.state_id, second_item.state_id)

        # Have both "instances" now try to claim the item, each using its own copy. Only one
        # should succeed: the first claim changes the row's state_id (checked below), so the
        # second instance's stale state_id no longer matches.
        first_claimed = self.queue._attempt_to_claim_item(first_item, now, 300)
        second_claimed = self.queue._attempt_to_claim_item(second_item, now, 300)

        self.assertTrue(first_claimed)
        self.assertFalse(second_claimed)

        # Ensure the item is no longer available.
        self.assertIsNone(self.queue.get())

        # Ensure the item's state ID has changed.
        self.assertNotEqual(first_item.state_id, QueueItem.get().state_id)

    def test_extend_processing(self):
        # Add and retrieve a queue item.
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        queue_item = self.queue.get(processing_time=10)
        self.assertIsNotNone(queue_item)

        existing_db_item = QueueItem.get(id=queue_item.id)

        # Call extend processing with a timedelta less than the minimum and ensure its
        # processing_expires and state_id do not change.
        changed = self.queue.extend_processing(queue_item,
                                               10 + MINIMUM_EXTENSION.total_seconds() - 1)
        self.assertFalse(changed)

        updated_db_item = QueueItem.get(id=queue_item.id)

        self.assertEqual(existing_db_item.processing_expires, updated_db_item.processing_expires)
        self.assertEqual(existing_db_item.state_id, updated_db_item.state_id)

        # Call extend processing with a timedelta greater than the minimum and ensure its
        # processing_expires and state_id are changed.
        changed = self.queue.extend_processing(queue_item,
                                               10 + MINIMUM_EXTENSION.total_seconds() + 1)
        self.assertTrue(changed)

        updated_db_item = QueueItem.get(id=queue_item.id)

        self.assertNotEqual(existing_db_item.processing_expires,
                            updated_db_item.processing_expires)
        self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id)

        # Call extend processing with a timedelta less than the minimum but also with new data
        # and ensure its processing_expires and state_id are changed.
        changed = self.queue.extend_processing(queue_item,
                                               10 + MINIMUM_EXTENSION.total_seconds() - 1,
                                               updated_data='newbody')
        self.assertTrue(changed)

        updated_db_item = QueueItem.get(id=queue_item.id)

        self.assertNotEqual(existing_db_item.processing_expires,
                            updated_db_item.processing_expires)
        self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id)
        self.assertEqual('newbody', updated_db_item.body)

    def test_same_canonical_names(self):
        self.assertIsNone(self.reporter.currently_processing)
        self.assertIsNone(self.reporter.running_count)
        self.assertIsNone(self.reporter.total)

        id_1 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1))
        id_2 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1))
        self.assertEqual(id_1 + 1, id_2)
        self.assertEqual(self.reporter.currently_processing, False)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        # Both items share a canonical name, so with ordering required only one of them may be
        # processing at a time.
        one = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)
        self.assertEqual(self.reporter.currently_processing, True)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

        two_fail = self.queue.get(ordering_required=True)
        self.assertIsNone(two_fail)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

        self.queue.complete(one)
        self.assertEqual(self.reporter.currently_processing, False)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        two = self.queue.get(ordering_required=True)
        self.assertIsNotNone(two)
        self.assertEqual(self.reporter.currently_processing, True)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

    def test_different_canonical_names(self):
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-1)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 2)

        one = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 2)

        two = self.queue.get(ordering_required=True)
        self.assertIsNotNone(two)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)
        self.assertEqual(self.reporter.running_count, 2)
        self.assertEqual(self.reporter.total, 2)

    def test_canonical_name(self):
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.queue.put(['abc', 'def', 'ghi'], self.TEST_MESSAGE_1, available_after=-1)

        # Check the canonical name stored on each retrieved row. Comparing the expected name
        # against the returned item object itself would always be unequal and test nothing.
        # Items are expected back in insertion order, as test_different_canonical_names relies on.
        one = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(QUEUE_NAME + '/abc/def/', QueueItem.get(id=one.id).queue_name)

        two = self.queue.get(ordering_required=True)
        self.assertIsNotNone(two)
        self.assertEqual(QUEUE_NAME + '/abc/def/ghi/', QueueItem.get(id=two.id).queue_name)

    def test_expiration(self):
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        one = self.queue.get(processing_time=0.5, ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

        one_fail = self.queue.get(ordering_required=True)
        self.assertIsNone(one_fail)

        # Wait past the 0.5s processing time so the claim expires and the item is available again.
        time.sleep(1)
        self.queue.update_metrics()
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        one_again = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one_again)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

    def test_alive(self):
        # No queue item = not alive.
        self.assertFalse(self.queue.alive(['abc', 'def']))

        # Add a queue item.
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.assertTrue(self.queue.alive(['abc', 'def']))

        # Retrieve the queue item.
        queue_item = self.queue.get()
        self.assertIsNotNone(queue_item)
        self.assertTrue(self.queue.alive(['abc', 'def']))

        # Make sure it is running by trying to retrieve it again.
        self.assertIsNone(self.queue.get())

        # Delete the queue item.
        self.queue.complete(queue_item)
        self.assertFalse(self.queue.alive(['abc', 'def']))

    def test_specialized_queue(self):
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.queue.put(['def', 'def'], self.TEST_MESSAGE_2, available_after=-1)

        # A queue restricted to the 'def' prefix must only see the second item.
        my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def']))

        two = my_queue.get(ordering_required=True)
        self.assertIsNotNone(two)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)

        one_fail = my_queue.get(ordering_required=True)
        self.assertIsNone(one_fail)

        one = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)

    def test_random_queue_no_duplicates(self):
        for msg in self.TEST_MESSAGES:
            self.queue.put(['abc', 'def'], msg, available_after=-1)
        seen = set()

        # Drain exactly as many items as were added; each must be a message not seen before.
        for _ in range(len(self.TEST_MESSAGES)):
            item = self.queue.get()
            self.assertIsNotNone(item)
            msg = str(json.loads(item.body)['data'])
            self.assertNotIn(msg, seen)
            seen.add(msg)

        # Every message that was added must have been retrieved.
        for body in self.TEST_MESSAGES:
            msg = str(json.loads(body)['data'])
            self.assertIn(msg, seen)

    def test_bulk_insert(self):
        self.assertIsNone(self.reporter.currently_processing)
        self.assertIsNone(self.reporter.running_count)
        self.assertIsNone(self.reporter.total)

        with self.queue.batch_insert() as queue_put:
            queue_put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
            queue_put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1)

        self.queue.update_metrics()
        self.assertEqual(self.reporter.currently_processing, False)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        with self.queue.batch_insert() as queue_put:
            queue_put(['abd', 'def'], self.TEST_MESSAGE_1, available_after=-1)
            queue_put(['abd', 'ghi'], self.TEST_MESSAGE_2, available_after=-1)

        self.queue.update_metrics()
        self.assertEqual(self.reporter.currently_processing, False)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 3)

    def test_num_available_between(self):
        now = datetime.utcnow()
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
        self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-5)

        # Partial results
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=8), now, ['abc'])
        self.assertEqual(1, count)

        # All results
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=20), now, ['/abc'])
        self.assertEqual(2, count)

        # No results: an empty time window. The name prefix is passed as a list, like the
        # other calls, so that only the time bounds exclude the items.
        count = self.queue.num_available_jobs_between(now, now, ['abc'])
        self.assertEqual(0, count)

    def test_incomplete(self):
        # Add an item.
        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)

        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(1, count)

        # Retrieve it.
        item = self.queue.get()
        self.assertIsNotNone(item)
        self.assertTrue(self.reporter.currently_processing)

        # Mark it as incomplete.
        self.queue.incomplete(item, retry_after=-1)
        self.assertFalse(self.reporter.currently_processing)

        # Retrieve again to ensure it is once again available.
        same_item = self.queue.get()
        self.assertIsNotNone(same_item)
        self.assertTrue(self.reporter.currently_processing)

        self.assertEqual(item.id, same_item.id)

    def test_complete(self):
        # Add an item.
        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)

        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(1, count)

        # Retrieve it.
        item = self.queue.get()
        self.assertIsNotNone(item)
        self.assertTrue(self.reporter.currently_processing)

        # Mark it as complete.
        self.queue.complete(item)
        self.assertFalse(self.reporter.currently_processing)

    def test_cancel(self):
        # Add two items.
        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_2, available_after=-5)

        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(2, count)

        # Retrieve one of them.
        item = self.queue.get()
        self.assertIsNotNone(item)

        # Make sure we can cancel it.
        self.assertTrue(self.queue.cancel(item.id))

        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(1, count)

        # Make sure it is gone.
        self.assertFalse(self.queue.cancel(item.id))

    def test_deleted_namespaced_items(self):
        self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory,
                                                 reporter=self.reporter,
                                                 has_namespace=True))

        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
        self.queue.put(['somenamespace', 'abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-5)
        self.queue.put(['anothernamespace', 'abc', 'def'], self.TEST_MESSAGE_1,
                       available_after=-10)

        # Ensure we have 2 items under `somenamespace` and 1 item under `anothernamespace`.
        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(2, count)

        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/anothernamespace'])
        self.assertEqual(1, count)

        # Delete all `somenamespace` items.
        self.queue.delete_namespaced_items('somenamespace')

        # Check the updated counts.
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(0, count)

        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/anothernamespace'])
        self.assertEqual(1, count)

        # Delete all `anothernamespace` items.
        self.queue.delete_namespaced_items('anothernamespace')

        # Check the updated counts.
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(0, count)

        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/anothernamespace'])
        self.assertEqual(0, count)
|
|
|
|
|
|
if __name__ == '__main__':
    # Allow running this test module directly as a script.
    unittest.main()
|