Change security worker to use data interface
parent ec81148d73
commit 1ddb09ac11
7 changed files with 164 additions and 121 deletions
@@ -3,6 +3,6 @@
 echo 'Starting security scanner worker'

 cd /
-venv/bin/python -m workers.securityworker 2>&1
+venv/bin/python -m workers.securityworker.securityworker 2>&1

 echo 'Security scanner worker exited'
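The worker module is now invoked through its package path (workers.securityworker.securityworker) rather than the old single-module path. A hypothetical sanity check, not part of this commit, assuming the repository root is on sys.path as in the run script's working directory:

# Hypothetical check: confirm the worker's new module path is importable
# from the repository root before the run script invokes it with -m.
import importlib

importlib.import_module('workers.securityworker.securityworker')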
@@ -1,119 +0,0 @@
import logging.config
import time

from math import log10

import features

from app import app, secscan_api, prometheus
from workers.worker import Worker
from data.database import UseThenDisconnect
from data.model.image import (get_images_eligible_for_scan, get_image_pk_field,
                              get_max_id_for_sec_scan, get_min_id_for_sec_scan)
from util.secscan.api import SecurityConfigValidator, APIRequestFailure
from util.secscan.analyzer import LayerAnalyzer, PreemptedException
from util.migrate.allocator import yield_random_entries
from util.log import logfile_path
from endpoints.v2 import v2_bp


DEFAULT_INDEXING_INTERVAL = 30


logger = logging.getLogger(__name__)
unscanned_images_gauge = prometheus.create_gauge('unscanned_images',
                                                 'Number of images that clair needs to scan.')
max_unscanned_images_gauge = prometheus.create_gauge('max_unscanned_image_id',
                                                     'Max ID of the unscanned images.')


def index_images(min_id, target_version, analyzer):
  def batch_query():
    return get_images_eligible_for_scan(target_version)

  # Get the ID of the last image we can analyze. Will be None if there are no images in the
  # database.
  max_id = get_max_id_for_sec_scan()
  if max_id is None:
    return None

  if min_id is None or min_id > max_id:
    logger.info('Could not find any available images for scanning.')
    return None

  max_unscanned_images_gauge.Set(max_id)

  # 4^log10(total) gives us a scalable batch size into the billions.
  batch_size = int(4 ** log10(max(10, max_id - min_id)))

  with UseThenDisconnect(app.config):
    to_scan_generator = yield_random_entries(
      batch_query,
      get_image_pk_field(),
      batch_size,
      max_id,
      min_id,
    )
    for candidate, abt, num_remaining in to_scan_generator:
      try:
        analyzer.analyze_recursively(candidate)
      except PreemptedException:
        logger.info('Another worker pre-empted us for layer: %s', candidate.id)
        abt.set()
      except APIRequestFailure:
        logger.exception('Security scanner service unavailable')
        return

      unscanned_images_gauge.Set(num_remaining)

    # If we reach this point, we analyzed every image up to max_id; the next time the worker
    # runs, we want to start from the next image.
    return max_id + 1


class SecurityWorker(Worker):
  def __init__(self):
    super(SecurityWorker, self).__init__()
    validator = SecurityConfigValidator(app.config)
    if not validator.valid():
      logger.warning('Failed to validate security scan configuration')
      return

    self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3)
    self._analyzer = LayerAnalyzer(app.config, secscan_api)
    self._min_id = None

    interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL)
    self.add_operation(self._index_images, interval)

  def _index_images(self):
    new_min_id = index_images(self.min_id, self._target_version, self._analyzer)
    if new_min_id is not None:
      self.min_id = new_min_id

  @property
  def min_id(self):
    """ If it hasn't already been determined, finds the ID of the first image to be analyzed.
        First checks the config, then the database, and returns None if there are no images
        available for scanning.
    """
    if self._min_id is None:
      self._min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID')
      if self._min_id is None:
        self._min_id = get_min_id_for_sec_scan(self._target_version)

    return self._min_id

  @min_id.setter
  def min_id(self, new_min_id):
    self._min_id = new_min_id


if __name__ == '__main__':
  app.register_blueprint(v2_bp, url_prefix='/v2')

  if not features.SECURITY_SCANNER:
    logger.debug('Security scanner disabled; skipping SecurityWorker')
    while True:
      time.sleep(100000)

  logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False)
  worker = SecurityWorker()
  worker.start()
workers/securityworker/__init__.py (new file)
@@ -0,0 +1,35 @@
import logging.config

from app import app, prometheus
from data.database import UseThenDisconnect
from workers.securityworker.models_pre_oci import pre_oci_model as model
from util.secscan.api import APIRequestFailure
from util.secscan.analyzer import PreemptedException

logger = logging.getLogger(__name__)
unscanned_images_gauge = prometheus.create_gauge('unscanned_images',
                                                 'Number of images that clair needs to scan.')


def index_images(target_version, analyzer, token=None):
  """ Performs security indexing of all images in the database not scanned at the target version.
      If a token is provided, scanning will begin where the token indicates it previously
      completed.
  """
  iterator, next_token = model.candidates_to_scan(target_version, start_token=token)
  if iterator is None:
    logger.debug('Found no additional images to scan')
    return None

  with UseThenDisconnect(app.config):
    for candidate, abt, num_remaining in iterator:
      try:
        analyzer.analyze_recursively(candidate)
      except PreemptedException:
        logger.info('Another worker pre-empted us for layer: %s', candidate.id)
        abt.set()
      except APIRequestFailure:
        logger.exception('Security scanner service unavailable')
        return

      unscanned_images_gauge.Set(num_remaining)

  return next_token
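The docstring above describes the resumption contract: index_images returns an opaque token that the caller hands back on the next invocation, or None when there is nothing left to scan. A minimal sketch of that loop, for illustration only (the timer-driven SecurityWorker later in this commit does the same thing via add_operation):

# Illustrative driver loop, not part of the commit. Assumes an analyzer compatible
# with LayerAnalyzer and a configured target version.
import time

from workers.securityworker import index_images

def run_forever(target_version, analyzer, interval=30):
  token = None
  while True:
    # A None return means "no candidates right now"; starting again from a None
    # token simply rescans from the configured or database-derived minimum ID.
    token = index_images(target_version, analyzer, token=token)
    time.sleep(interval)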
workers/securityworker/models_interface.py (new file)
@@ -0,0 +1,28 @@
from abc import ABCMeta, abstractmethod
from collections import namedtuple

from six import add_metaclass


class ScanToken(namedtuple('NextScanToken', ['min_id'])):
  """
  ScanToken represents an opaque token that can be passed between runs of the security worker
  to continue scanning wherever the previous run left off. Note that the data of the token is
  *opaque* to the security worker, and the security worker should *not* pull any data out of or
  modify the token in any way.
  """


@add_metaclass(ABCMeta)
class SecurityWorkerDataInterface(object):
  """
  Interface that represents all data store interactions required by the security worker.
  """

  @abstractmethod
  def candidates_to_scan(self, target_version, start_token=None):
    """
    Returns a tuple consisting of an iterator of all the candidates to scan and a NextScanToken.
    The iterator returns a tuple for each iteration consisting of the candidate Image, the abort
    signal, and the number of remaining candidates. If the iterator returned is None, there are
    no candidates to process.
    """
    pass
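Any implementation of the interface just has to honor the (iterator, ScanToken) contract described in the docstring above. A hypothetical minimal in-memory implementation, purely for illustration and not part of this commit (the candidate objects and abort event mimic what yield_random_entries provides in the real model):

# Hypothetical fake model for exercising index_images without a database.
from threading import Event

from workers.securityworker.models_interface import ScanToken, SecurityWorkerDataInterface


class FakeSecurityWorkerModel(SecurityWorkerDataInterface):
  def __init__(self, candidates):
    self._candidates = list(candidates)

  def candidates_to_scan(self, target_version, start_token=None):
    if not self._candidates:
      # Mirrors the real model: no candidates means (None, None).
      return (None, None)

    abort = Event()

    def iterate():
      remaining = len(self._candidates)
      for candidate in self._candidates:
        if abort.is_set():
          return
        remaining -= 1
        yield (candidate, abort, remaining)

    # The token's contents are opaque to the worker, so None is acceptable here.
    return (iterate(), ScanToken(min_id=None))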
workers/securityworker/models_pre_oci.py (new file)
@@ -0,0 +1,52 @@
from math import log10

from app import app
from data.model.image import (get_images_eligible_for_scan, get_image_pk_field,
                              get_max_id_for_sec_scan, get_min_id_for_sec_scan)
from util.migrate.allocator import yield_random_entries

from workers.securityworker.models_interface import (
  ScanToken,
  SecurityWorkerDataInterface
)


class PreOCIModel(SecurityWorkerDataInterface):
  def candidates_to_scan(self, target_version, start_token=None):
    def batch_query():
      return get_images_eligible_for_scan(target_version)

    # Find the minimum ID.
    min_id = None
    if start_token is not None:
      min_id = start_token.min_id
    else:
      min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID')
      if min_id is None:
        min_id = get_min_id_for_sec_scan(target_version)

    # Get the ID of the last image we can analyze. Will be None if there are no images in the
    # database.
    max_id = get_max_id_for_sec_scan()
    if max_id is None:
      return (None, None)

    if min_id is None or min_id > max_id:
      return (None, None)

    # 4^log10(total) gives us a scalable batch size into the billions.
    batch_size = int(4 ** log10(max(10, max_id - min_id)))

    # TODO: Once we have a clean shared NamedTuple for Images, send that to the secscan analyzer
    # rather than the database Image itself.
    iterator = yield_random_entries(
      batch_query,
      get_image_pk_field(),
      batch_size,
      max_id,
      min_id,
    )

    return (iterator, ScanToken(max_id + 1))


pre_oci_model = PreOCIModel()
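The 4 ** log10(total) heuristic above keeps batches small for small ID ranges while letting them grow slowly as the range climbs into the billions. A quick worked example of the values it produces, restating only the formula from the code:

from math import log10

# Same expression as in candidates_to_scan above, evaluated for a few ID ranges.
for id_range in (10, 1000, 1000000, 1000000000):
  batch_size = int(4 ** log10(max(10, id_range)))
  print(id_range, batch_size)

# Roughly: 10 -> 4, 1000 -> 64, 1,000,000 -> 4096, 1,000,000,000 -> 262144.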
workers/securityworker/securityworker.py (new file)
@@ -0,0 +1,47 @@
import logging.config
import time

import features

from app import app, secscan_api
from workers.worker import Worker
from workers.securityworker import index_images
from util.secscan.api import SecurityConfigValidator
from util.secscan.analyzer import LayerAnalyzer
from util.log import logfile_path
from endpoints.v2 import v2_bp

logger = logging.getLogger(__name__)

DEFAULT_INDEXING_INTERVAL = 30


class SecurityWorker(Worker):
  def __init__(self):
    super(SecurityWorker, self).__init__()
    validator = SecurityConfigValidator(app.config)
    if not validator.valid():
      logger.warning('Failed to validate security scan configuration')
      return

    self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3)
    self._analyzer = LayerAnalyzer(app.config, secscan_api)
    self._next_token = None

    interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL)
    self.add_operation(self._index_images, interval)

  def _index_images(self):
    self._next_token = index_images(self._target_version, self._analyzer, self._next_token)


if __name__ == '__main__':
  app.register_blueprint(v2_bp, url_prefix='/v2')

  if not features.SECURITY_SCANNER:
    logger.debug('Security scanner disabled; skipping SecurityWorker')
    while True:
      time.sleep(100000)

  logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False)
  worker = SecurityWorker()
  worker.start()
@@ -5,5 +5,5 @@ from workers.securityworker import index_images

 def test_securityworker_realdb(initialized_db):
   mock_analyzer = Mock()
-  assert index_images(0, 1, mock_analyzer) is not None
+  assert index_images(1, mock_analyzer) is not None
   mock_analyzer.analyze_recursively.assert_called()