Add a security scanner api config object for params

Change SecScanAPI to use a uri creation func instead of test context

Pass config provider through validator context

Remove app config dependency for validators
This commit is contained in:
Sam Chow 2018-05-29 13:50:51 -04:00
parent 554d4f47a8
commit 7df8ed4a60
47 changed files with 305 additions and 166 deletions

View file

@ -107,12 +107,3 @@ def get_priority_for_index(index):
return priority
return 'Unknown'
def create_url_from_app(app):
"""
Higher order function that returns a function that when called, will generate a url for that given app
:param app: Flask app
:return:
type: Flask -> (str -> url)
"""

View file

@ -7,16 +7,12 @@ from urlparse import urljoin
import requests
from flask import url_for
from data.database import CloseForLongOperation
from data import model
from data.model.storage import get_storage_locations
from util import get_app_url, slash_join
from util.abchelpers import nooper
from util.failover import failover, FailoverException
from util.secscan.validator import SecurityConfigValidator
from util.security.instancekeys import InstanceKeys, instance_keys_context_from_app_config
from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
from _init import CONF_DIR
@ -70,16 +66,16 @@ def compute_layer_id(layer):
class SecurityScannerAPI(object):
""" Helper class for talking to the Security Scan service (usually Clair). """
def __init__(self, app, config, storage, client=None, skip_validation=False):
def __init__(self, config, storage, server_hostname=None, client=None, skip_validation=False, uri_creator=None, instance_keys=None):
feature_enabled = config.get('FEATURE_SECURITY_SCANNER', False)
has_valid_config = skip_validation
if not skip_validation and feature_enabled:
config_validator = SecurityConfigValidator(config)
config_validator = SecurityConfigValidator(feature_enabled, config.get('SECURITY_SCANNER_ENDPOINT'))
has_valid_config = config_validator.valid()
if feature_enabled and has_valid_config:
self.state = ImplementedSecurityScannerAPI(app, config, storage, client=client)
self.state = ImplementedSecurityScannerAPI(config, storage, server_hostname, client=client, uri_creator=uri_creator, instance_keys=instance_keys)
else:
self.state = NoopSecurityScannerAPI()
@ -150,20 +146,25 @@ class NoopSecurityScannerAPI(SecurityScannerAPIInterface):
class ImplementedSecurityScannerAPI(SecurityScannerAPIInterface):
""" Helper class for talking to the Security Scan service (Clair). """
def __init__(self, app_config, config, storage, client=None):
self._app_config = app_config
# TODO(sam) refactor this to not take an app config, and instead just the things it needs as a config object
def __init__(self, config, storage, server_hostname, client=None, uri_creator=None, instance_keys=None):
self._config = config
self._instance_keys = InstanceKeys(instance_keys_context_from_app_config(app_config))
self._client = client or config['HTTPCLIENT']
self._instance_keys = instance_keys
self._client = client
self._storage = storage
self._server_hostname = server_hostname
self._default_storage_locations = config['DISTRIBUTED_STORAGE_PREFERENCE']
self._target_version = config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 2)
self._uri_creator = uri_creator
def _get_image_url_and_auth(self, image):
""" Returns a tuple of the url and the auth header value that must be used
to fetch the layer data itself. If the image can't be addressed, we return
None.
"""
if self._instance_keys is None:
raise Exception('No Instance keys provided to Security Scanner API')
path = model.storage.get_layer_path(image.storage)
locations = self._default_storage_locations
@ -183,7 +184,7 @@ class ImplementedSecurityScannerAPI(SecurityScannerAPIInterface):
repository_and_namespace = '/'.join([namespace_name, repo_name])
# Generate the JWT which will authorize this
audience = self._app_config['SERVER_HOSTNAME']
audience = self._server_hostname
context, subject = build_context_and_subject()
access = [{
'type': 'repository',
@ -195,10 +196,7 @@ class ImplementedSecurityScannerAPI(SecurityScannerAPIInterface):
TOKEN_VALIDITY_LIFETIME_S, self._instance_keys)
auth_header = 'Bearer ' + auth_token
with self._app.test_request_context('/'):
relative_layer_url = url_for('v2.download_blob', repository=repository_and_namespace,
digest=image.storage.content_checksum)
uri = urljoin(get_app_url(self._config), relative_layer_url)
uri = self._uri_creator(repository_and_namespace, image.storage.content_checksum)
return uri, auth_header

View file

@ -1,28 +1,27 @@
import logging
import features
logger = logging.getLogger(__name__)
class SecurityConfigValidator(object):
""" Helper class for validating the security scanner configuration. """
def __init__(self, config):
if not features.SECURITY_SCANNER:
def __init__(self, feature_sec_scan, sec_scan_endpoint):
if not feature_sec_scan:
return
self._config = config
self._feature_sec_scan = feature_sec_scan
self._sec_scan_endpoint = sec_scan_endpoint
def valid(self):
if not features.SECURITY_SCANNER:
if not self._feature_sec_scan:
return False
if self._config.get('SECURITY_SCANNER_ENDPOINT') is None:
if self._sec_scan_endpoint is None:
logger.debug('Missing SECURITY_SCANNER_ENDPOINT configuration')
return False
endpoint = self._config.get('SECURITY_SCANNER_ENDPOINT')
endpoint = self._sec_scan_endpoint
if not endpoint.startswith('http://') and not endpoint.startswith('https://'):
logger.debug('SECURITY_SCANNER_ENDPOINT configuration must start with http or https')
return False