From f648b3311ba795aa5262ce1f5e316f50f6ab11d8 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 18 Jul 2018 15:22:38 -0400 Subject: [PATCH] Move queue tests to pytest --- data/test/test_queue.py | 420 +++++++++++++++++++++++++++++++++++++++ test/test_queue.py | 426 ---------------------------------------- 2 files changed, 420 insertions(+), 426 deletions(-) create mode 100644 data/test/test_queue.py delete mode 100644 test/test_queue.py diff --git a/data/test/test_queue.py b/data/test/test_queue.py new file mode 100644 index 000000000..36f61b502 --- /dev/null +++ b/data/test/test_queue.py @@ -0,0 +1,420 @@ +import json +import time + +import pytest + +from contextlib import contextmanager +from datetime import datetime, timedelta +from functools import wraps + +from data.database import QueueItem +from data.queue import WorkQueue, MINIMUM_EXTENSION + +from test.fixtures import * + +QUEUE_NAME = 'testqueuename' + + +class SaveLastCountReporter(object): + def __init__(self): + self.currently_processing = None + self.running_count = None + self.total = None + + def __call__(self, currently_processing, running_count, total_jobs): + self.currently_processing = currently_processing + self.running_count = running_count + self.total = total_jobs + + +class AutoUpdatingQueue(object): + def __init__(self, queue_to_wrap): + self._queue = queue_to_wrap + + def _wrapper(self, func): + @wraps(func) + def wrapper(*args, **kwargs): + to_return = func(*args, **kwargs) + self._queue.update_metrics() + return to_return + return wrapper + + def __getattr__(self, attr_name): + method_or_attr = getattr(self._queue, attr_name) + if callable(method_or_attr): + return self._wrapper(method_or_attr) + else: + return method_or_attr + + +TEST_MESSAGE_1 = json.dumps({'data': 1}) +TEST_MESSAGE_2 = json.dumps({'data': 2}) +TEST_MESSAGES = [json.dumps({'data': str(i)}) for i in range(1, 101)] + + +@contextmanager +def fake_transaction(arg): + yield + +@pytest.fixture() +def reporter(): + return SaveLastCountReporter() + + +@pytest.fixture() +def transaction_factory(): + return fake_transaction + + +@pytest.fixture() +def queue(reporter, transaction_factory, initialized_db): + return AutoUpdatingQueue(WorkQueue(QUEUE_NAME, transaction_factory, reporter=reporter)) + + +def test_get_single_item(queue, reporter, transaction_factory): + # Add a single item to the queue. + queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1) + + # Have two "instances" retrieve an item to claim. Since there is only one, both calls should + # return the same item. + now = datetime.utcnow() + first_item = queue._select_available_item(False, now) + second_item = queue._select_available_item(False, now) + + assert first_item.id == second_item.id + assert first_item.state_id == second_item.state_id + + # Have both "instances" now try to claim the item. Only one should succeed. + first_claimed = queue._attempt_to_claim_item(first_item, now, 300) + second_claimed = queue._attempt_to_claim_item(first_item, now, 300) + + assert first_claimed + assert not second_claimed + + # Ensure the item is no longer available. + assert queue.get() is None + + # Ensure the item's state ID has changed. + assert first_item.state_id != QueueItem.get().state_id + +def test_extend_processing(queue, reporter, transaction_factory): + # Add and retrieve a queue item. 
+ queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1) + queue_item = queue.get(processing_time=10) + assert queue_item is not None + + existing_db_item = QueueItem.get(id=queue_item.id) + + # Call extend processing with a timedelta less than the minimum and ensure its + # processing_expires and state_id do not change. + changed = queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1) + assert not changed + + updated_db_item = QueueItem.get(id=queue_item.id) + + assert existing_db_item.processing_expires == updated_db_item.processing_expires + assert existing_db_item.state_id == updated_db_item.state_id + + # Call extend processing with a timedelta greater than the minimum and ensure its + # processing_expires and state_id are changed. + changed = queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() + 1) + assert changed + + updated_db_item = QueueItem.get(id=queue_item.id) + + assert existing_db_item.processing_expires != updated_db_item.processing_expires + assert existing_db_item.state_id != updated_db_item.state_id + + # Call extend processing with a timedelta less than the minimum but also with new data and + # ensure its processing_expires and state_id are changed. + changed = queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1, + updated_data='newbody') + assert changed + + updated_db_item = QueueItem.get(id=queue_item.id) + + assert existing_db_item.processing_expires != updated_db_item.processing_expires + assert existing_db_item.state_id != updated_db_item.state_id + assert updated_db_item.body == 'newbody' + +def test_same_canonical_names(queue, reporter, transaction_factory): + assert reporter.currently_processing is None + assert reporter.running_count is None + assert reporter.total is None + + id_1 = int(queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1)) + id_2 = int(queue.put(['abc', 'def'], TEST_MESSAGE_2, available_after=-1)) + assert id_1 + 1 == id_2 + assert not reporter.currently_processing + assert reporter.running_count == 0 + assert reporter.total == 1 + + one = queue.get(ordering_required=True) + assert one is not None + assert one.body == TEST_MESSAGE_1 + assert reporter.currently_processing + assert reporter.running_count == 1 + assert reporter.total == 1 + + two_fail = queue.get(ordering_required=True) + assert two_fail is None + assert reporter.running_count == 1 + assert reporter.total == 1 + + queue.complete(one) + assert not reporter.currently_processing + assert reporter.running_count == 0 + assert reporter.total == 1 + + two = queue.get(ordering_required=True) + assert two is not None + assert reporter.currently_processing + assert two.body == TEST_MESSAGE_2 + assert reporter.running_count == 1 + assert reporter.total == 1 + +def test_different_canonical_names(queue, reporter, transaction_factory): + queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1) + queue.put(['abc', 'ghi'], TEST_MESSAGE_2, available_after=-1) + assert reporter.running_count == 0 + assert reporter.total == 2 + + one = queue.get(ordering_required=True) + assert one is not None + assert one.body == TEST_MESSAGE_1 + assert reporter.running_count == 1 + assert reporter.total == 2 + + two = queue.get(ordering_required=True) + assert two is not None + assert two.body == TEST_MESSAGE_2 + assert reporter.running_count == 2 + assert reporter.total == 2 + +def test_canonical_name(queue, reporter, transaction_factory): + queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1) + 
queue.put(['abc', 'def', 'ghi'], TEST_MESSAGE_1, available_after=-1) + + one = queue.get(ordering_required=True) + assert QUEUE_NAME + '/abc/def/' != one + + two = queue.get(ordering_required=True) + assert QUEUE_NAME + '/abc/def/ghi/' != two + +def test_expiration(queue, reporter, transaction_factory): + queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1) + assert reporter.running_count == 0 + assert reporter.total == 1 + + one = queue.get(processing_time=0.5, ordering_required=True) + assert one is not None + assert reporter.running_count == 1 + assert reporter.total == 1 + + one_fail = queue.get(ordering_required=True) + assert one_fail is None + + time.sleep(1) + queue.update_metrics() + assert reporter.running_count == 0 + assert reporter.total == 1 + + one_again = queue.get(ordering_required=True) + assert one_again is not None + assert reporter.running_count == 1 + assert reporter.total == 1 + +def test_alive(queue, reporter, transaction_factory): + # No queue item = not alive. + assert not queue.alive(['abc', 'def']) + + # Add a queue item. + queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1) + assert queue.alive(['abc', 'def']) + + # Retrieve the queue item. + queue_item = queue.get() + assert queue_item is not None + assert queue.alive(['abc', 'def']) + + # Make sure it is running by trying to retrieve it again. + assert queue.get() is None + + # Delete the queue item. + queue.complete(queue_item) + assert not queue.alive(['abc', 'def']) + +def test_specialized_queue(queue, reporter, transaction_factory): + queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1) + queue.put(['def', 'def'], TEST_MESSAGE_2, available_after=-1) + + my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, transaction_factory, ['def'])) + + two = my_queue.get(ordering_required=True) + assert two is not None + assert two.body == TEST_MESSAGE_2 + + one_fail = my_queue.get(ordering_required=True) + assert one_fail is None + + one = queue.get(ordering_required=True) + assert one is not None + assert one.body == TEST_MESSAGE_1 + +def test_random_queue_no_duplicates(queue, reporter, transaction_factory): + for msg in TEST_MESSAGES: + queue.put(['abc', 'def'], msg, available_after=-1) + seen = set() + + for _ in range(1, 101): + item = queue.get() + json_body = json.loads(item.body) + msg = str(json_body['data']) + assert msg not in seen + seen.add(msg) + + for body in TEST_MESSAGES: + json_body = json.loads(body) + msg = str(json_body['data']) + assert msg in seen + +def test_bulk_insert(queue, reporter, transaction_factory): + assert reporter.currently_processing is None + assert reporter.running_count is None + assert reporter.total is None + + with queue.batch_insert() as queue_put: + queue_put(['abc', 'def'], TEST_MESSAGE_1, available_after=-1) + queue_put(['abc', 'def'], TEST_MESSAGE_2, available_after=-1) + + queue.update_metrics() + assert not reporter.currently_processing + assert reporter.running_count == 0 + assert reporter.total == 1 + + with queue.batch_insert() as queue_put: + queue_put(['abd', 'def'], TEST_MESSAGE_1, available_after=-1) + queue_put(['abd', 'ghi'], TEST_MESSAGE_2, available_after=-1) + + queue.update_metrics() + assert not reporter.currently_processing + assert reporter.running_count == 0 + assert reporter.total == 3 + +def test_num_available_between(queue, reporter, transaction_factory): + now = datetime.utcnow() + queue.put(['abc', 'def'], TEST_MESSAGE_1, available_after=-10) + queue.put(['abc', 'ghi'], TEST_MESSAGE_2, available_after=-5) + + # Partial 
results + count = queue.num_available_jobs_between(now-timedelta(seconds=8), now, ['abc']) + assert count == 1 + + # All results + count = queue.num_available_jobs_between(now-timedelta(seconds=20), now, ['/abc']) + assert count == 2 + + # No results + count = queue.num_available_jobs_between(now, now, 'abc') + assert count == 0 + +def test_incomplete(queue, reporter, transaction_factory): + # Add an item. + queue.put(['somenamespace', 'abc', 'def'], TEST_MESSAGE_1, available_after=-10) + + now = datetime.utcnow() + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/somenamespace']) + assert count == 1 + + # Retrieve it. + item = queue.get() + assert item is not None + assert reporter.currently_processing + + # Mark it as incomplete. + queue.incomplete(item, retry_after=-1) + assert not reporter.currently_processing + + # Retrieve again to ensure it is once again available. + same_item = queue.get() + assert same_item is not None + assert reporter.currently_processing + + assert item.id == same_item.id + +def test_complete(queue, reporter, transaction_factory): + # Add an item. + queue.put(['somenamespace', 'abc', 'def'], TEST_MESSAGE_1, available_after=-10) + + now = datetime.utcnow() + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/somenamespace']) + assert count == 1 + + # Retrieve it. + item = queue.get() + assert item is not None + assert reporter.currently_processing + + # Mark it as complete. + queue.complete(item) + assert not reporter.currently_processing + +def test_cancel(queue, reporter, transaction_factory): + # Add an item. + queue.put(['somenamespace', 'abc', 'def'], TEST_MESSAGE_1, available_after=-10) + queue.put(['somenamespace', 'abc', 'def'], TEST_MESSAGE_2, available_after=-5) + + now = datetime.utcnow() + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/somenamespace']) + assert count == 2 + + # Retrieve it. + item = queue.get() + assert item is not None + + # Make sure we can cancel it. + assert queue.cancel(item.id) + + now = datetime.utcnow() + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/somenamespace']) + assert count == 1 + + # Make sure it is gone. + assert not queue.cancel(item.id) + +def test_deleted_namespaced_items(queue, reporter, transaction_factory): + queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, transaction_factory, + reporter=reporter, + has_namespace=True)) + + queue.put(['somenamespace', 'abc', 'def'], TEST_MESSAGE_1, available_after=-10) + queue.put(['somenamespace', 'abc', 'ghi'], TEST_MESSAGE_2, available_after=-5) + queue.put(['anothernamespace', 'abc', 'def'], TEST_MESSAGE_1, available_after=-10) + + # Ensure we have 2 items under `somenamespace` and 1 item under `anothernamespace`. + now = datetime.utcnow() + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/somenamespace']) + assert count == 2 + + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/anothernamespace']) + assert count == 1 + + # Delete all `somenamespace` items. + queue.delete_namespaced_items('somenamespace') + + # Check the updated counts. + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/somenamespace']) + assert count == 0 + + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/anothernamespace']) + assert count == 1 + + # Delete all `anothernamespace` items. + queue.delete_namespaced_items('anothernamespace') + + # Check the updated counts. 
+ count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/somenamespace']) + assert count == 0 + + count = queue.num_available_jobs_between(now - timedelta(seconds=60), now, ['/anothernamespace']) + assert count == 0 diff --git a/test/test_queue.py b/test/test_queue.py deleted file mode 100644 index 63e00c18a..000000000 --- a/test/test_queue.py +++ /dev/null @@ -1,426 +0,0 @@ -import unittest -import json -import time - -from datetime import datetime, timedelta -from functools import wraps - -from app import app -from initdb import setup_database_for_testing, finished_database_for_testing -from data.database import QueueItem -from data.queue import WorkQueue, MINIMUM_EXTENSION - - -QUEUE_NAME = 'testqueuename' - - -class SaveLastCountReporter(object): - def __init__(self): - self.currently_processing = None - self.running_count = None - self.total = None - - def __call__(self, currently_processing, running_count, total_jobs): - self.currently_processing = currently_processing - self.running_count = running_count - self.total = total_jobs - - -class AutoUpdatingQueue(object): - def __init__(self, queue_to_wrap): - self._queue = queue_to_wrap - - def _wrapper(self, func): - @wraps(func) - def wrapper(*args, **kwargs): - to_return = func(*args, **kwargs) - self._queue.update_metrics() - return to_return - return wrapper - - def __getattr__(self, attr_name): - method_or_attr = getattr(self._queue, attr_name) - if callable(method_or_attr): - return self._wrapper(method_or_attr) - else: - return method_or_attr - - -class QueueTestCase(unittest.TestCase): - TEST_MESSAGE_1 = json.dumps({'data': 1}) - TEST_MESSAGE_2 = json.dumps({'data': 2}) - TEST_MESSAGES = [json.dumps({'data': str(i)}) for i in range(1, 101)] - - def setUp(self): - self.reporter = SaveLastCountReporter() - self.transaction_factory = app.config['DB_TRANSACTION_FACTORY'] - self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, - reporter=self.reporter)) - setup_database_for_testing(self) - - def tearDown(self): - finished_database_for_testing(self) - - -class TestQueue(QueueTestCase): - def test_get_single_item(self): - # Add a single item to the queue. - self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) - - # Have two "instances" retrieve an item to claim. Since there is only one, both calls should - # return the same item. - now = datetime.utcnow() - first_item = self.queue._select_available_item(False, now) - second_item = self.queue._select_available_item(False, now) - - self.assertEquals(first_item.id, second_item.id) - self.assertEquals(first_item.state_id, second_item.state_id) - - # Have both "instances" now try to claim the item. Only one should succeed. - first_claimed = self.queue._attempt_to_claim_item(first_item, now, 300) - second_claimed = self.queue._attempt_to_claim_item(first_item, now, 300) - - self.assertTrue(first_claimed) - self.assertFalse(second_claimed) - - # Ensure the item is no longer available. - self.assertIsNone(self.queue.get()) - - # Ensure the item's state ID has changed. - self.assertNotEqual(first_item.state_id, QueueItem.get().state_id) - - def test_extend_processing(self): - # Add and retrieve a queue item. 
- self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) - queue_item = self.queue.get(processing_time=10) - self.assertIsNotNone(queue_item) - - existing_db_item = QueueItem.get(id=queue_item.id) - - # Call extend processing with a timedelta less than the minimum and ensure its - # processing_expires and state_id do not change. - changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1) - self.assertFalse(changed) - - updated_db_item = QueueItem.get(id=queue_item.id) - - self.assertEquals(existing_db_item.processing_expires, updated_db_item.processing_expires) - self.assertEquals(existing_db_item.state_id, updated_db_item.state_id) - - # Call extend processing with a timedelta greater than the minimum and ensure its - # processing_expires and state_id are changed. - changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() + 1) - self.assertTrue(changed) - - updated_db_item = QueueItem.get(id=queue_item.id) - - self.assertNotEqual(existing_db_item.processing_expires, updated_db_item.processing_expires) - self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id) - - # Call extend processing with a timedelta less than the minimum but also with new data and - # ensure its processing_expires and state_id are changed. - changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1, - updated_data='newbody') - self.assertTrue(changed) - - updated_db_item = QueueItem.get(id=queue_item.id) - - self.assertNotEqual(existing_db_item.processing_expires, updated_db_item.processing_expires) - self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id) - self.assertEquals('newbody', updated_db_item.body) - - def test_same_canonical_names(self): - self.assertEqual(self.reporter.currently_processing, None) - self.assertEqual(self.reporter.running_count, None) - self.assertEqual(self.reporter.total, None) - - id_1 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)) - id_2 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1)) - self.assertEqual(id_1 + 1, id_2) - self.assertEqual(self.reporter.currently_processing, False) - self.assertEqual(self.reporter.running_count, 0) - self.assertEqual(self.reporter.total, 1) - - one = self.queue.get(ordering_required=True) - self.assertNotEqual(None, one) - self.assertEqual(self.TEST_MESSAGE_1, one.body) - self.assertEqual(self.reporter.currently_processing, True) - self.assertEqual(self.reporter.running_count, 1) - self.assertEqual(self.reporter.total, 1) - - two_fail = self.queue.get(ordering_required=True) - self.assertEqual(None, two_fail) - self.assertEqual(self.reporter.running_count, 1) - self.assertEqual(self.reporter.total, 1) - - self.queue.complete(one) - self.assertEqual(self.reporter.currently_processing, False) - self.assertEqual(self.reporter.running_count, 0) - self.assertEqual(self.reporter.total, 1) - - two = self.queue.get(ordering_required=True) - self.assertNotEqual(None, two) - self.assertEqual(self.reporter.currently_processing, True) - self.assertEqual(self.TEST_MESSAGE_2, two.body) - self.assertEqual(self.reporter.running_count, 1) - self.assertEqual(self.reporter.total, 1) - - def test_different_canonical_names(self): - self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) - self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-1) - self.assertEqual(self.reporter.running_count, 0) - 
self.assertEqual(self.reporter.total, 2) - - one = self.queue.get(ordering_required=True) - self.assertNotEqual(None, one) - self.assertEqual(self.TEST_MESSAGE_1, one.body) - self.assertEqual(self.reporter.running_count, 1) - self.assertEqual(self.reporter.total, 2) - - two = self.queue.get(ordering_required=True) - self.assertNotEqual(None, two) - self.assertEqual(self.TEST_MESSAGE_2, two.body) - self.assertEqual(self.reporter.running_count, 2) - self.assertEqual(self.reporter.total, 2) - - def test_canonical_name(self): - self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) - self.queue.put(['abc', 'def', 'ghi'], self.TEST_MESSAGE_1, available_after=-1) - - one = self.queue.get(ordering_required=True) - self.assertNotEqual(QUEUE_NAME + '/abc/def/', one) - - two = self.queue.get(ordering_required=True) - self.assertNotEqual(QUEUE_NAME + '/abc/def/ghi/', two) - - def test_expiration(self): - self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) - self.assertEqual(self.reporter.running_count, 0) - self.assertEqual(self.reporter.total, 1) - - one = self.queue.get(processing_time=0.5, ordering_required=True) - self.assertNotEqual(None, one) - self.assertEqual(self.reporter.running_count, 1) - self.assertEqual(self.reporter.total, 1) - - one_fail = self.queue.get(ordering_required=True) - self.assertEqual(None, one_fail) - - time.sleep(1) - self.queue.update_metrics() - self.assertEqual(self.reporter.running_count, 0) - self.assertEqual(self.reporter.total, 1) - - one_again = self.queue.get(ordering_required=True) - self.assertNotEqual(None, one_again) - self.assertEqual(self.reporter.running_count, 1) - self.assertEqual(self.reporter.total, 1) - - def test_alive(self): - # No queue item = not alive. - self.assertFalse(self.queue.alive(['abc', 'def'])) - - # Add a queue item. - self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) - self.assertTrue(self.queue.alive(['abc', 'def'])) - - # Retrieve the queue item. - queue_item = self.queue.get() - self.assertIsNotNone(queue_item) - self.assertTrue(self.queue.alive(['abc', 'def'])) - - # Make sure it is running by trying to retrieve it again. - self.assertIsNone(self.queue.get()) - - # Delete the queue item. 
- self.queue.complete(queue_item) - self.assertFalse(self.queue.alive(['abc', 'def'])) - - def test_specialized_queue(self): - self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) - self.queue.put(['def', 'def'], self.TEST_MESSAGE_2, available_after=-1) - - my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def'])) - - two = my_queue.get(ordering_required=True) - self.assertNotEqual(None, two) - self.assertEqual(self.TEST_MESSAGE_2, two.body) - - one_fail = my_queue.get(ordering_required=True) - self.assertEqual(None, one_fail) - - one = self.queue.get(ordering_required=True) - self.assertNotEqual(None, one) - self.assertEqual(self.TEST_MESSAGE_1, one.body) - - def test_random_queue_no_duplicates(self): - for msg in self.TEST_MESSAGES: - self.queue.put(['abc', 'def'], msg, available_after=-1) - seen = set() - - for _ in range(1, 101): - item = self.queue.get() - json_body = json.loads(item.body) - msg = str(json_body['data']) - self.assertTrue(msg not in seen) - seen.add(msg) - - for body in self.TEST_MESSAGES: - json_body = json.loads(body) - msg = str(json_body['data']) - self.assertIn(msg, seen) - - def test_bulk_insert(self): - self.assertEqual(self.reporter.currently_processing, None) - self.assertEqual(self.reporter.running_count, None) - self.assertEqual(self.reporter.total, None) - - with self.queue.batch_insert() as queue_put: - queue_put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1) - queue_put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1) - - self.queue.update_metrics() - self.assertEqual(self.reporter.currently_processing, False) - self.assertEqual(self.reporter.running_count, 0) - self.assertEqual(self.reporter.total, 1) - - with self.queue.batch_insert() as queue_put: - queue_put(['abd', 'def'], self.TEST_MESSAGE_1, available_after=-1) - queue_put(['abd', 'ghi'], self.TEST_MESSAGE_2, available_after=-1) - - self.queue.update_metrics() - self.assertEqual(self.reporter.currently_processing, False) - self.assertEqual(self.reporter.running_count, 0) - self.assertEqual(self.reporter.total, 3) - - def test_num_available_between(self): - now = datetime.utcnow() - self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) - self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-5) - - # Partial results - count = self.queue.num_available_jobs_between(now-timedelta(seconds=8), now, ['abc']) - self.assertEqual(1, count) - - # All results - count = self.queue.num_available_jobs_between(now-timedelta(seconds=20), now, ['/abc']) - self.assertEqual(2, count) - - # No results - count = self.queue.num_available_jobs_between(now, now, 'abc') - self.assertEqual(0, count) - - def test_incomplete(self): - # Add an item. - self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) - - now = datetime.utcnow() - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/somenamespace']) - self.assertEqual(1, count) - - # Retrieve it. - item = self.queue.get() - self.assertIsNotNone(item) - self.assertTrue(self.reporter.currently_processing) - - # Mark it as incomplete. - self.queue.incomplete(item, retry_after=-1) - self.assertFalse(self.reporter.currently_processing) - - # Retrieve again to ensure it is once again available. - same_item = self.queue.get() - self.assertIsNotNone(same_item) - self.assertTrue(self.reporter.currently_processing) - - self.assertEquals(item.id, same_item.id) - - def test_complete(self): - # Add an item. 
- self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) - - now = datetime.utcnow() - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/somenamespace']) - self.assertEqual(1, count) - - # Retrieve it. - item = self.queue.get() - self.assertIsNotNone(item) - self.assertTrue(self.reporter.currently_processing) - - # Mark it as complete. - self.queue.complete(item) - self.assertFalse(self.reporter.currently_processing) - - def test_cancel(self): - # Add an item. - self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) - self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_2, available_after=-5) - - now = datetime.utcnow() - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/somenamespace']) - self.assertEqual(2, count) - - # Retrieve it. - item = self.queue.get() - self.assertIsNotNone(item) - - # Make sure we can cancel it. - self.assertTrue(self.queue.cancel(item.id)) - - now = datetime.utcnow() - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/somenamespace']) - self.assertEqual(1, count) - - # Make sure it is gone. - self.assertFalse(self.queue.cancel(item.id)) - - def test_deleted_namespaced_items(self): - self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, - reporter=self.reporter, - has_namespace=True)) - - self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) - self.queue.put(['somenamespace', 'abc', 'ghi'], self.TEST_MESSAGE_2, available_after=-5) - self.queue.put(['anothernamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10) - - # Ensure we have 2 items under `somenamespace` and 1 item under `anothernamespace`. - now = datetime.utcnow() - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/somenamespace']) - self.assertEqual(2, count) - - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/anothernamespace']) - self.assertEqual(1, count) - - # Delete all `somenamespace` items. - self.queue.delete_namespaced_items('somenamespace') - - # Check the updated counts. - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/somenamespace']) - self.assertEqual(0, count) - - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/anothernamespace']) - self.assertEqual(1, count) - - # Delete all `anothernamespace` items. - self.queue.delete_namespaced_items('anothernamespace') - - # Check the updated counts. - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/somenamespace']) - self.assertEqual(0, count) - - count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now, - ['/anothernamespace']) - self.assertEqual(0, count) - - -if __name__ == '__main__': - unittest.main()