From ec81148d7373d3d0c1f55681c10f480ca823becc Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 28 Jun 2017 14:03:57 +0300 Subject: [PATCH 1/3] Add super basic security worker test --- workers/securityworker.py | 88 +++++++++++++++-------------- workers/test/test_securityworker.py | 9 +++ 2 files changed, 55 insertions(+), 42 deletions(-) create mode 100644 workers/test/test_securityworker.py diff --git a/workers/securityworker.py b/workers/securityworker.py index 487b2f537..fb59754ac 100644 --- a/workers/securityworker.py +++ b/workers/securityworker.py @@ -26,6 +26,49 @@ unscanned_images_gauge = prometheus.create_gauge('unscanned_images', max_unscanned_images_gauge = prometheus.create_gauge('max_unscanned_image_id', 'Max ID of the unscanned images.') +def index_images(min_id, target_version, analyzer): + def batch_query(): + return get_images_eligible_for_scan(target_version) + + # Get the ID of the last image we can analyze. Will be None if there are no images in the + # database. + max_id = get_max_id_for_sec_scan() + if max_id is None: + return None + + if min_id is None or min_id > max_id: + logger.info('Could not find any available images for scanning.') + return None + + max_unscanned_images_gauge.Set(max_id) + + # 4^log10(total) gives us a scalable batch size into the billions. 
+ batch_size = int(4 ** log10(max(10, max_id - min_id))) + + with UseThenDisconnect(app.config): + to_scan_generator = yield_random_entries( + batch_query, + get_image_pk_field(), + batch_size, + max_id, + min_id, + ) + for candidate, abt, num_remaining in to_scan_generator: + try: + analyzer.analyze_recursively(candidate) + except PreemptedException: + logger.info('Another worker pre-empted us for layer: %s', candidate.id) + abt.set() + except APIRequestFailure: + logger.exception('Security scanner service unavailable') + return + + unscanned_images_gauge.Set(num_remaining) + + # If we reach this point, we analyzed every images up to max_id, next time the worker runs, + # we want to start from the next image. + return max_id + 1 + class SecurityWorker(Worker): def __init__(self): super(SecurityWorker, self).__init__() @@ -42,48 +85,9 @@ class SecurityWorker(Worker): self.add_operation(self._index_images, interval) def _index_images(self): - def batch_query(): - return get_images_eligible_for_scan(self._target_version) - - # Get the ID of the last image we can analyze. Will be None if there are no images in the - # database. - max_id = get_max_id_for_sec_scan() - if max_id is None: - return - - if self.min_id is None or self.min_id > max_id: - logger.info('Could not find any available images for scanning.') - return - - max_unscanned_images_gauge.Set(max_id) - - # 4^log10(total) gives us a scalable batch size into the billions. 
- batch_size = int(4 ** log10(max(10, max_id - self.min_id))) - - with UseThenDisconnect(app.config): - to_scan_generator = yield_random_entries( - batch_query, - get_image_pk_field(), - batch_size, - max_id, - self.min_id, - ) - for candidate, abt, num_remaining in to_scan_generator: - try: - self._analyzer.analyze_recursively(candidate) - except PreemptedException: - logger.info('Another worker pre-empted us for layer: %s', candidate.id) - abt.set() - except APIRequestFailure: - logger.exception('Security scanner service unavailable') - return - - unscanned_images_gauge.Set(num_remaining) - - # If we reach this point, we analyzed every images up to max_id, next time the worker runs, - # we want to start from the next image. - self.min_id = max_id + 1 - + new_min_id = index_images(self.min_id, self._target_version, self._analyzer) + if new_min_id is not None: + self.min_id = new_min_id @property def min_id(self): diff --git a/workers/test/test_securityworker.py b/workers/test/test_securityworker.py new file mode 100644 index 000000000..de3927450 --- /dev/null +++ b/workers/test/test_securityworker.py @@ -0,0 +1,9 @@ +from mock import patch, Mock + +from test.fixtures import * +from workers.securityworker import index_images + +def test_securityworker_realdb(initialized_db): + mock_analyzer = Mock() + assert index_images(0, 1, mock_analyzer) is not None + mock_analyzer.analyze_recursively.assert_called() From 1ddb09ac1133b39124705abc116f0593d7c0f46d Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 28 Jun 2017 14:50:52 +0300 Subject: [PATCH 2/3] Change security worker to use data interface --- conf/init/service/securityworker/run | 2 +- workers/securityworker.py | 119 ------------------ workers/securityworker/__init__.py | 35 ++++++ workers/securityworker/models_interface.py | 28 +++++ workers/securityworker/models_pre_oci.py | 52 ++++++++ workers/securityworker/securityworker.py | 47 +++++++ .../test/test_securityworker.py | 2 +- 7 files changed, 164 
insertions(+), 121 deletions(-) delete mode 100644 workers/securityworker.py create mode 100644 workers/securityworker/__init__.py create mode 100644 workers/securityworker/models_interface.py create mode 100644 workers/securityworker/models_pre_oci.py create mode 100644 workers/securityworker/securityworker.py rename workers/{ => securityworker}/test/test_securityworker.py (80%) diff --git a/conf/init/service/securityworker/run b/conf/init/service/securityworker/run index c40f9aa4b..22a47af96 100755 --- a/conf/init/service/securityworker/run +++ b/conf/init/service/securityworker/run @@ -3,6 +3,6 @@ echo 'Starting security scanner worker' cd / -venv/bin/python -m workers.securityworker 2>&1 +venv/bin/python -m workers.securityworker.securityworker 2>&1 echo 'Security scanner worker exited' diff --git a/workers/securityworker.py b/workers/securityworker.py deleted file mode 100644 index fb59754ac..000000000 --- a/workers/securityworker.py +++ /dev/null @@ -1,119 +0,0 @@ -import logging.config -import time - -from math import log10 - -import features - -from app import app, secscan_api, prometheus -from workers.worker import Worker -from data.database import UseThenDisconnect -from data.model.image import (get_images_eligible_for_scan, get_image_pk_field, - get_max_id_for_sec_scan, get_min_id_for_sec_scan) -from util.secscan.api import SecurityConfigValidator, APIRequestFailure -from util.secscan.analyzer import LayerAnalyzer, PreemptedException -from util.migrate.allocator import yield_random_entries -from util.log import logfile_path -from endpoints.v2 import v2_bp - - -DEFAULT_INDEXING_INTERVAL = 30 - - -logger = logging.getLogger(__name__) -unscanned_images_gauge = prometheus.create_gauge('unscanned_images', - 'Number of images that clair needs to scan.') -max_unscanned_images_gauge = prometheus.create_gauge('max_unscanned_image_id', - 'Max ID of the unscanned images.') - -def index_images(min_id, target_version, analyzer): - def batch_query(): - return 
get_images_eligible_for_scan(target_version) - - # Get the ID of the last image we can analyze. Will be None if there are no images in the - # database. - max_id = get_max_id_for_sec_scan() - if max_id is None: - return None - - if min_id is None or min_id > max_id: - logger.info('Could not find any available images for scanning.') - return None - - max_unscanned_images_gauge.Set(max_id) - - # 4^log10(total) gives us a scalable batch size into the billions. - batch_size = int(4 ** log10(max(10, max_id - min_id))) - - with UseThenDisconnect(app.config): - to_scan_generator = yield_random_entries( - batch_query, - get_image_pk_field(), - batch_size, - max_id, - min_id, - ) - for candidate, abt, num_remaining in to_scan_generator: - try: - analyzer.analyze_recursively(candidate) - except PreemptedException: - logger.info('Another worker pre-empted us for layer: %s', candidate.id) - abt.set() - except APIRequestFailure: - logger.exception('Security scanner service unavailable') - return - - unscanned_images_gauge.Set(num_remaining) - - # If we reach this point, we analyzed every images up to max_id, next time the worker runs, - # we want to start from the next image. 
- return max_id + 1 - -class SecurityWorker(Worker): - def __init__(self): - super(SecurityWorker, self).__init__() - validator = SecurityConfigValidator(app.config) - if not validator.valid(): - logger.warning('Failed to validate security scan configuration') - return - - self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3) - self._analyzer = LayerAnalyzer(app.config, secscan_api) - self._min_id = None - - interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL) - self.add_operation(self._index_images, interval) - - def _index_images(self): - new_min_id = index_images(self.min_id, self._target_version, self._analyzer) - if new_min_id is not None: - self.min_id = new_min_id - - @property - def min_id(self): - """ If it hasn't already been determined, finds the ID of the first image to be analyzed. - First checks the config, then the database, and returns None if there are no images - available for scanning. - """ - if self._min_id is None: - self._min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID') - if self._min_id is None: - self._min_id = get_min_id_for_sec_scan(self._target_version) - return self._min_id - - @min_id.setter - def min_id(self, new_min_id): - self._min_id = new_min_id - - -if __name__ == '__main__': - app.register_blueprint(v2_bp, url_prefix='/v2') - - if not features.SECURITY_SCANNER: - logger.debug('Security scanner disabled; skipping SecurityWorker') - while True: - time.sleep(100000) - - logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False) - worker = SecurityWorker() - worker.start() diff --git a/workers/securityworker/__init__.py b/workers/securityworker/__init__.py new file mode 100644 index 000000000..a101f6795 --- /dev/null +++ b/workers/securityworker/__init__.py @@ -0,0 +1,35 @@ +import logging.config + +from app import app, prometheus +from data.database import UseThenDisconnect +from workers.securityworker.models_pre_oci import 
pre_oci_model as model +from util.secscan.api import APIRequestFailure +from util.secscan.analyzer import PreemptedException + +logger = logging.getLogger(__name__) +unscanned_images_gauge = prometheus.create_gauge('unscanned_images', + 'Number of images that clair needs to scan.') + +def index_images(target_version, analyzer, token=None): + """ Performs security indexing of all images in the database not scanned at the target version. + If a token is provided, scanning will begin where the token indicates it previously completed. + """ + iterator, next_token = model.candidates_to_scan(target_version, start_token=token) + if iterator is None: + logger.debug('Found no additional images to scan') + return None + + with UseThenDisconnect(app.config): + for candidate, abt, num_remaining in iterator: + try: + analyzer.analyze_recursively(candidate) + except PreemptedException: + logger.info('Another worker pre-empted us for layer: %s', candidate.id) + abt.set() + except APIRequestFailure: + logger.exception('Security scanner service unavailable') + return + + unscanned_images_gauge.Set(num_remaining) + + return next_token diff --git a/workers/securityworker/models_interface.py b/workers/securityworker/models_interface.py new file mode 100644 index 000000000..6d872ca4e --- /dev/null +++ b/workers/securityworker/models_interface.py @@ -0,0 +1,28 @@ +from abc import ABCMeta, abstractmethod +from collections import namedtuple + +from six import add_metaclass + +class ScanToken(namedtuple('NextScanToken', ['min_id'])): + """ + ScanToken represents an opaque token that can be passed between runs of the security worker + to continue scanning wherever the previous run left off. Note that the data of the token is + *opaque* to the security worker, and the security worker should *not* pull any data out or modify + the token in any way. 
+ """ + +@add_metaclass(ABCMeta) +class SecurityWorkerDataInterface(object): + """ + Interface that represents all data store interactions required by the security worker. + """ + + @abstractmethod + def candidates_to_scan(self, target_version, start_token=None): + """ + Returns a tuple consisting of an iterator of all the candidates to scan and a NextScanToken. + The iterator returns a tuple for each iteration consisting of the candidate Image, the abort + signal, and the number of remaining candidates. If the iterator returned is None, there are + no candidates to process. + """ + pass diff --git a/workers/securityworker/models_pre_oci.py b/workers/securityworker/models_pre_oci.py new file mode 100644 index 000000000..a665cac17 --- /dev/null +++ b/workers/securityworker/models_pre_oci.py @@ -0,0 +1,52 @@ +from math import log10 + +from app import app +from data.model.image import (get_images_eligible_for_scan, get_image_pk_field, + get_max_id_for_sec_scan, get_min_id_for_sec_scan) +from util.migrate.allocator import yield_random_entries + +from workers.securityworker.models_interface import ( + ScanToken, + SecurityWorkerDataInterface +) + +class PreOCIModel(SecurityWorkerDataInterface): + def candidates_to_scan(self, target_version, start_token=None): + def batch_query(): + return get_images_eligible_for_scan(target_version) + + # Find the minimum ID. + min_id = None + if start_token is not None: + min_id = start_token.min_id + else: + min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID') + if min_id is None: + min_id = get_min_id_for_sec_scan(target_version) + + # Get the ID of the last image we can analyze. Will be None if there are no images in the + # database. + max_id = get_max_id_for_sec_scan() + if max_id is None: + return (None, None) + + if min_id is None or min_id > max_id: + return (None, None) + + # 4^log10(total) gives us a scalable batch size into the billions. 
+ batch_size = int(4 ** log10(max(10, max_id - min_id))) + + # TODO: Once we have a clean shared NamedTuple for Images, send that to the secscan analyzer + # rather than the database Image itself. + iterator = yield_random_entries( + batch_query, + get_image_pk_field(), + batch_size, + max_id, + min_id, + ) + + return (iterator, ScanToken(max_id + 1)) + + +pre_oci_model = PreOCIModel() diff --git a/workers/securityworker/securityworker.py b/workers/securityworker/securityworker.py new file mode 100644 index 000000000..732631e3a --- /dev/null +++ b/workers/securityworker/securityworker.py @@ -0,0 +1,47 @@ +import logging.config +import time + +import features + +from app import app, secscan_api +from workers.worker import Worker +from workers.securityworker import index_images +from util.secscan.api import SecurityConfigValidator +from util.secscan.analyzer import LayerAnalyzer +from util.log import logfile_path +from endpoints.v2 import v2_bp + +logger = logging.getLogger(__name__) + +DEFAULT_INDEXING_INTERVAL = 30 + +class SecurityWorker(Worker): + def __init__(self): + super(SecurityWorker, self).__init__() + validator = SecurityConfigValidator(app.config) + if not validator.valid(): + logger.warning('Failed to validate security scan configuration') + return + + self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3) + self._analyzer = LayerAnalyzer(app.config, secscan_api) + self._next_token = None + + interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL) + self.add_operation(self._index_images, interval) + + def _index_images(self): + self._next_token = index_images(self._target_version, self._analyzer, self._next_token) + + +if __name__ == '__main__': + app.register_blueprint(v2_bp, url_prefix='/v2') + + if not features.SECURITY_SCANNER: + logger.debug('Security scanner disabled; skipping SecurityWorker') + while True: + time.sleep(100000) + + logging.config.fileConfig(logfile_path(debug=True), 
disable_existing_loggers=False) + worker = SecurityWorker() + worker.start() diff --git a/workers/test/test_securityworker.py b/workers/securityworker/test/test_securityworker.py similarity index 80% rename from workers/test/test_securityworker.py rename to workers/securityworker/test/test_securityworker.py index de3927450..4b52f8def 100644 --- a/workers/test/test_securityworker.py +++ b/workers/securityworker/test/test_securityworker.py @@ -5,5 +5,5 @@ from workers.securityworker import index_images def test_securityworker_realdb(initialized_db): mock_analyzer = Mock() - assert index_images(0, 1, mock_analyzer) is not None + assert index_images(1, mock_analyzer) is not None mock_analyzer.analyze_recursively.assert_called() From 27ed3bedcca92419f580bf9909bf9b4e53627eae Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 29 Jun 2017 09:43:04 +0300 Subject: [PATCH 3/3] yapf --- workers/securityworker/__init__.py | 1 + workers/securityworker/models_interface.py | 2 ++ workers/securityworker/models_pre_oci.py | 11 ++++------- workers/securityworker/securityworker.py | 1 + workers/securityworker/test/test_securityworker.py | 1 + 5 files changed, 9 insertions(+), 7 deletions(-) diff --git a/workers/securityworker/__init__.py b/workers/securityworker/__init__.py index a101f6795..8c2bc44a7 100644 --- a/workers/securityworker/__init__.py +++ b/workers/securityworker/__init__.py @@ -10,6 +10,7 @@ logger = logging.getLogger(__name__) unscanned_images_gauge = prometheus.create_gauge('unscanned_images', 'Number of images that clair needs to scan.') + def index_images(target_version, analyzer, token=None): """ Performs security indexing of all images in the database not scanned at the target version. If a token is provided, scanning will begin where the token indicates it previously completed. 
diff --git a/workers/securityworker/models_interface.py b/workers/securityworker/models_interface.py index 6d872ca4e..76295a427 100644 --- a/workers/securityworker/models_interface.py +++ b/workers/securityworker/models_interface.py @@ -3,6 +3,7 @@ from collections import namedtuple from six import add_metaclass + class ScanToken(namedtuple('NextScanToken', ['min_id'])): """ ScanToken represents an opaque token that can be passed between runs of the security worker @@ -11,6 +12,7 @@ class ScanToken(namedtuple('NextScanToken', ['min_id'])): the token in any way. """ + @add_metaclass(ABCMeta) class SecurityWorkerDataInterface(object): """ diff --git a/workers/securityworker/models_pre_oci.py b/workers/securityworker/models_pre_oci.py index a665cac17..0115be908 100644 --- a/workers/securityworker/models_pre_oci.py +++ b/workers/securityworker/models_pre_oci.py @@ -5,10 +5,8 @@ from data.model.image import (get_images_eligible_for_scan, get_image_pk_field, get_max_id_for_sec_scan, get_min_id_for_sec_scan) from util.migrate.allocator import yield_random_entries -from workers.securityworker.models_interface import ( - ScanToken, - SecurityWorkerDataInterface -) +from workers.securityworker.models_interface import (ScanToken, SecurityWorkerDataInterface) + class PreOCIModel(SecurityWorkerDataInterface): def candidates_to_scan(self, target_version, start_token=None): @@ -34,7 +32,7 @@ class PreOCIModel(SecurityWorkerDataInterface): return (None, None) # 4^log10(total) gives us a scalable batch size into the billions. - batch_size = int(4 ** log10(max(10, max_id - min_id))) + batch_size = int(4**log10(max(10, max_id - min_id))) # TODO: Once we have a clean shared NamedTuple for Images, send that to the secscan analyzer # rather than the database Image itself. 
@@ -43,8 +41,7 @@ class PreOCIModel(SecurityWorkerDataInterface): get_image_pk_field(), batch_size, max_id, - min_id, - ) + min_id,) return (iterator, ScanToken(max_id + 1)) diff --git a/workers/securityworker/securityworker.py b/workers/securityworker/securityworker.py index 732631e3a..1038e7a04 100644 --- a/workers/securityworker/securityworker.py +++ b/workers/securityworker/securityworker.py @@ -15,6 +15,7 @@ logger = logging.getLogger(__name__) DEFAULT_INDEXING_INTERVAL = 30 + class SecurityWorker(Worker): def __init__(self): super(SecurityWorker, self).__init__() diff --git a/workers/securityworker/test/test_securityworker.py b/workers/securityworker/test/test_securityworker.py index 4b52f8def..dfa9ff490 100644 --- a/workers/securityworker/test/test_securityworker.py +++ b/workers/securityworker/test/test_securityworker.py @@ -3,6 +3,7 @@ from mock import patch, Mock from test.fixtures import * from workers.securityworker import index_images + def test_securityworker_realdb(initialized_db): mock_analyzer = Mock() assert index_images(1, mock_analyzer) is not None