import features
import logging
import requests

from app import app
from database import CloseForLongOperation
from urlparse import urljoin

logger = logging.getLogger(__name__)


class SecurityScannerAPI(object):
    """ Helper class for talking to the Security Scan service (Clair). """

    def __init__(self, app, config_provider):
        """ Stores the application and config provider and, when the security
            scanner feature is enabled, loads the scanner settings.

            app: application object; its `config` mapping is read for the
              'SECURITY_SCANNER' settings and (in `call`) for 'HTTPCLIENT'.
            config_provider: object exposing `get_volume_file(name)`, used to
              resolve the certificate/key filenames found in the settings.

            NOTE(review): when `features.SECURITY_SCANNER` is falsy this returns
            early and `security_config`, `certificate`, `public_key`,
            `private_key` and `keys` are never set, so `call` and
            `check_layer_vulnerable` raise AttributeError on such an instance.
        """
        self.app = app
        self.config_provider = config_provider

        if not features.SECURITY_SCANNER:
            return

        self.security_config = app.config['SECURITY_SCANNER']

        # `certificate` is handed to the HTTP client as `verify=`; when no CA
        # certificate is configured it becomes False.
        # NOTE(review): with a requests-style client, verify=False turns TLS
        # certificate verification off entirely -- confirm this is intended.
        self.certificate = self._getfilepath('CA_CERTIFICATE_FILENAME') or False
        self.public_key = self._getfilepath('PUBLIC_KEY_FILENAME')
        self.private_key = self._getfilepath('PRIVATE_KEY_FILENAME')

        # A client certificate pair is only used when both halves are present.
        if self.public_key and self.private_key:
            self.keys = (self.public_key, self.private_key)
        else:
            self.keys = None

    def _getfilepath(self, config_key):
        """ Returns the `name` of the volume file referenced by `config_key` in
            the scanner settings, or None when that key is not configured.
        """
        security_config = self.security_config
        if config_key not in security_config:
            return None

        # The file is opened only to learn its name; it is closed again on exit.
        with self.config_provider.get_volume_file(security_config[config_key]) as f:
            return f.name

    def check_layer_vulnerable(self, layer_id, cve_id):
        """ Checks with Clair whether the given layer is vulnerable to the given CVE.

            Returns True only when Clair answers with HTTP 200 and a JSON object
            that maps `layer_id` to an object whose 'Vulnerable' entry is truthy.
            Returns False in every other case: the request raised a
            `requests` exception, the status was not 200, the body was not valid
            JSON, or the JSON did not have the expected shape.
        """
        body = {
            'LayersIDs': [layer_id]
        }

        try:
            response = self.call('vulnerabilities/%s/affected-layers', body, cve_id)
        except requests.exceptions.RequestException:
            logger.exception('Got exception when trying to call Clair endpoint')
            return False

        if response.status_code != 200:
            return False

        try:
            response_data = response.json()
        except ValueError:
            logger.exception('Got exception when trying to parse Clair response')
            return False

        # Valid JSON is not necessarily the object we expect (it could be null,
        # a list, ...); treat any unexpected shape as "not vulnerable" instead
        # of raising, matching the other failure paths above.
        if not isinstance(response_data, dict):
            return False

        layer_data = response_data.get(layer_id)
        if not isinstance(layer_data, dict):
            return False

        return bool(layer_data.get('Vulnerable', False))

    def call(self, relative_url, body=None, *args, **kwargs):
        """ Issues an HTTP call to the sec API at the given relative URL.
            This function disconnects from the database while awaiting a response
            from the API server.

            relative_url: path below '<ENDPOINT>/<API_VERSION>/'; it is
              %-formatted with the extra positional `args`.
            body: when not None it is sent as the JSON payload of a POST;
              otherwise a GET is issued.
            kwargs: forwarded as the query-string parameters of the request.

            Returns whatever the configured 'HTTPCLIENT' returns for the request.
            Exceptions raised by the client are not caught here.
        """
        security_config = self.security_config
        api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/'
        url = urljoin(api_url, relative_url % args)

        client = self.app.config['HTTPCLIENT']
        timeout = security_config.get('API_TIMEOUT_SECONDS', 1)
        logger.debug('Looking up sec information: %s', url)

        # Options shared by both the POST and the GET form of the request.
        request_options = {
            'params': kwargs,
            'timeout': timeout,
            'cert': self.keys,
            'verify': self.certificate,
        }

        # Use the application this instance was constructed with (rather than
        # the module-level `app`) so every lookup goes through the same config.
        with CloseForLongOperation(self.app.config):
            if body is not None:
                return client.post(url, json=body, **request_options)
            return client.get(url, **request_options)