Merge pull request #2742 from coreos-inc/joseph.schorr/QUAY-665/securityworker-interface
Data interface for securityworker
commit 04114cf105

7 changed files with 174 additions and 116 deletions
@@ -3,6 +3,6 @@
 echo 'Starting security scanner worker'

 cd /
-venv/bin/python -m workers.securityworker 2>&1
+venv/bin/python -m workers.securityworker.securityworker 2>&1

 echo 'Security scanner worker exited'
@@ -1,115 +0,0 @@
import logging.config
import time

from math import log10

import features

from app import app, secscan_api, prometheus
from workers.worker import Worker
from data.database import UseThenDisconnect
from data.model.image import (get_images_eligible_for_scan, get_image_pk_field,
                              get_max_id_for_sec_scan, get_min_id_for_sec_scan)
from util.secscan.api import SecurityConfigValidator, APIRequestFailure
from util.secscan.analyzer import LayerAnalyzer, PreemptedException
from util.migrate.allocator import yield_random_entries
from util.log import logfile_path
from endpoints.v2 import v2_bp


DEFAULT_INDEXING_INTERVAL = 30


logger = logging.getLogger(__name__)
unscanned_images_gauge = prometheus.create_gauge('unscanned_images',
                                                 'Number of images that clair needs to scan.')
max_unscanned_images_gauge = prometheus.create_gauge('max_unscanned_image_id',
                                                     'Max ID of the unscanned images.')


class SecurityWorker(Worker):
  def __init__(self):
    super(SecurityWorker, self).__init__()
    validator = SecurityConfigValidator(app.config)
    if not validator.valid():
      logger.warning('Failed to validate security scan configuration')
      return

    self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3)
    self._analyzer = LayerAnalyzer(app.config, secscan_api)
    self._min_id = None

    interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL)
    self.add_operation(self._index_images, interval)

  def _index_images(self):
    def batch_query():
      return get_images_eligible_for_scan(self._target_version)

    # Get the ID of the last image we can analyze. Will be None if there are no images in the
    # database.
    max_id = get_max_id_for_sec_scan()
    if max_id is None:
      return

    if self.min_id is None or self.min_id > max_id:
      logger.info('Could not find any available images for scanning.')
      return

    max_unscanned_images_gauge.Set(max_id)

    # 4^log10(total) gives us a scalable batch size into the billions.
    batch_size = int(4 ** log10(max(10, max_id - self.min_id)))

    with UseThenDisconnect(app.config):
      to_scan_generator = yield_random_entries(
        batch_query,
        get_image_pk_field(),
        batch_size,
        max_id,
        self.min_id,
      )
      for candidate, abt, num_remaining in to_scan_generator:
        try:
          self._analyzer.analyze_recursively(candidate)
        except PreemptedException:
          logger.info('Another worker pre-empted us for layer: %s', candidate.id)
          abt.set()
        except APIRequestFailure:
          logger.exception('Security scanner service unavailable')
          return

        unscanned_images_gauge.Set(num_remaining)

      # If we reach this point, we analyzed every image up to max_id; the next time the worker
      # runs, we want to start from the next image.
      self.min_id = max_id + 1

  @property
  def min_id(self):
    """ If it hasn't already been determined, finds the ID of the first image to be analyzed.
        First checks the config, then the database, and returns None if there are no images
        available for scanning.
    """
    if self._min_id is None:
      self._min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID')
      if self._min_id is None:
        self._min_id = get_min_id_for_sec_scan(self._target_version)
    return self._min_id

  @min_id.setter
  def min_id(self, new_min_id):
    self._min_id = new_min_id


if __name__ == '__main__':
  app.register_blueprint(v2_bp, url_prefix='/v2')

  if not features.SECURITY_SCANNER:
    logger.debug('Security scanner disabled; skipping SecurityWorker')
    while True:
      time.sleep(100000)

  logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False)
  worker = SecurityWorker()
  worker.start()
workers/securityworker/__init__.py (new file, 36 additions)

@@ -0,0 +1,36 @@
import logging.config

from app import app, prometheus
from data.database import UseThenDisconnect
from workers.securityworker.models_pre_oci import pre_oci_model as model
from util.secscan.api import APIRequestFailure
from util.secscan.analyzer import PreemptedException

logger = logging.getLogger(__name__)
unscanned_images_gauge = prometheus.create_gauge('unscanned_images',
                                                 'Number of images that clair needs to scan.')


def index_images(target_version, analyzer, token=None):
  """ Performs security indexing of all images in the database not scanned at the target version.
      If a token is provided, scanning will begin where the token indicates it previously completed.
  """
  iterator, next_token = model.candidates_to_scan(target_version, start_token=token)
  if iterator is None:
    logger.debug('Found no additional images to scan')
    return None

  with UseThenDisconnect(app.config):
    for candidate, abt, num_remaining in iterator:
      try:
        analyzer.analyze_recursively(candidate)
      except PreemptedException:
        logger.info('Another worker pre-empted us for layer: %s', candidate.id)
        abt.set()
      except APIRequestFailure:
        logger.exception('Security scanner service unavailable')
        return

      unscanned_images_gauge.Set(num_remaining)

  return next_token
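As a rough illustration of how the return value is meant to be threaded through successive runs (a minimal sketch, not part of the diff; the Mock analyzer stands in for LayerAnalyzer, and a configured app and database are assumed):

from mock import Mock

from workers.securityworker import index_images

analyzer = Mock()  # placeholder for LayerAnalyzer(app.config, secscan_api)
token = None

# Each pass resumes from the opaque token returned by the previous one; a None result means
# there was nothing left to scan, so the following pass starts over from the configured minimum.
for _ in range(2):
  token = index_images(3, analyzer, token=token)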
workers/securityworker/models_interface.py (new file, 30 additions)

@@ -0,0 +1,30 @@
from abc import ABCMeta, abstractmethod
from collections import namedtuple

from six import add_metaclass


class ScanToken(namedtuple('NextScanToken', ['min_id'])):
  """
  ScanToken represents an opaque token that can be passed between runs of the security worker
  to continue scanning wherever the previous run left off. Note that the data of the token is
  *opaque* to the security worker, and the security worker should *not* pull any data out of or
  modify the token in any way.
  """


@add_metaclass(ABCMeta)
class SecurityWorkerDataInterface(object):
  """
  Interface that represents all data store interactions required by the security worker.
  """

  @abstractmethod
  def candidates_to_scan(self, target_version, start_token=None):
    """
    Returns a tuple consisting of an iterator of all the candidates to scan and a NextScanToken.
    The iterator returns a tuple for each iteration consisting of the candidate Image, the abort
    signal, and the number of remaining candidates. If the iterator returned is None, there are
    no candidates to process.
    """
    pass
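To make the contract concrete, here is a minimal sketch (not part of the diff) of an alternate implementation of the interface, for example for unit tests. The FakeModel name, the in-memory candidate list, and the threading.Event abort signal are all illustrative assumptions:

import threading

from workers.securityworker.models_interface import ScanToken, SecurityWorkerDataInterface


class FakeModel(SecurityWorkerDataInterface):
  """ Hypothetical in-memory implementation, useful for exercising index_images in tests. """

  def __init__(self, candidates):
    self._candidates = list(candidates)

  def candidates_to_scan(self, target_version, start_token=None):
    min_id = start_token.min_id if start_token is not None else 0
    remaining = [c for c in self._candidates if c.id >= min_id]
    if not remaining:
      return (None, None)

    def iterate():
      abort = threading.Event()
      for index, candidate in enumerate(remaining):
        if abort.is_set():
          return
        # Yield the same 3-tuple shape the real allocator produces:
        # (candidate, abort signal, number of remaining candidates).
        yield (candidate, abort, len(remaining) - index - 1)

    return (iterate(), ScanToken(max(c.id for c in remaining) + 1))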
workers/securityworker/models_pre_oci.py (new file, 49 additions)

@@ -0,0 +1,49 @@
from math import log10

from app import app
from data.model.image import (get_images_eligible_for_scan, get_image_pk_field,
                              get_max_id_for_sec_scan, get_min_id_for_sec_scan)
from util.migrate.allocator import yield_random_entries

from workers.securityworker.models_interface import (ScanToken, SecurityWorkerDataInterface)


class PreOCIModel(SecurityWorkerDataInterface):
  def candidates_to_scan(self, target_version, start_token=None):
    def batch_query():
      return get_images_eligible_for_scan(target_version)

    # Find the minimum ID.
    min_id = None
    if start_token is not None:
      min_id = start_token.min_id
    else:
      min_id = app.config.get('SECURITY_SCANNER_INDEXING_MIN_ID')
      if min_id is None:
        min_id = get_min_id_for_sec_scan(target_version)

    # Get the ID of the last image we can analyze. Will be None if there are no images in the
    # database.
    max_id = get_max_id_for_sec_scan()
    if max_id is None:
      return (None, None)

    if min_id is None or min_id > max_id:
      return (None, None)

    # 4^log10(total) gives us a scalable batch size into the billions.
    batch_size = int(4 ** log10(max(10, max_id - min_id)))

    # TODO: Once we have a clean shared NamedTuple for Images, send that to the secscan analyzer
    # rather than the database Image itself.
    iterator = yield_random_entries(
      batch_query,
      get_image_pk_field(),
      batch_size,
      max_id,
      min_id,
    )

    return (iterator, ScanToken(max_id + 1))


pre_oci_model = PreOCIModel()
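The batch-size comment is easier to see with numbers plugged in (a quick illustration, not part of the diff):

from math import log10

# 4 ** log10(n) grows far more slowly than n, which is what keeps batches manageable:
# a backlog of about one million images gives a batch size around 4 ** 6 = 4096, and a
# backlog in the billions stays around 4 ** 9 = 262144.
for backlog in (10, 10**6, 10**9):
  print(backlog, int(4 ** log10(max(10, backlog))))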
workers/securityworker/securityworker.py (new file, 48 additions)

@@ -0,0 +1,48 @@
import logging.config
import time

import features

from app import app, secscan_api
from workers.worker import Worker
from workers.securityworker import index_images
from util.secscan.api import SecurityConfigValidator
from util.secscan.analyzer import LayerAnalyzer
from util.log import logfile_path
from endpoints.v2 import v2_bp

logger = logging.getLogger(__name__)

DEFAULT_INDEXING_INTERVAL = 30


class SecurityWorker(Worker):
  def __init__(self):
    super(SecurityWorker, self).__init__()
    validator = SecurityConfigValidator(app.config)
    if not validator.valid():
      logger.warning('Failed to validate security scan configuration')
      return

    self._target_version = app.config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 3)
    self._analyzer = LayerAnalyzer(app.config, secscan_api)
    self._next_token = None

    interval = app.config.get('SECURITY_SCANNER_INDEXING_INTERVAL', DEFAULT_INDEXING_INTERVAL)
    self.add_operation(self._index_images, interval)

  def _index_images(self):
    self._next_token = index_images(self._target_version, self._analyzer, self._next_token)


if __name__ == '__main__':
  app.register_blueprint(v2_bp, url_prefix='/v2')

  if not features.SECURITY_SCANNER:
    logger.debug('Security scanner disabled; skipping SecurityWorker')
    while True:
      time.sleep(100000)

  logging.config.fileConfig(logfile_path(debug=True), disable_existing_loggers=False)
  worker = SecurityWorker()
  worker.start()
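For reference, these are the config keys the worker and the pre-OCI model read in this diff; the values below are only illustrative, not defaults the PR adds:

# Illustrative config values (keys read in SecurityWorker.__init__ and models_pre_oci):
SECURITY_SCANNER_ENGINE_VERSION_TARGET = 3  # engine version to index against; falls back to 3
SECURITY_SCANNER_INDEXING_INTERVAL = 30     # scheduling interval for _index_images; falls back to 30
SECURITY_SCANNER_INDEXING_MIN_ID = 0        # optional; otherwise the minimum image ID comes from the database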
workers/securityworker/test/test_securityworker.py (new file, 10 additions)

@@ -0,0 +1,10 @@
from mock import patch, Mock

from test.fixtures import *
from workers.securityworker import index_images


def test_securityworker_realdb(initialized_db):
  mock_analyzer = Mock()
  assert index_images(1, mock_analyzer) is not None
  mock_analyzer.analyze_recursively.assert_called()
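The test drives index_images against the real schema via the initialized_db fixture and mocks only the analyzer. Assuming the repository's usual pytest setup and test database fixtures, it could be run on its own roughly like this (a hypothetical invocation, not part of the diff):

# Standalone run of just this test module; requires the test fixtures/database to be configured.
import pytest

pytest.main(['workers/securityworker/test/test_securityworker.py'])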