diff --git a/test/test_backfill_allocator.py b/test/test_backfill_allocator.py index 2cb6c1313..631fb203c 100644 --- a/test/test_backfill_allocator.py +++ b/test/test_backfill_allocator.py @@ -9,6 +9,7 @@ from util.migrate.allocator import CompletedKeys, NoAvailableKeysError, yield_ra class CompletedTestCase(unittest.TestCase): def test_merge_blocks_operations(self): candidates = CompletedKeys(10) + self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(1, 5) self.assertTrue(candidates.is_available(5)) @@ -18,84 +19,104 @@ class CompletedTestCase(unittest.TestCase): self.assertFalse(candidates.is_available(11)) self.assertFalse(candidates.is_available(10)) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(6, candidates.num_remaining) candidates.mark_completed(5, 6) self.assertFalse(candidates.is_available(5)) self.assertTrue(candidates.is_available(6)) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(5, candidates.num_remaining) candidates.mark_completed(3, 8) self.assertTrue(candidates.is_available(9)) self.assertTrue(candidates.is_available(8)) self.assertFalse(candidates.is_available(7)) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(3, candidates.num_remaining) def test_adjust_max(self): candidates = CompletedKeys(10) + self.assertEqual(10, candidates.num_remaining) self.assertEqual(0, len(candidates._slabs)) self.assertTrue(candidates.is_available(9)) candidates.mark_completed(5, 12) self.assertEqual(0, len(candidates._slabs)) + self.assertEqual(5, candidates.num_remaining) self.assertFalse(candidates.is_available(9)) self.assertTrue(candidates.is_available(4)) def test_adjust_min(self): candidates = CompletedKeys(10) + self.assertEqual(10, candidates.num_remaining) self.assertEqual(0, len(candidates._slabs)) self.assertTrue(candidates.is_available(2)) candidates.mark_completed(0, 3) self.assertEqual(0, len(candidates._slabs)) + self.assertEqual(7, candidates.num_remaining) self.assertFalse(candidates.is_available(2)) self.assertTrue(candidates.is_available(4)) def test_inside_block(self): candidates = CompletedKeys(10) + self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(1, 8) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(3, candidates.num_remaining) candidates.mark_completed(2, 5) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(3, candidates.num_remaining) self.assertFalse(candidates.is_available(1)) self.assertFalse(candidates.is_available(5)) def test_wrap_block(self): candidates = CompletedKeys(10) + self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(2, 5) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(7, candidates.num_remaining) candidates.mark_completed(1, 8) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(3, candidates.num_remaining) self.assertFalse(candidates.is_available(1)) self.assertFalse(candidates.is_available(5)) def test_non_contiguous(self): candidates = CompletedKeys(10) + self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(1, 5) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(6, candidates.num_remaining) self.assertTrue(candidates.is_available(5)) self.assertTrue(candidates.is_available(6)) candidates.mark_completed(6, 8) self.assertEqual(2, len(candidates._slabs)) + self.assertEqual(4, candidates.num_remaining) self.assertTrue(candidates.is_available(5)) self.assertFalse(candidates.is_available(6)) def test_big_merge(self): candidates = CompletedKeys(10) + self.assertEqual(10, candidates.num_remaining) candidates.mark_completed(1, 5) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(6, candidates.num_remaining) candidates.mark_completed(6, 8) self.assertEqual(2, len(candidates._slabs)) + self.assertEqual(4, candidates.num_remaining) candidates.mark_completed(5, 6) self.assertEqual(1, len(candidates._slabs)) + self.assertEqual(3, candidates.num_remaining) def test_range_limits(self): candidates = CompletedKeys(10) @@ -113,6 +134,8 @@ class CompletedTestCase(unittest.TestCase): self.assertTrue(candidates.is_available(start)) candidates.mark_completed(start, start + 10) + self.assertEqual(0, candidates.num_remaining) + def test_huge_dataset(self): candidates = CompletedKeys(1024 * 1024) start_time = datetime.now() @@ -125,6 +148,7 @@ class CompletedTestCase(unittest.TestCase): iterations += 1 self.assertGreater(iterations, 1024) + self.assertEqual(0, candidates.num_remaining) class FakeQuery(object): diff --git a/tools/auditmanifests.py b/tools/auditmanifests.py index 43e702d93..922568b53 100644 --- a/tools/auditmanifests.py +++ b/tools/auditmanifests.py @@ -24,7 +24,8 @@ def remove_stale_manifests(): max_manifest_id = TagManifest.select(fn.Max(TagManifest.id)).scalar() problematic = 0 checked = 0 - for found, _ in yield_random_entries(batch_query, TagManifest.id, BATCH_SIZE, max_manifest_id): + manifest_gen = yield_random_entries(batch_query, TagManifest.id, BATCH_SIZE, max_manifest_id) + for found, _, _ in manifest_gen: checked += 1 parsed = SignedManifest(found.json_data, validate=False) logger.debug('Auditing manifest with id: %s for %s/%s', found.digest, parsed.namespace, diff --git a/util/migrate/allocator.py b/util/migrate/allocator.py index 3661095f6..d37abb6e9 100644 --- a/util/migrate/allocator.py +++ b/util/migrate/allocator.py @@ -17,6 +17,7 @@ class CompletedKeys(object): def __init__(self, max_index, min_index=0): self._max_index = max_index self._min_index = min_index + self.num_remaining = max_index - min_index self._slabs = RBTree() def _get_previous_or_none(self, index): @@ -40,14 +41,18 @@ class CompletedKeys(object): def mark_completed(self, start_index, past_last_index): logger.debug('Marking the range completed: %s-%s', start_index, past_last_index) + num_completed = min(past_last_index, self._max_index) - max(start_index, self._min_index) + # Find the item directly before this and see if there is overlap to_discard = set() try: prev_start, prev_length = self._slabs.floor_item(start_index) - if prev_start + prev_length >= start_index: + max_prev_completed = prev_start + prev_length + if max_prev_completed >= start_index: # we are going to merge with the range before us logger.debug('Merging with the prev range: %s-%s', prev_start, prev_start + prev_length) to_discard.add(prev_start) + num_completed = max(num_completed - (max_prev_completed - start_index), 0) start_index = prev_start past_last_index = max(past_last_index, prev_start + prev_length) except KeyError: @@ -55,8 +60,13 @@ class CompletedKeys(object): # Find all keys between the start and last index and merge them into one block for merge_start, merge_length in self._slabs.iter_items(start_index, past_last_index + 1): + if merge_start in to_discard: + logger.debug('Already merged with block %s-%s', merge_start, merge_start + merge_length) + continue + candidate_next_index = merge_start + merge_length logger.debug('Merging with block %s-%s', merge_start, candidate_next_index) + num_completed -= merge_length - max(candidate_next_index - past_last_index, 0) to_discard.add(merge_start) past_last_index = max(past_last_index, candidate_next_index) @@ -73,13 +83,16 @@ class CompletedKeys(object): discard = True if to_discard: - logger.debug('Discarding %s obsolte blocks', len(to_discard)) + logger.debug('Discarding %s obsolete blocks', len(to_discard)) self._slabs.remove_items(to_discard) if not discard: logger.debug('Writing new block with range: %s-%s', start_index, past_last_index) self._slabs.insert(start_index, past_last_index - start_index) + # Update the number of remaining items with the adjustments we've made + assert num_completed >= 0 + self.num_remaining -= num_completed logger.debug('Total blocks: %s', len(self._slabs)) def get_block_start_index(self, block_size_estimate): @@ -145,9 +158,11 @@ def yield_random_entries(batch_query, primary_key_field, batch_size, max_id, min continue logger.info('Found %s candidates, processing block', len(all_candidates)) + batch_completed = 0 for candidate in all_candidates: abort_early = Event() - yield candidate, abort_early + yield candidate, abort_early, allocator.num_remaining - batch_completed + batch_completed += 1 if abort_early.is_set(): logger.info('Overlap with another worker, aborting') break diff --git a/workers/globalpromstats.py b/workers/globalpromstats.py index ae4f43f27..199d0373f 100644 --- a/workers/globalpromstats.py +++ b/workers/globalpromstats.py @@ -43,13 +43,6 @@ class GlobalPrometheusStatsWorker(Worker): metric_queue.org_count.Set(model.organization.get_active_org_count()) metric_queue.robot_count.Set(model.user.get_robot_count()) - if features.SECURITY_SCANNER: - # Clair repo counts. - unscanned_images_gauge.Set( - get_count_of_images_eligible_for_scan(app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 2)) - ) - images_gauge.Set(total_image_count()) - def main(): logging.config.fileConfig('conf/logging_debug.conf', disable_existing_loggers=False) diff --git a/workers/securityworker.py b/workers/securityworker.py index a4d72c5ec..ebdf6d5d2 100644 --- a/workers/securityworker.py +++ b/workers/securityworker.py @@ -7,25 +7,28 @@ from app import app, secscan_api, prometheus from workers.worker import Worker from data.database import UseThenDisconnect from data.model.image import (get_images_eligible_for_scan, get_max_id_for_sec_scan, - get_min_id_for_sec_scan, get_image_id) + get_min_id_for_sec_scan, get_image_id, total_image_count) from util.secscan.api import SecurityConfigValidator from util.secscan.analyzer import LayerAnalyzer, PreemptedException from util.migrate.allocator import yield_random_entries from endpoints.v2 import v2_bp + BATCH_SIZE = 50 INDEXING_INTERVAL = 30 + logger = logging.getLogger(__name__) unscanned_images_gauge = prometheus.create_gauge('unscanned_images', 'Number of images that clair needs to scan.') images_gauge = prometheus.create_gauge('all_images', 'Total number of images that clair can scan.') + class SecurityWorker(Worker): def __init__(self): super(SecurityWorker, self).__init__() validator = SecurityConfigValidator(app.config) if validator.valid(): - self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 2) + self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3) self._analyzer = LayerAnalyzer(app.config, secscan_api) # Get the ID of the first image we want to analyze. @@ -46,14 +49,24 @@ class SecurityWorker(Worker): return with UseThenDisconnect(app.config): - for candidate, abt in yield_random_entries(batch_query, get_image_id(), BATCH_SIZE, max_id, - self._min_id): + to_scan_generator = yield_random_entries( + batch_query, + get_image_id(), + BATCH_SIZE, + max_id, + self._min_id, + ) + for candidate, abt, num_remaining in to_scan_generator: try: self._analyzer.analyze_recursively(candidate) except PreemptedException: logger.info('Another worker pre-empted us for layer: %s', candidate.id) abt.set() + unscanned_images_gauge.Set(num_remaining) + images_gauge.Set(total_image_count()) + + # If we reach this point, we analyzed every images up to max_id, next time the worker runs, # we want to start from the next image. self._min_id = max_id + 1