diff --git a/requirements-nover.txt b/requirements-nover.txt
index 0ebe5534d..e55df79f7 100644
--- a/requirements-nover.txt
+++ b/requirements-nover.txt
@@ -56,3 +56,4 @@ toposort
 rfc3987
 pyjwkest
 jsonpath-rw
+bintrees
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index a6f4c28a3..1fe1e4248 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,7 @@ APScheduler==3.0.3
 autobahn==0.9.3-3
 Babel==1.3
 beautifulsoup4==4.4.0
+bintrees==2.0.2
 blinker==1.3
 boto==2.38.0
 cachetools==1.0.3
diff --git a/test/test_backfill_allocator.py b/test/test_backfill_allocator.py
new file mode 100644
index 000000000..40000cdb5
--- /dev/null
+++ b/test/test_backfill_allocator.py
@@ -0,0 +1,132 @@
+import unittest
+import logging
+import random
+
+from datetime import datetime, timedelta
+from util.migrate.allocator import CompletedKeys, NoAvailableKeysError
+
+
+class CompletedTestCase(unittest.TestCase):
+  def test_merge_blocks_operations(self):
+    candidates = CompletedKeys(10)
+    candidates.mark_completed(1, 5)
+
+    self.assertTrue(candidates.is_available(5))
+    self.assertTrue(candidates.is_available(0))
+    self.assertFalse(candidates.is_available(1))
+    self.assertFalse(candidates.is_available(4))
+    self.assertFalse(candidates.is_available(11))
+    self.assertFalse(candidates.is_available(10))
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(5, 6)
+    self.assertFalse(candidates.is_available(5))
+    self.assertTrue(candidates.is_available(6))
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(3, 8)
+    self.assertTrue(candidates.is_available(9))
+    self.assertTrue(candidates.is_available(8))
+    self.assertFalse(candidates.is_available(7))
+    self.assertEqual(1, len(candidates._slabs))
+
+  def test_adjust_max(self):
+    candidates = CompletedKeys(10)
+    self.assertEqual(0, len(candidates._slabs))
+
+    self.assertTrue(candidates.is_available(9))
+    candidates.mark_completed(5, 12)
+    self.assertEqual(0, len(candidates._slabs))
+
+    self.assertFalse(candidates.is_available(9))
+    self.assertTrue(candidates.is_available(4))
+
+  def test_adjust_min(self):
+    candidates = CompletedKeys(10)
+    self.assertEqual(0, len(candidates._slabs))
+
+    self.assertTrue(candidates.is_available(2))
+    candidates.mark_completed(0, 3)
+    self.assertEqual(0, len(candidates._slabs))
+
+    self.assertFalse(candidates.is_available(2))
+    self.assertTrue(candidates.is_available(4))
+
+  def test_inside_block(self):
+    candidates = CompletedKeys(10)
+    candidates.mark_completed(1, 8)
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(2, 5)
+    self.assertEqual(1, len(candidates._slabs))
+    self.assertFalse(candidates.is_available(1))
+    self.assertFalse(candidates.is_available(5))
+
+  def test_wrap_block(self):
+    candidates = CompletedKeys(10)
+    candidates.mark_completed(2, 5)
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(1, 8)
+    self.assertEqual(1, len(candidates._slabs))
+    self.assertFalse(candidates.is_available(1))
+    self.assertFalse(candidates.is_available(5))
+
+  def test_non_contiguous(self):
+    candidates = CompletedKeys(10)
+
+    candidates.mark_completed(1, 5)
+    self.assertEqual(1, len(candidates._slabs))
+    self.assertTrue(candidates.is_available(5))
+    self.assertTrue(candidates.is_available(6))
+
+    candidates.mark_completed(6, 8)
+    self.assertEqual(2, len(candidates._slabs))
+    self.assertTrue(candidates.is_available(5))
+    self.assertFalse(candidates.is_available(6))
+
+  def test_big_merge(self):
+    candidates = CompletedKeys(10)
+
+    candidates.mark_completed(1, 5)
+    self.assertEqual(1, len(candidates._slabs))
+
+    candidates.mark_completed(6, 8)
+    self.assertEqual(2, len(candidates._slabs))
+
+    candidates.mark_completed(5, 6)
+    self.assertEqual(1, len(candidates._slabs))
+
+  def test_range_limits(self):
+    candidates = CompletedKeys(10)
+    self.assertFalse(candidates.is_available(-1))
+    self.assertFalse(candidates.is_available(10))
+
+    self.assertTrue(candidates.is_available(9))
+    self.assertTrue(candidates.is_available(0))
+
+  def test_random_saturation(self):
+    candidates = CompletedKeys(100)
+    with self.assertRaises(NoAvailableKeysError):
+      for _ in range(101):
+        start = candidates.get_block_start_index(10)
+        self.assertTrue(candidates.is_available(start))
+        candidates.mark_completed(start, start + 10)
+
+  def test_huge_dataset(self):
+    candidates = CompletedKeys(1024 * 1024)
+    start_time = datetime.now()
+    iterations = 0
+    with self.assertRaises(NoAvailableKeysError):
+      while (datetime.now() - start_time) < timedelta(seconds=10):
+        start = candidates.get_block_start_index(1024)
+        self.assertTrue(candidates.is_available(start))
+        candidates.mark_completed(start, start + random.randint(512, 1024))
+        iterations += 1
+
+    self.assertGreater(iterations, 1024)
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.DEBUG)
+  unittest.main()
diff --git a/util/migrate/__init__.py b/util/migrate/__init__.py
index e1770742d..809bcaef8 100644
--- a/util/migrate/__init__.py
+++ b/util/migrate/__init__.py
@@ -2,7 +2,6 @@ import logging
 
 from sqlalchemy.types import TypeDecorator, Text
 from sqlalchemy.dialects.mysql import TEXT as MySQLText, LONGTEXT
-from random import shuffle
 
 logger = logging.getLogger(__name__)
 
@@ -21,56 +20,3 @@ class UTF8LongText(TypeDecorator):
       return dialect.type_descriptor(LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci'))
     else:
       return dialect.type_descriptor(Text())
-
-
-def _chance_duplication(pop_size, samples):
-  """ The chance of randomly selecting a duplicate when you choose the specified number of samples
-      from the specified population size.
-  """
-  pairs = (samples * (samples - 1)) / 2.0
-  unique = (pop_size - 1.0)/pop_size
-  all_unique = pow(unique, pairs)
-  return 1 - all_unique
-
-
-def _num_checks(pop_size, desired):
-  """ Binary search for the proper number of entries to use to get the specified collision
-      probability.
-  """
-  s_max = pop_size
-  s_min = 0
-  last_test = -1
-  s_test = s_max
-
-  while s_max > s_min and last_test != s_test:
-    last_test = s_test
-    s_test = (s_max + s_min)/2
-    chance = _chance_duplication(pop_size, s_test)
-    if chance > desired:
-      s_max = s_test - 1
-    else:
-      s_min = s_test
-
-  return s_test
-
-
-def yield_random_entries(batch_query, batch_size, collision_chance):
-  """ This method will yield semi-random items from a query in a database friendly way until no
-      more items match the base query modifier. It will pull batches of batch_size from the query
-      and yield enough items from each batch so that concurrent workers have a reduced chance of
-      selecting the same items. For example, if your batches return 10,000 entries, and you desire
-      only a .03 collision_chance, we will only use 25 random entries before going back to the db
-      for a new batch.
- """ - - # Seed with some data which will pass the condition, but will be immediately discarded - all_candidates = [1] - while len(all_candidates) > 0: - all_candidates = list(batch_query().limit(batch_size)) - shuffle(all_candidates) - num_selections = max(1, _num_checks(len(all_candidates), collision_chance)) - logger.debug('Found %s/%s matching entries, processing %s', len(all_candidates), batch_size, - num_selections) - candidates = all_candidates[0:num_selections] - for candidate in candidates: - yield candidate diff --git a/util/migrate/allocator.py b/util/migrate/allocator.py new file mode 100644 index 000000000..999416ace --- /dev/null +++ b/util/migrate/allocator.py @@ -0,0 +1,156 @@ +import logging +import random + +from bintrees import RBTree +from threading import Event + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class NoAvailableKeysError(ValueError): + pass + + +class CompletedKeys(object): + def __init__(self, max_index): + self._max_index = max_index + self._min_index = 0 + self._slabs = RBTree() + + def _get_previous_or_none(self, index): + try: + return self._slabs.floor_item(index) + except KeyError: + return None + + def is_available(self, index): + logger.debug('Testing index %s', index) + if index >= self._max_index or index < self._min_index: + logger.debug('Index out of range') + return False + + try: + prev_start, prev_length = self._slabs.floor_item(index) + logger.debug('Prev range: %s-%s', prev_start, prev_start + prev_length) + return (prev_start + prev_length) <= index + except KeyError: + return True + + def mark_completed(self, start_index, past_last_index): + logger.debug('Marking the range completed: %s-%s', start_index, past_last_index) + # Find the item directly before this and see if there is overlap + to_discard = set() + try: + prev_start, prev_length = self._slabs.floor_item(start_index) + if prev_start + prev_length >= start_index: + # we are going to merge with the range before us + logger.debug('Merging with the prev range: %s-%s', prev_start, prev_start + prev_length) + to_discard.add(prev_start) + start_index = prev_start + past_last_index = max(past_last_index, prev_start + prev_length) + except KeyError: + pass + + # Find all keys between the start and last index and merge them into one block + for merge_start, merge_length in self._slabs.iter_items(start_index, past_last_index + 1): + candidate_next_index = merge_start + merge_length + logger.debug('Merging with block %s-%s', merge_start, candidate_next_index) + to_discard.add(merge_start) + past_last_index = max(past_last_index, candidate_next_index) + + # write the new block which is fully merged + discard = False + if past_last_index >= self._max_index: + logger.debug('Discarding block and setting new max to: %s', start_index) + self._max_index = start_index + discard = True + + if start_index <= self._min_index: + logger.debug('Discarding block and setting new min to: %s', past_last_index) + self._min_index = past_last_index + discard = True + + if to_discard: + logger.debug('Discarding %s obsolte blocks', len(to_discard)) + self._slabs.remove_items(to_discard) + + if not discard: + logger.debug('Writing new block with range: %s-%s', start_index, past_last_index) + self._slabs.insert(start_index, past_last_index - start_index) + + logger.debug('Total blocks: %s', len(self._slabs)) + + def get_block_start_index(self, block_size_estimate): + logger.debug('Total range: %s-%s', self._min_index, self._max_index) + if self._max_index <= self._min_index: + 
+  def get_block_start_index(self, block_size_estimate):
+    logger.debug('Total range: %s-%s', self._min_index, self._max_index)
+    if self._max_index <= self._min_index:
+      raise NoAvailableKeysError('All indexes have been marked completed')
+
+    num_holes = len(self._slabs) + 1
+    random_hole = random.randint(0, num_holes - 1)
+    logger.debug('Selected random hole %s with %s total holes', random_hole, num_holes)
+
+    hole_start = self._min_index
+    past_hole_end = self._max_index
+
+    # Now that we have picked a hole, we need to define the bounds
+    if random_hole > 0:
+      # There will be a slab before this hole, find where it ends
+      bound_entries = self._slabs.nsmallest(random_hole + 1)[-2:]
+      left_index, left_len = bound_entries[0]
+      logger.debug('Left range %s-%s', left_index, left_index + left_len)
+      hole_start = left_index + left_len
+
+      if len(bound_entries) > 1:
+        right_index, right_len = bound_entries[1]
+        logger.debug('Right range %s-%s', right_index, right_index + right_len)
+        past_hole_end, _ = bound_entries[1]
+    elif not self._slabs.is_empty():
+      right_index, right_len = self._slabs.nsmallest(1)[0]
+      logger.debug('Right range %s-%s', right_index, right_index + right_len)
+      past_hole_end, _ = self._slabs.nsmallest(1)[0]
+
+    # Now that we have our hole bounds, select a random block from [0:len - block_size_estimate]
+    logger.debug('Selecting from hole range: %s-%s', hole_start, past_hole_end)
+    rand_max_bound = max(hole_start, past_hole_end - block_size_estimate)
+    logger.debug('Rand max bound: %s', rand_max_bound)
+    return random.randint(hole_start, rand_max_bound)
+
+
+def yield_random_entries(batch_query, primary_key_field, batch_size, max_id):
+  """ This method will yield items from random blocks in the database. We will track metadata
+      about which keys are available for work, and we will complete the backfill when there is no
+      more work to be done. The method yields tuples of (candidate, Event), and if the work was
+      already done by another worker, the caller should set the event. Batch candidates must have
+      an "id" field which can be inspected.
+  """
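+  # Illustrative caller (model and helper names here are hypothetical, not part of
+  # this module):
+  #
+  #   max_id = SomeModel.select(fn.Max(SomeModel.id)).scalar()
+  #   for candidate, abort in yield_random_entries(batch_query, SomeModel.id, 1000, max_id):
+  #     if work_already_done(candidate):
+  #       abort.set()  # another worker beat us to it; skip the rest of this block
+  #     else:
+  #       do_work(candidate)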
+ """ + + allocator = CompletedKeys(max_id + 1) + + try: + while True: + start_index = allocator.get_block_start_index(batch_size) + all_candidates = list(batch_query() + .limit(batch_size) + .where(primary_key_field >= start_index)) + + if len(all_candidates) == 0: + logger.debug('No candidates, new highest id: %s', start_index) + allocator.mark_completed(start_index, max_id) + continue + + logger.debug('Found %s candidates, processing block') + for candidate in all_candidates: + abort_early = Event() + yield candidate, abort_early + if abort_early.is_set(): + logger.debug('Overlap with another worker, aborting') + break + + completed_through = candidate.id + 1 + logger.debug('Marking id range as completed: %s-%s', start_index, completed_through) + allocator.mark_completed(start_index, completed_through) + + except NoAvailableKeysError: + logger.debug('No more work') diff --git a/util/migrate/backfill_content_checksums.py b/util/migrate/backfill_content_checksums.py index 645b5539e..86770a35b 100644 --- a/util/migrate/backfill_content_checksums.py +++ b/util/migrate/backfill_content_checksums.py @@ -3,12 +3,12 @@ import logging from peewee import JOIN_LEFT_OUTER from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField, - TextField) + TextField, fn) from data.database import BaseModel, db, db_for_update, CloseForLongOperation from app import app, storage from digest import checksums -from util.migrate import yield_random_entries +from util.migrate.allocator import yield_random_entries logger = logging.getLogger(__name__) @@ -76,7 +76,9 @@ def backfill_content_checksums(): .select(ImageStorage.id, ImageStorage.uuid) .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False)) - for candidate_storage in yield_random_entries(batch_query, 10000, 0.1): + max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar() + + for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, 1000, max_id): logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid) locations = _get_image_storage_locations(candidate_storage.id) @@ -97,12 +99,13 @@ def backfill_content_checksums(): to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id)) if to_update.content_checksum is not None: logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid) + abort.set() else: logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid) to_update.content_checksum = checksum to_update.save() -if __name__ == "__main__": +if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) # logging.getLogger('peewee').setLevel(logging.CRITICAL) backfill_content_checksums() diff --git a/util/migrate/backfill_parent_id.py b/util/migrate/backfill_parent_id.py index 1d2cf4136..fc2db3bff 100644 --- a/util/migrate/backfill_parent_id.py +++ b/util/migrate/backfill_parent_id.py @@ -1,8 +1,41 @@ import logging -from data.database import Image, ImageStorage, db, db_for_update +from data.database import BaseModel, db, db_for_update +from peewee import (fn, CharField, BigIntegerField, ForeignKeyField, BooleanField, DateTimeField, + TextField, IntegerField) from app import app -from util.migrate import yield_random_entries +from util.migrate.allocator import yield_random_entries + +class Repository(BaseModel): + pass + + +# Vendor the information from tables we will be writing to at the time of this migration +class ImageStorage(BaseModel): + uuid = 
+  checksum = CharField(null=True)
+  image_size = BigIntegerField(null=True)
+  uncompressed_size = BigIntegerField(null=True)
+  uploading = BooleanField(default=True, null=True)
+  cas_path = BooleanField(default=True)
+  content_checksum = CharField(null=True, index=True)
+
+
+class Image(BaseModel):
+  docker_image_id = CharField(index=True)
+  repository = ForeignKeyField(Repository)
+  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
+  storage = ForeignKeyField(ImageStorage, index=True, null=True)
+  created = DateTimeField(null=True)
+  comment = TextField(null=True)
+  command = TextField(null=True)
+  aggregate_size = BigIntegerField(null=True)
+  v1_json_metadata = TextField(null=True)
+  v1_checksum = CharField(null=True)
+
+  security_indexed = BooleanField(default=False)
+  security_indexed_engine = IntegerField(default=-1)
+  parent_id = IntegerField(index=True, null=True)
 
 logger = logging.getLogger(__name__)
@@ -21,20 +54,23 @@ def backfill_parent_id():
                  .where(Image.parent_id >> None, Image.ancestors != '/',
                         ImageStorage.uploading == False))
 
-  for to_backfill in yield_random_entries(fetch_batch, 10000, 0.3):
+  max_id = Image.select(fn.Max(Image.id)).scalar()
+
+  for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, 1000, max_id):
     with app.config['DB_TRANSACTION_FACTORY'](db):
       try:
         image = db_for_update(Image
                               .select()
-                              .where(Image.id == to_backfill.id)).get()
+                              .where(Image.id == to_backfill.id, Image.parent_id >> None)).get()
         image.parent_id = int(to_backfill.ancestors.split('/')[-2])
        image.save()
       except Image.DoesNotExist:
-        pass
+        logger.info('Collision with another worker, aborting batch')
+        abort.set()
 
   logger.debug('backfill_parent_id: Completed')
 
-if __name__ == "__main__":
+if __name__ == '__main__':
   logging.basicConfig(level=logging.DEBUG)
   logging.getLogger('peewee').setLevel(logging.CRITICAL)
diff --git a/util/migrate/backfill_v1_checksums.py b/util/migrate/backfill_v1_checksums.py
index aef7af72e..e9516cf91 100644
--- a/util/migrate/backfill_v1_checksums.py
+++ b/util/migrate/backfill_v1_checksums.py
@@ -1,9 +1,9 @@
 import logging
 
 from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
-                    TextField)
+                    TextField, fn)
 from data.database import BaseModel, db, db_for_update
-from util.migrate import yield_random_entries
+from util.migrate.allocator import yield_random_entries
 from app import app
 
 
@@ -48,20 +48,22 @@ def backfill_checksums():
                  .where(Image.v1_checksum >> None, ImageStorage.uploading == False,
                         ~(ImageStorage.checksum >> None)))
 
-  for candidate_image in yield_random_entries(batch_query, 10000, 0.1):
-    logger.debug('Computing content checksum for storage: %s', candidate_image.id)
+  max_id = Image.select(fn.Max(Image.id)).scalar()
 
+  for candidate_image, abort in yield_random_entries(batch_query, Image.id, 1000, max_id):
     with app.config['DB_TRANSACTION_FACTORY'](db):
       try:
         image = db_for_update(Image
                               .select(Image, ImageStorage)
                               .join(ImageStorage)
-                              .where(Image.id == candidate_image.id)).get()
+                              .where(Image.id == candidate_image.id,
+                                     Image.v1_checksum >> None)).get()
         image.v1_checksum = image.storage.checksum
         image.save()
       except Image.DoesNotExist:
-        pass
+        logger.info('Collision with another worker, aborting batch')
+        abort.set()
 
 
 if __name__ == "__main__":