import features import logging logger = logging.getLogger(__name__) class SecurityConfigValidator(object): """ Helper class for validating the security scanner configuration. """ def __init__(self, config, config_provider): self._config_provider = config_provider if not features.SECURITY_SCANNER: return self._security_config = config['SECURITY_SCANNER'] if self._security_config is None: return self._certificate = self._get_filepath('CA_CERTIFICATE_FILENAME') or False self._public_key = self._get_filepath('PUBLIC_KEY_FILENAME') self._private_key = self._get_filepath('PRIVATE_KEY_FILENAME') if self._public_key and self._private_key: self._keys = (self._public_key, self._private_key) else: self._keys = None def _get_filepath(self, key): config = self._security_config if key in config: with self._config_provider.get_volume_file(config[key]) as f: return f.name return None def cert(self): return self._certificate def keypair(self): return self._keys def valid(self): if not features.SECURITY_SCANNER: return False if not self._security_config: logger.debug('Missing SECURITY_SCANNER block in configuration') return False if not 'ENDPOINT' in self._security_config: logger.debug('Missing ENDPOINT field in SECURITY_SCANNER configuration') return False endpoint = self._security_config['ENDPOINT'] or '' if not endpoint.startswith('http://') and not endpoint.startswith('https://'): logger.debug('ENDPOINT field in SECURITY_SCANNER configuration must start with http or https') return False if endpoint.startswith('https://') and (self._certificate is False or self._keys is None): logger.debug('Certificate and key pair required for talking to security worker over HTTPS') return False return True