import unittest import logging import random from datetime import datetime, timedelta from util.migrate.allocator import CompletedKeys, NoAvailableKeysError, yield_random_entries class CompletedTestCase(unittest.TestCase): def test_merge_blocks_operations(self): candidates = CompletedKeys(10) self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(1, 5) self.assertTrue(candidates.is_available(5)) self.assertTrue(candidates.is_available(0)) self.assertFalse(candidates.is_available(1)) self.assertFalse(candidates.is_available(4)) self.assertFalse(candidates.is_available(11)) self.assertFalse(candidates.is_available(10)) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(6, candidates.num_remaining) candidates.mark_completed(5, 6) self.assertFalse(candidates.is_available(5)) self.assertTrue(candidates.is_available(6)) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(5, candidates.num_remaining) candidates.mark_completed(3, 8) self.assertTrue(candidates.is_available(9)) self.assertTrue(candidates.is_available(8)) self.assertFalse(candidates.is_available(7)) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(3, candidates.num_remaining) def test_adjust_max(self): candidates = CompletedKeys(10) self.assertEqual(10, candidates.num_remaining) self.assertEqual(0, len(candidates._slabs)) self.assertTrue(candidates.is_available(9)) candidates.mark_completed(5, 12) self.assertEqual(0, len(candidates._slabs)) self.assertEqual(5, candidates.num_remaining) self.assertFalse(candidates.is_available(9)) self.assertTrue(candidates.is_available(4)) def test_adjust_min(self): candidates = CompletedKeys(10) self.assertEqual(10, candidates.num_remaining) self.assertEqual(0, len(candidates._slabs)) self.assertTrue(candidates.is_available(2)) candidates.mark_completed(0, 3) self.assertEqual(0, len(candidates._slabs)) self.assertEqual(7, candidates.num_remaining) self.assertFalse(candidates.is_available(2)) self.assertTrue(candidates.is_available(4)) def test_inside_block(self): candidates = CompletedKeys(10) self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(1, 8) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(3, candidates.num_remaining) candidates.mark_completed(2, 5) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(3, candidates.num_remaining) self.assertFalse(candidates.is_available(1)) self.assertFalse(candidates.is_available(5)) def test_wrap_block(self): candidates = CompletedKeys(10) self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(2, 5) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(7, candidates.num_remaining) candidates.mark_completed(1, 8) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(3, candidates.num_remaining) self.assertFalse(candidates.is_available(1)) self.assertFalse(candidates.is_available(5)) def test_non_contiguous(self): candidates = CompletedKeys(10) self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(1, 5) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(6, candidates.num_remaining) self.assertTrue(candidates.is_available(5)) self.assertTrue(candidates.is_available(6)) candidates.mark_completed(6, 8) self.assertEqual(2, len(candidates._slabs)) self.assertEqual(4, candidates.num_remaining) self.assertTrue(candidates.is_available(5)) self.assertFalse(candidates.is_available(6)) def test_big_merge(self): candidates = CompletedKeys(10) self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(1, 5) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(6, candidates.num_remaining) candidates.mark_completed(6, 8) self.assertEqual(2, len(candidates._slabs)) self.assertEqual(4, candidates.num_remaining) candidates.mark_completed(5, 6) self.assertEqual(1, len(candidates._slabs)) self.assertEqual(3, candidates.num_remaining) def test_range_limits(self): candidates = CompletedKeys(10) self.assertFalse(candidates.is_available(-1)) self.assertFalse(candidates.is_available(10)) self.assertTrue(candidates.is_available(9)) self.assertTrue(candidates.is_available(0)) def test_random_saturation(self): candidates = CompletedKeys(100) with self.assertRaises(NoAvailableKeysError): for _ in range(101): start = candidates.get_block_start_index(10) self.assertTrue(candidates.is_available(start)) candidates.mark_completed(start, start + 10) self.assertEqual(0, candidates.num_remaining) def test_huge_dataset(self): candidates = CompletedKeys(1024 * 1024) start_time = datetime.now() iterations = 0 with self.assertRaises(NoAvailableKeysError): while (datetime.now() - start_time) < timedelta(seconds=10): start = candidates.get_block_start_index(1024) self.assertTrue(candidates.is_available(start)) candidates.mark_completed(start, start + random.randint(512, 1024)) iterations += 1 self.assertGreater(iterations, 1024) self.assertEqual(0, candidates.num_remaining) class FakeQuery(object): def __init__(self, result_list): self._result_list = result_list def limit(self, *args, **kwargs): return self def where(self, *args, **kwargs): return self def order_by(self, *args, **kwargs): return self def __iter__(self): return self._result_list.__iter__() class QueryAllocatorTest(unittest.TestCase): FAKE_PK_FIELD = 10 # Must be able to compare to integers def test_no_work(self): def create_empty_query(): return FakeQuery([]) for _ in yield_random_entries(create_empty_query, self.FAKE_PK_FIELD, 1, 10): self.fail('There should never be any actual work!') if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) unittest.main()