import features import logging import requests from data.database import CloseForLongOperation from urlparse import urljoin logger = logging.getLogger(__name__) # NOTE: This objects are used directly in the external-notification-data and vulnerability-service # on the frontend, so be careful with changing their existing keys. PRIORITY_LEVELS = { 'Unknown': { 'title': 'Unknown', 'index': '6', 'level': 'info', 'description': 'Unknown is either a security problem that has not been assigned ' + 'to a priority yet or a priority that our system did not recognize', 'banner_required': False }, 'Negligible': { 'title': 'Negligible', 'index': '5', 'level': 'info', 'description': 'Negligible is technically a security problem, but is only theoretical ' + 'in nature, requires a very special situation, has almost no install base, ' + 'or does no real damage.', 'banner_required': False }, 'Low': { 'title': 'Low', 'index': '4', 'level': 'warning', 'description': 'Low is a security problem, but is hard to exploit due to environment, ' + 'requires a user-assisted attack, a small install base, or does very ' + 'little damage.', 'banner_required': False }, 'Medium': { 'title': 'Medium', 'value': 'Medium', 'index': '3', 'level': 'warning', 'description': 'Medium is a real security problem, and is exploitable for many people. ' + 'Includes network daemon denial of service attacks, cross-site scripting, ' + 'and gaining user privileges.', 'banner_required': False }, 'High': { 'title': 'High', 'value': 'High', 'index': '2', 'level': 'warning', 'description': 'High is a real problem, exploitable for many people in a default installation. ' + 'Includes serious remote denial of services, local root privilege escalations, ' + 'or data loss.', 'banner_required': False }, 'Critical': { 'title': 'Critical', 'value': 'Critical', 'index': '1', 'level': 'error', 'description': 'Critical is a world-burning problem, exploitable for nearly all people in ' + 'a installation of the package. Includes remote root privilege escalations, ' + 'or massive data loss.', 'banner_required': True }, 'Defcon1': { 'title': 'Defcon 1', 'value': 'Defcon1', 'index': '0', 'level': 'error', 'description': 'Defcon1 is a Critical problem which has been manually highlighted ' + 'by the Quay team. It requires immediate attention.', 'banner_required': True } } def get_priority_for_index(index): for priority in PRIORITY_LEVELS: if PRIORITY_LEVELS[priority]['index'] == index: return priority return 'Unknown' class SecurityConfigValidator(object): def __init__(self, app, config_provider): self._config_provider = config_provider if not features.SECURITY_SCANNER: return self._security_config = app.config['SECURITY_SCANNER'] if self._security_config is None: return self._certificate = self._get_filepath('CA_CERTIFICATE_FILENAME') or False self._public_key = self._get_filepath('PUBLIC_KEY_FILENAME') self._private_key = self._get_filepath('PRIVATE_KEY_FILENAME') if self._public_key and self._private_key: self._keys = (self._public_key, self._private_key) else: self._keys = None def _get_filepath(self, key): config = self._security_config if key in config: with self._config_provider.get_volume_file(config[key]) as f: return f.name return None def cert(self): return self._certificate def keypair(self): return self._keys def valid(self): if not features.SECURITY_SCANNER: return False if not self._security_config: logger.debug('Missing SECURITY_SCANNER block in configuration') return False if not 'ENDPOINT' in self._security_config: logger.debug('Missing ENDPOINT field in SECURITY_SCANNER configuration') return False endpoint = self._security_config['ENDPOINT'] or '' if not endpoint.startswith('http://') and not endpoint.startswith('https://'): logger.debug('ENDPOINT field in SECURITY_SCANNER configuration must start with http or https') return False if endpoint.startswith('https://') and (self._certificate is False or self._keys is None): logger.debug('Certificate and key pair required for talking to security worker over HTTPS') return False return True class SecurityScannerAPI(object): """ Helper class for talking to the Security Scan service (Clair). """ def __init__(self, app, config_provider): self.app = app self.config_provider = config_provider self._security_config = None config_validator = SecurityConfigValidator(app, config_provider) if not config_validator.valid(): logger.warning('Invalid config provided to SecurityScannerAPI') return self._security_config = app.config.get('SECURITY_SCANNER') self._certificate = config_validator.cert() self._keys = config_validator.keypair() def check_layer_vulnerable(self, layer_id, cve_id): """ Checks with Clair whether the given layer is vulnerable to the given CVE. """ try: body = { 'LayersIDs': [layer_id] } response = self.call('vulnerabilities/%s/affected-layers', body, cve_id) except requests.exceptions.RequestException: logger.exception('Got exception when trying to call Clair endpoint') return False if response.status_code != 200: return False try: response_data = response.json() except ValueError: logger.exception('Got exception when trying to parse Clair response') return False if (not layer_id in response_data or not response_data[layer_id].get('Vulnerable', False)): return False return True def call(self, relative_url, body=None, *args, **kwargs): """ Issues an HTTP call to the sec API at the given relative URL. This function disconnects from the database while awaiting a response from the API server. """ security_config = self._security_config if security_config is None: raise Exception('Cannot call unconfigured security system') api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/' url = urljoin(api_url, relative_url % args) client = self.app.config['HTTPCLIENT'] timeout = security_config.get('API_TIMEOUT_SECONDS', 1) logger.debug('Looking up sec information: %s', url) with CloseForLongOperation(self.app.config): if body is not None: return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self._keys, verify=self._certificate) else: return client.get(url, params=kwargs, timeout=timeout, cert=self._keys, verify=self._certificate)