diff --git a/conf/init/service/securityworker/run b/conf/init/service/securityworker/run index c40f9aa4b..22a47af96 100755 --- a/conf/init/service/securityworker/run +++ b/conf/init/service/securityworker/run @@ -3,6 +3,6 @@ echo 'Starting security scanner worker' cd / -venv/bin/python -m workers.securityworker 2>&1 +venv/bin/python -m workers.securityworker.securityworker 2>&1 echo 'Security scanner worker exited' diff --git a/workers/securityworker.py b/workers/securityworker.py deleted file mode 100644 index fb59754ac..000000000 --- a/workers/securityworker.py +++ /dev/null @@ -1,119 +0,0 @@ -import logging.config -import time - -from math import log10 - -import features - -from app import app, secscan_api, prometheus -from workers.worker import Worker -from data.database import UseThenDisconnect -from data.model.image import (get_images_eligible_for_scan, get_image_pk_field, - get_max_id_for_sec_scan, get_min_id_for_sec_scan) -from util.secscan.api import SecurityConfigValidator, APIRequestFailure -from util.secscan.analyzer import LayerAnalyzer, PreemptedException -from util.migrate.allocator import yield_random_entries -from util.log import logfile_path -from endpoints.v2 import v2_bp - - -DEFAULT_INDEXING_INTERVAL = 30 - - -logger = logging.getLogger(__name__) -unscanned_images_gauge = prometheus.create_gauge('unscanned_images', - 'Number of images that clair needs to scan.') -max_unscanned_images_gauge = prometheus.create_gauge('max_unscanned_image_id', - 'Max ID of the unscanned images.') - -def index_images(min_id, target_version, analyzer): - def batch_query(): - return get_images_eligible_for_scan(target_version) - - # Get the ID of the last image we can analyze. Will be None if there are no images in the - # database. 
- max_id = get_max_id_for_sec_scan() - if max_id is None: - return None - - if min_id is None or min_id > max_id: - logger.info('Could not find any available images for scanning.') - return None - - max_unscanned_images_gauge.Set(max_id) - - # 4^log10(total) gives us a scalable batch size into the billions. - batch_size = int(4 ** log10(max(10, max_id - min_id))) - - with UseThenDisconnect(app.config): - to_scan_generator = yield_random_entries( - batch_query, - get_image_pk_field(), - batch_size, - max_id, - min_id, - ) - for candidate, abt, num_remaining in to_scan_generator: - try: - analyzer.analyze_recursively(candidate) - except PreemptedException: - logger.info('Another worker pre-empted us for layer: %s', candidate.id) - abt.set() - except APIRequestFailure: - logger.exception('Security scanner service unavailable') - return - - unscanned_images_gauge.Set(num_remaining) - - # If we reach this point, we analyzed every images up to max_id, next time the worker runs, - # we want to start from the next image. - return max_id + 1 - -class SecurityWorker(Worker): - def __init__(self): - super(SecurityWorker, self).__init__() - validator = SecurityConfigValidator(app.config) - if not validator.valid(): - logger.warning('Failed to validate security scan configuration') - return - - self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3) - self._analyzer = LayerAnalyzer(app.config, secscan_api) - self._min_id = None - - interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL) - self.add_operation(self._index_images, interval) - - def _index_images(self): - new_min_id = index_images(self.min_id, self._target_version, self._analyzer) - if new_min_id is not None: - self.min_id = new_min_id - - @property - def min_id(self): - """ If it hasn't already been determined, finds the ID of the first image to be analyzed. 
- First checks the config, then the database, and returns None if there are no images - available for scanning. - """ - if self._min_id is None: - self._min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID') - if self._min_id is None: - self._min_id = get_min_id_for_sec_scan(self._target_version) - return self._min_id - - @min_id.setter - def min_id(self, new_min_id): - self._min_id = new_min_id - - -if __name__ == '__main__': - app.register_blueprint(v2_bp, url_prefix='/v2') - - if not features.SECURITY_SCANNER: - logger.debug('Security scanner disabled; skipping SecurityWorker') - while True: - time.sleep(100000) - - logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False) - worker = SecurityWorker() - worker.start() diff --git a/workers/securityworker/__init__.py b/workers/securityworker/__init__.py new file mode 100644 index 000000000..a101f6795 --- /dev/null +++ b/workers/securityworker/__init__.py @@ -0,0 +1,35 @@ +import logging.config + +from app import app, prometheus +from data.database import UseThenDisconnect +from workers.securityworker.models_pre_oci import pre_oci_model as model +from util.secscan.api import APIRequestFailure +from util.secscan.analyzer import PreemptedException + +logger = logging.getLogger(__name__) +unscanned_images_gauge = prometheus.create_gauge('unscanned_images', + 'Number of images that clair needs to scan.') + +def index_images(target_version, analyzer, token=None): + """ Performs security indexing of all images in the database not scanned at the target version. + If a token is provided, scanning will begin where the token indicates it previously completed. 
+ """ + iterator, next_token = model.candidates_to_scan(target_version, start_token=token) + if iterator is None: + logger.debug('Found no additional images to scan') + return None + + with UseThenDisconnect(app.config): + for candidate, abt, num_remaining in iterator: + try: + analyzer.analyze_recursively(candidate) + except PreemptedException: + logger.info('Another worker pre-empted us for layer: %s', candidate.id) + abt.set() + except APIRequestFailure: + logger.exception('Security scanner service unavailable') + return + + unscanned_images_gauge.Set(num_remaining) + + return next_token diff --git a/workers/securityworker/models_interface.py b/workers/securityworker/models_interface.py new file mode 100644 index 000000000..6d872ca4e --- /dev/null +++ b/workers/securityworker/models_interface.py @@ -0,0 +1,28 @@ +from abc import ABCMeta, abstractmethod +from collections import namedtuple + +from six import add_metaclass + +class ScanToken(namedtuple('NextScanToken', ['min_id'])): + """ + ScanToken represents an opaque token that can be passed between runs of the security worker + to continue scanning wherever the previous run left off. Note that the data of the token is + *opaque* to the security worker, and the security worker should *not* pull any data out or modify + the token in any way. + """ + +@add_metaclass(ABCMeta) +class SecurityWorkerDataInterface(object): + """ + Interface that represents all data store interactions required by the security worker. + """ + + @abstractmethod + def candidates_to_scan(self, target_version, start_token=None): + """ + Returns a tuple consisting of an iterator of all the candidates to scan and a ScanToken. + The iterator returns a tuple for each iteration consisting of the candidate Image, the abort + signal, and the number of remaining candidates. If the iterator returned is None, there are + no candidates to process.
+ """ + pass diff --git a/workers/securityworker/models_pre_oci.py b/workers/securityworker/models_pre_oci.py new file mode 100644 index 000000000..a665cac17 --- /dev/null +++ b/workers/securityworker/models_pre_oci.py @@ -0,0 +1,52 @@ +from math import log10 + +from app import app +from data.model.image import (get_images_eligible_for_scan, get_image_pk_field, + get_max_id_for_sec_scan, get_min_id_for_sec_scan) +from util.migrate.allocator import yield_random_entries + +from workers.securityworker.models_interface import ( + ScanToken, + SecurityWorkerDataInterface +) + +class PreOCIModel(SecurityWorkerDataInterface): + def candidates_to_scan(self, target_version, start_token=None): + def batch_query(): + return get_images_eligible_for_scan(target_version) + + # Find the minimum ID. + min_id = None + if start_token is not None: + min_id = start_token.min_id + else: + min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID') + if min_id is None: + min_id = get_min_id_for_sec_scan(target_version) + + # Get the ID of the last image we can analyze. Will be None if there are no images in the + # database. + max_id = get_max_id_for_sec_scan() + if max_id is None: + return (None, None) + + if min_id is None or min_id > max_id: + return (None, None) + + # 4^log10(total) gives us a scalable batch size into the billions. + batch_size = int(4 ** log10(max(10, max_id - min_id))) + + # TODO: Once we have a clean shared NamedTuple for Images, send that to the secscan analyzer + # rather than the database Image itself. 
+ iterator = yield_random_entries( + batch_query, + get_image_pk_field(), + batch_size, + max_id, + min_id, + ) + + return (iterator, ScanToken(max_id + 1)) + + +pre_oci_model = PreOCIModel() diff --git a/workers/securityworker/securityworker.py b/workers/securityworker/securityworker.py new file mode 100644 index 000000000..732631e3a --- /dev/null +++ b/workers/securityworker/securityworker.py @@ -0,0 +1,47 @@ +import logging.config +import time + +import features + +from app import app, secscan_api +from workers.worker import Worker +from workers.securityworker import index_images +from util.secscan.api import SecurityConfigValidator +from util.secscan.analyzer import LayerAnalyzer +from util.log import logfile_path +from endpoints.v2 import v2_bp + +logger = logging.getLogger(__name__) + +DEFAULT_INDEXING_INTERVAL = 30 + +class SecurityWorker(Worker): + def __init__(self): + super(SecurityWorker, self).__init__() + validator = SecurityConfigValidator(app.config) + if not validator.valid(): + logger.warning('Failed to validate security scan configuration') + return + + self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3) + self._analyzer = LayerAnalyzer(app.config, secscan_api) + self._next_token = None + + interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL) + self.add_operation(self._index_images, interval) + + def _index_images(self): + self._next_token = index_images(self._target_version, self._analyzer, self._next_token) + + +if __name__ == '__main__': + app.register_blueprint(v2_bp, url_prefix='/v2') + + if not features.SECURITY_SCANNER: + logger.debug('Security scanner disabled; skipping SecurityWorker') + while True: + time.sleep(100000) + + logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False) + worker = SecurityWorker() + worker.start() diff --git a/workers/test/test_securityworker.py b/workers/securityworker/test/test_securityworker.py similarity index 80% 
rename from workers/test/test_securityworker.py rename to workers/securityworker/test/test_securityworker.py index de3927450..4b52f8def 100644 --- a/workers/test/test_securityworker.py +++ b/workers/securityworker/test/test_securityworker.py @@ -5,5 +5,5 @@ from workers.securityworker import index_images def test_securityworker_realdb(initialized_db): mock_analyzer = Mock() - assert index_images(0, 1, mock_analyzer) is not None + assert index_images(1, mock_analyzer) is not None mock_analyzer.analyze_recursively.assert_called()