import features import logging import requests import json from urlparse import urljoin logger = logging.getLogger(__name__) class SecurityScanEndpoint(object): """ Helper class for talking to the Security Scan service (Clair). """ def __init__(self, app, config_provider): self.app = app self.config_provider = config_provider if not features.SECURITY_SCANNER: return self.security_config = app.config['SECURITY_SCANNER'] self.certificate = self._getfilepath('CA_CERTIFICATE_FILENAME') or False self.public_key = self._getfilepath('PUBLIC_KEY_FILENAME') self.private_key = self._getfilepath('PRIVATE_KEY_FILENAME') if self.public_key and self.private_key: self.keys = (self.public_key, self.private_key) else: self.keys = None def _getfilepath(self, config_key): security_config = self.security_config if config_key in security_config: with self.config_provider.get_volume_file(security_config[config_key]) as f: return f.name return None def call_api(self, relative_url, *args, **kwargs): """ Issues an HTTP call to the sec API at the given relative URL. """ security_config = self.security_config api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/' url = urljoin(api_url, relative_url % args) client = self.app.config['HTTPCLIENT'] timeout = security_config.get('API_TIMEOUT_SECONDS', 1) logger.debug('Looking up sec information: %s', url) return client.get(url, params=kwargs, timeout=timeout, cert=self.keys, verify=self.certificate)