import unittest
import json
import time

from datetime import datetime, timedelta
from functools import wraps

from app import app
from initdb import setup_database_for_testing, finished_database_for_testing
from data.database import QueueItem
from data.queue import WorkQueue, MINIMUM_EXTENSION


QUEUE_NAME = 'testqueuename'


class SaveLastCountReporter(object):
    """Reporter stub that stores the arguments of its most recent call.

    All three attributes are ``None`` until the reporter is invoked for the
    first time, which lets tests detect "metrics never reported".
    """

    def __init__(self):
        self.currently_processing = None
        self.running_count = None
        self.total = None

    def __call__(self, currently_processing, running_count, total_jobs):
        self.currently_processing = currently_processing
        self.running_count = running_count
        self.total = total_jobs


class AutoUpdatingQueue(object):
    """Proxy that calls ``update_metrics()`` on the wrapped queue after each call.

    Attribute access is forwarded to the wrapped queue. Callable attributes are
    returned wrapped so that ``update_metrics()`` runs once the call returns;
    non-callable attributes are returned unchanged.
    """

    def __init__(self, queue_to_wrap):
        self._queue = queue_to_wrap

    def _wrapper(self, func):
        """Return ``func`` decorated to refresh the queue metrics after it runs."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            to_return = func(*args, **kwargs)
            self._queue.update_metrics()
            return to_return
        return wrapper

    def __getattr__(self, attr_name):
        # __getattr__ only runs when normal lookup fails. If `_queue` itself is
        # missing (e.g. the instance was created without __init__ by copy or
        # pickle), reading self._queue below would recurse forever, so fail
        # with a regular AttributeError instead.
        if attr_name == '_queue':
            raise AttributeError(attr_name)

        method_or_attr = getattr(self._queue, attr_name)
        if callable(method_or_attr):
            return self._wrapper(method_or_attr)
        return method_or_attr


class QueueTestCase(unittest.TestCase):
    """Base test case providing a metrics-reporting queue and a test database."""

    TEST_MESSAGE_1 = json.dumps({'data': 1})
    TEST_MESSAGE_2 = json.dumps({'data': 2})
    TEST_MESSAGES = [json.dumps({'data': str(i)}) for i in range(1, 101)]

    def setUp(self):
        self.reporter = SaveLastCountReporter()
        self.transaction_factory = app.config['DB_TRANSACTION_FACTORY']
        self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory,
                                                 reporter=self.reporter))
        setup_database_for_testing(self)

    def tearDown(self):
        finished_database_for_testing(self)


class TestQueue(QueueTestCase):
    """Behavioral tests for ``WorkQueue``, run through ``AutoUpdatingQueue``."""

    def test_get_single_item(self):
        # Add a single item to the queue.
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)

        # Have two "instances" retrieve an item to claim. Since there is only one, both calls
        # should return the same item.
        now = datetime.utcnow()
        first_item = self.queue._select_available_item(False, now)
        second_item = self.queue._select_available_item(False, now)

        self.assertEqual(first_item.id, second_item.id)
        self.assertEqual(first_item.state_id, second_item.state_id)

        # Have both "instances" now try to claim the item. Only one should succeed.
        # The second attempt reuses `first_item`; the assertions above show that both
        # selections carry the same id and state_id.
        first_claimed = self.queue._attempt_to_claim_item(first_item, now, 300)
        second_claimed = self.queue._attempt_to_claim_item(first_item, now, 300)

        self.assertTrue(first_claimed)
        self.assertFalse(second_claimed)

        # Ensure the item is no longer available.
        self.assertIsNone(self.queue.get())

        # Ensure the item's state ID has changed.
        self.assertNotEqual(first_item.state_id, QueueItem.get().state_id)

    def test_extend_processing(self):
        # Add and retrieve a queue item.
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        queue_item = self.queue.get(processing_time=10)
        self.assertIsNotNone(queue_item)

        existing_db_item = QueueItem.get(id=queue_item.id)

        # Call extend processing with a timedelta less than the minimum and ensure its
        # processing_expires and state_id do not change.
        changed = self.queue.extend_processing(queue_item,
                                               10 + MINIMUM_EXTENSION.total_seconds() - 1)
        self.assertFalse(changed)

        updated_db_item = QueueItem.get(id=queue_item.id)

        self.assertEqual(existing_db_item.processing_expires, updated_db_item.processing_expires)
        self.assertEqual(existing_db_item.state_id, updated_db_item.state_id)

        # Call extend processing with a timedelta greater than the minimum and ensure its
        # processing_expires and state_id are changed.
        changed = self.queue.extend_processing(queue_item,
                                               10 + MINIMUM_EXTENSION.total_seconds() + 1)
        self.assertTrue(changed)

        updated_db_item = QueueItem.get(id=queue_item.id)

        self.assertNotEqual(existing_db_item.processing_expires,
                            updated_db_item.processing_expires)
        self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id)

        # Call extend processing with a timedelta less than the minimum but also with new data
        # and ensure its processing_expires and state_id are changed.
        changed = self.queue.extend_processing(queue_item,
                                               10 + MINIMUM_EXTENSION.total_seconds() - 1,
                                               updated_data='newbody')
        self.assertTrue(changed)

        updated_db_item = QueueItem.get(id=queue_item.id)

        self.assertNotEqual(existing_db_item.processing_expires,
                            updated_db_item.processing_expires)
        self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id)
        self.assertEqual('newbody', updated_db_item.body)

    def test_same_canonical_names(self):
        # Nothing has been reported before the first queue operation.
        self.assertIsNone(self.reporter.currently_processing)
        self.assertIsNone(self.reporter.running_count)
        self.assertIsNone(self.reporter.total)

        id_1 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1))
        id_2 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1))
        self.assertEqual(id_1 + 1, id_2)
        self.assertEqual(self.reporter.currently_processing, False)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        one = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)
        self.assertEqual(self.reporter.currently_processing, True)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

        # With ordering required, the second item under the same name must wait.
        two_fail = self.queue.get(ordering_required=True)
        self.assertIsNone(two_fail)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

        self.queue.complete(one)
        self.assertEqual(self.reporter.currently_processing, False)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        two = self.queue.get(ordering_required=True)
        self.assertIsNotNone(two)
        self.assertEqual(self.reporter.currently_processing, True)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

    def test_different_canonical_names(self):
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-1)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 2)

        one = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 2)

        two = self.queue.get(ordering_required=True)
        self.assertIsNotNone(two)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)
        self.assertEqual(self.reporter.running_count, 2)
        self.assertEqual(self.reporter.total, 2)

    def test_canonical_name(self):
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.queue.put(['abc', 'def', 'ghi'], self.TEST_MESSAGE_1, available_after=-1)

        # NOTE(review): both assertions compare a string against the retrieved item object
        # itself, so they cannot fail. Presumably an attribute of the item (its queue name?)
        # was meant -- confirm the intended check before tightening.
        one = self.queue.get(ordering_required=True)
        self.assertNotEqual(QUEUE_NAME + '/abc/def/', one)

        two = self.queue.get(ordering_required=True)
        self.assertNotEqual(QUEUE_NAME + '/abc/def/ghi/', two)

    def test_expiration(self):
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        one = self.queue.get(processing_time=0.5, ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

        one_fail = self.queue.get(ordering_required=True)
        self.assertIsNone(one_fail)

        # Sleep past the 0.5s processing window so the claim lapses, then refresh the
        # metrics by hand because no queue call happened in between.
        time.sleep(1)
        self.queue.update_metrics()
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        one_again = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one_again)
        self.assertEqual(self.reporter.running_count, 1)
        self.assertEqual(self.reporter.total, 1)

    def test_alive(self):
        # No queue item = not alive.
        self.assertFalse(self.queue.alive(['abc', 'def']))

        # Add a queue item.
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.assertTrue(self.queue.alive(['abc', 'def']))

        # Retrieve the queue item.
        queue_item = self.queue.get()
        self.assertIsNotNone(queue_item)
        self.assertTrue(self.queue.alive(['abc', 'def']))

        # Make sure it is running by trying to retrieve it again.
        self.assertIsNone(self.queue.get())

        # Delete the queue item.
        self.queue.complete(queue_item)
        self.assertFalse(self.queue.alive(['abc', 'def']))

    def test_specialized_queue(self):
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
        self.queue.put(['def', 'def'], self.TEST_MESSAGE_2, available_after=-1)

        # A second queue constructed with ['def'] should only hand out the 'def/...' item.
        my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def']))

        two = my_queue.get(ordering_required=True)
        self.assertIsNotNone(two)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)

        one_fail = my_queue.get(ordering_required=True)
        self.assertIsNone(one_fail)

        one = self.queue.get(ordering_required=True)
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)

    def test_random_queue_no_duplicates(self):
        for msg in self.TEST_MESSAGES:
            self.queue.put(['abc', 'def'], msg, available_after=-1)

        seen = set()

        # Drain exactly as many items as were inserted; each must be new.
        for _ in range(len(self.TEST_MESSAGES)):
            item = self.queue.get()
            # Fail with a clear assertion instead of an AttributeError on `item.body`.
            self.assertIsNotNone(item)
            json_body = json.loads(item.body)
            msg = str(json_body['data'])
            self.assertNotIn(msg, seen)
            seen.add(msg)

        for body in self.TEST_MESSAGES:
            json_body = json.loads(body)
            msg = str(json_body['data'])
            self.assertIn(msg, seen)

    def test_bulk_insert(self):
        self.assertIsNone(self.reporter.currently_processing)
        self.assertIsNone(self.reporter.running_count)
        self.assertIsNone(self.reporter.total)

        # The proxy refreshes metrics when batch_insert() returns its context manager,
        # i.e. before the body below runs, so update_metrics() is called explicitly
        # after each batch.
        with self.queue.batch_insert() as queue_put:
            queue_put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
            queue_put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1)

        self.queue.update_metrics()
        self.assertEqual(self.reporter.currently_processing, False)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 1)

        with self.queue.batch_insert() as queue_put:
            queue_put(['abd', 'def'], self.TEST_MESSAGE_1, available_after=-1)
            queue_put(['abd', 'ghi'], self.TEST_MESSAGE_2, available_after=-1)

        self.queue.update_metrics()
        self.assertEqual(self.reporter.currently_processing, False)
        self.assertEqual(self.reporter.running_count, 0)
        self.assertEqual(self.reporter.total, 3)

    def test_num_available_between(self):
        now = datetime.utcnow()
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
        self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-5)

        # Partial results
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=8), now, ['abc'])
        self.assertEqual(1, count)

        # All results
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=20), now, ['/abc'])
        self.assertEqual(2, count)

        # No results
        # NOTE(review): the name is passed as a bare string here, unlike the list form used
        # above -- confirm this is intentional.
        count = self.queue.num_available_jobs_between(now, now, 'abc')
        self.assertEqual(0, count)

    def test_incomplete(self):
        # Add an item.
        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)

        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(1, count)

        # Retrieve it.
        item = self.queue.get()
        self.assertIsNotNone(item)
        self.assertTrue(self.reporter.currently_processing)

        # Mark it as incomplete.
        self.queue.incomplete(item, retry_after=-1)
        self.assertFalse(self.reporter.currently_processing)

        # Retrieve again to ensure it is once again available.
        same_item = self.queue.get()
        self.assertIsNotNone(same_item)
        self.assertTrue(self.reporter.currently_processing)

        self.assertEqual(item.id, same_item.id)

    def test_complete(self):
        # Add an item.
        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)

        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(1, count)

        # Retrieve it.
        item = self.queue.get()
        self.assertIsNotNone(item)
        self.assertTrue(self.reporter.currently_processing)

        # Mark it as complete.
        self.queue.complete(item)
        self.assertFalse(self.reporter.currently_processing)

    def test_cancel(self):
        # Add an item.
        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_2, available_after=-5)

        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(2, count)

        # Retrieve it.
        item = self.queue.get()
        self.assertIsNotNone(item)

        # Make sure we can cancel it.
        self.assertTrue(self.queue.cancel(item.id))

        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(1, count)

        # Make sure it is gone.
        self.assertFalse(self.queue.cancel(item.id))

    def test_deleted_namespaced_items(self):
        self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory,
                                                 reporter=self.reporter,
                                                 has_namespace=True))

        self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
        self.queue.put(['somenamespace', 'abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-5)
        self.queue.put(['anothernamespace', 'abc', 'def'], self.TEST_MESSAGE_1,
                       available_after=-10)

        # Ensure we have 2 items under `somenamespace` and 1 item under `anothernamespace`.
        now = datetime.utcnow()
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(2, count)

        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/anothernamespace'])
        self.assertEqual(1, count)

        # Delete all `somenamespace` items.
        self.queue.delete_namespaced_items('somenamespace')

        # Check the updated counts.
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(0, count)

        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/anothernamespace'])
        self.assertEqual(1, count)

        # Delete all `anothernamespace` items.
        self.queue.delete_namespaced_items('anothernamespace')

        # Check the updated counts.
        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/somenamespace'])
        self.assertEqual(0, count)

        count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
                                                      ['/anothernamespace'])
        self.assertEqual(0, count)


if __name__ == '__main__':
    unittest.main()