import unittest
import logging
import random

from datetime import datetime, timedelta
from util.migrate.allocator import CompletedKeys, NoAvailableKeysError


class CompletedTestCase(unittest.TestCase):
    """Tests for the CompletedKeys allocator bookkeeping.

    The assertions below treat ``CompletedKeys(n)`` as tracking keys
    ``0 .. n - 1`` and ``mark_completed(start, end)`` as completing the
    half-open range ``[start, end)``: after ``mark_completed(1, 5)`` key 4 is
    expected to be unavailable while key 5 is still available.
    """

    def _assert_slab_count(self, keys, expected):
        # Shared check on the number of completed slabs currently recorded.
        self.assertEqual(expected, len(keys._slabs))

    # Overlapping or touching completions should collapse into a single slab.
    def test_merge_blocks_operations(self):
        keys = CompletedKeys(10)
        keys.mark_completed(1, 5)

        for key in (5, 0):
            self.assertTrue(keys.is_available(key))
        # 1 and 4 are inside the completed range; 11 and 10 are out of bounds.
        for key in (1, 4, 11, 10):
            self.assertFalse(keys.is_available(key))
        self._assert_slab_count(keys, 1)

        # Touches the end of the existing slab, so the slab count must not grow.
        keys.mark_completed(5, 6)
        self.assertFalse(keys.is_available(5))
        self.assertTrue(keys.is_available(6))
        self._assert_slab_count(keys, 1)

        # Overlaps the existing slab and extends it up to (but excluding) 8.
        keys.mark_completed(3, 8)
        self.assertTrue(keys.is_available(9))
        self.assertTrue(keys.is_available(8))
        self.assertFalse(keys.is_available(7))
        self._assert_slab_count(keys, 1)

    # A completion reaching past the top of the key space is expected to leave
    # no slab behind while still making the covered keys unavailable.
    def test_adjust_max(self):
        keys = CompletedKeys(10)
        self._assert_slab_count(keys, 0)
        self.assertTrue(keys.is_available(9))

        keys.mark_completed(5, 12)

        self._assert_slab_count(keys, 0)
        self.assertFalse(keys.is_available(9))
        self.assertTrue(keys.is_available(4))

    # Mirror of test_adjust_max for a completion starting at the bottom key.
    def test_adjust_min(self):
        keys = CompletedKeys(10)
        self._assert_slab_count(keys, 0)
        self.assertTrue(keys.is_available(2))

        keys.mark_completed(0, 3)

        self._assert_slab_count(keys, 0)
        self.assertFalse(keys.is_available(2))
        self.assertTrue(keys.is_available(4))

    # Completing a range already inside an existing slab changes nothing.
    def test_inside_block(self):
        keys = CompletedKeys(10)

        keys.mark_completed(1, 8)
        self._assert_slab_count(keys, 1)

        keys.mark_completed(2, 5)
        self._assert_slab_count(keys, 1)

        self.assertFalse(keys.is_available(1))
        self.assertFalse(keys.is_available(5))

    # Completing a range that fully encloses an existing slab keeps one slab.
    def test_wrap_block(self):
        keys = CompletedKeys(10)

        keys.mark_completed(2, 5)
        self._assert_slab_count(keys, 1)

        keys.mark_completed(1, 8)
        self._assert_slab_count(keys, 1)

        self.assertFalse(keys.is_available(1))
        self.assertFalse(keys.is_available(5))

    # Ranges separated by an untouched key must stay as separate slabs.
    def test_non_contiguous(self):
        keys = CompletedKeys(10)

        keys.mark_completed(1, 5)
        self._assert_slab_count(keys, 1)
        self.assertTrue(keys.is_available(5))
        self.assertTrue(keys.is_available(6))

        keys.mark_completed(6, 8)
        self._assert_slab_count(keys, 2)
        self.assertTrue(keys.is_available(5))
        self.assertFalse(keys.is_available(6))

    # Filling the one-key gap between two slabs should fuse them into one.
    def test_big_merge(self):
        keys = CompletedKeys(10)

        keys.mark_completed(1, 5)
        self._assert_slab_count(keys, 1)

        keys.mark_completed(6, 8)
        self._assert_slab_count(keys, 2)

        keys.mark_completed(5, 6)
        self._assert_slab_count(keys, 1)

    # Keys outside 0 .. max - 1 are never reported as available.
    def test_range_limits(self):
        keys = CompletedKeys(10)

        for key in (-1, 10):
            self.assertFalse(keys.is_available(key))
        for key in (9, 0):
            self.assertTrue(keys.is_available(key))

    # Repeatedly completing allocated blocks must eventually exhaust the key
    # space and raise NoAvailableKeysError within the allowed attempts.
    def test_random_saturation(self):
        block_size = 10
        attempts = 101
        keys = CompletedKeys(100)

        with self.assertRaises(NoAvailableKeysError):
            for _ in range(attempts):
                block_start = keys.get_block_start_index(block_size)
                self.assertTrue(keys.is_available(block_start))
                keys.mark_completed(block_start, block_start + block_size)

    # Same saturation check over a 1Mi key space with randomly sized
    # completions; exhaustion has to happen inside a ten second budget and
    # only after more than 1024 allocations.
    def test_huge_dataset(self):
        keys = CompletedKeys(1024 * 1024)
        budget = timedelta(seconds=10)
        began = datetime.now()
        iterations = 0

        with self.assertRaises(NoAvailableKeysError):
            while (datetime.now() - began) < budget:
                block_start = keys.get_block_start_index(1024)
                self.assertTrue(keys.is_available(block_start))
                completed_length = random.randint(512, 1024)
                keys.mark_completed(block_start, block_start + completed_length)
                iterations += 1

        self.assertGreater(iterations, 1024)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()