diff --git a/workers/securityworker.py b/workers/securityworker.py
index 487b2f537..fb59754ac 100644
--- a/workers/securityworker.py
+++ b/workers/securityworker.py
@@ -26,6 +26,49 @@ unscanned_images_gauge = prometheus.create_gauge('unscanned_images',
 max_unscanned_images_gauge = prometheus.create_gauge('max_unscanned_image_id',
                                                      'Max ID of the unscanned images.')
 
+def index_images(min_id, target_version, analyzer):
+  def batch_query():
+    return get_images_eligible_for_scan(target_version)
+
+  # Get the ID of the last image we can analyze. Will be None if there are no images in the
+  # database.
+  max_id = get_max_id_for_sec_scan()
+  if max_id is None:
+    return None
+
+  if min_id is None or min_id > max_id:
+    logger.info('Could not find any available images for scanning.')
+    return None
+
+  max_unscanned_images_gauge.Set(max_id)
+
+  # 4^log10(total) gives us a scalable batch size into the billions.
+  batch_size = int(4 ** log10(max(10, max_id - min_id)))
+
+  with UseThenDisconnect(app.config):
+    to_scan_generator = yield_random_entries(
+      batch_query,
+      get_image_pk_field(),
+      batch_size,
+      max_id,
+      min_id,
+    )
+    for candidate, abt, num_remaining in to_scan_generator:
+      try:
+        analyzer.analyze_recursively(candidate)
+      except PreemptedException:
+        logger.info('Another worker pre-empted us for layer: %s', candidate.id)
+        abt.set()
+      except APIRequestFailure:
+        logger.exception('Security scanner service unavailable')
+        return
+
+      unscanned_images_gauge.Set(num_remaining)
+
+  # If we reach this point, we analyzed every image up to max_id; the next time the worker runs,
+  # we want to start from the next image.
+  return max_id + 1
+
 class SecurityWorker(Worker):
   def __init__(self):
     super(SecurityWorker, self).__init__()
@@ -42,48 +85,9 @@ class SecurityWorker(Worker):
       self.add_operation(self._index_images, interval)
 
   def _index_images(self):
-    def batch_query():
-      return get_images_eligible_for_scan(self._target_version)
-
-    # Get the ID of the last image we can analyze. Will be None if there are no images in the
-    # database.
-    max_id = get_max_id_for_sec_scan()
-    if max_id is None:
-      return
-
-    if self.min_id is None or self.min_id > max_id:
-      logger.info('Could not find any available images for scanning.')
-      return
-
-    max_unscanned_images_gauge.Set(max_id)
-
-    # 4^log10(total) gives us a scalable batch size into the billions.
-    batch_size = int(4 ** log10(max(10, max_id - self.min_id)))
-
-    with UseThenDisconnect(app.config):
-      to_scan_generator = yield_random_entries(
-        batch_query,
-        get_image_pk_field(),
-        batch_size,
-        max_id,
-        self.min_id,
-      )
-      for candidate, abt, num_remaining in to_scan_generator:
-        try:
-          self._analyzer.analyze_recursively(candidate)
-        except PreemptedException:
-          logger.info('Another worker pre-empted us for layer: %s', candidate.id)
-          abt.set()
-        except APIRequestFailure:
-          logger.exception('Security scanner service unavailable')
-          return
-
-        unscanned_images_gauge.Set(num_remaining)
-
-      # If we reach this point, we analyzed every images up to max_id, next time the worker runs,
-      # we want to start from the next image.
-      self.min_id = max_id + 1
-
+    new_min_id = index_images(self.min_id, self._target_version, self._analyzer)
+    if new_min_id is not None:
+      self.min_id = new_min_id
 
   @property
   def min_id(self):
diff --git a/workers/test/test_securityworker.py b/workers/test/test_securityworker.py
new file mode 100644
index 000000000..de3927450
--- /dev/null
+++ b/workers/test/test_securityworker.py
@@ -0,0 +1,9 @@
+from mock import patch, Mock
+
+from test.fixtures import *
+from workers.securityworker import index_images
+
+def test_securityworker_realdb(initialized_db):
+  mock_analyzer = Mock()
+  assert index_images(0, 1, mock_analyzer) is not None
+  mock_analyzer.analyze_recursively.assert_called()