import logging
import requests

from flask import url_for
from urlparse import urljoin

from data.database import CloseForLongOperation
from data import model
from data.model.storage import get_storage_locations
from util.secscan.validator import SecurityConfigValidator
from util.security.instancekeys import InstanceKeys
from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
from util import get_app_url


TOKEN_VALIDITY_LIFETIME_S = 60  # Amount of time the security scanner has to call the layer URL

logger = logging.getLogger(__name__)


class AnalyzeLayerException(Exception):
    """ Exception raised when a layer fails to analyze due to a *client-side* issue. """


class APIRequestFailure(Exception):
    """ Exception raised when there is a failure to conduct an API request. """


# Relative paths (joined onto the versioned API root in `_call`) for each scanner operation.
_API_METHOD_INSERT = 'layers'
_API_METHOD_GET_LAYER = 'layers/%s'
_API_METHOD_DELETE_LAYER = 'layers/%s'
_API_METHOD_MARK_NOTIFICATION_READ = 'notifications/%s'
_API_METHOD_GET_NOTIFICATION = 'notifications/%s'
_API_METHOD_PING = 'metrics'


def _layer_id(image):
    """ Returns the name under which the given image is addressed in the security scanner:
        `<docker_image_id>.<storage uuid>`.
    """
    return '%s.%s' % (image.docker_image_id, image.storage.uuid)


class SecurityScannerAPI(object):
    """ Helper class for talking to the Security Scan service (Clair). """

    def __init__(self, app, config, storage, client=None, skip_validation=False):
        """ Builds the API helper.

            Unless `skip_validation` is set, `config` is checked with SecurityConfigValidator.
            If it is invalid, a warning is logged and the instance is left unconfigured
            (`self._config` is None).

            `client` is the HTTP client used for requests; when not given, config['HTTPCLIENT']
            is used.
        """
        # Give every attribute a value up front so that the `self._config is None` checks made
        # by other methods work (rather than raising AttributeError) when validation fails and
        # we return early below.
        self._app = None
        self._config = None
        self._instance_keys = None
        self._client = None
        self._storage = None
        self._default_storage_locations = None
        self._target_version = None

        if not skip_validation:
            config_validator = SecurityConfigValidator(config)
            if not config_validator.valid():
                logger.warning('Invalid config provided to SecurityScannerAPI')
                return

        self._app = app
        self._config = config
        self._instance_keys = InstanceKeys(app)
        self._client = client or config['HTTPCLIENT']
        self._storage = storage
        self._default_storage_locations = config['DISTRIBUTED_STORAGE_PREFERENCE']
        self._target_version = config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 2)

    def _get_image_url_and_auth(self, image):
        """ Returns a tuple of the url and the auth header value that must be used
            to fetch the layer data itself. The auth header is None when the storage engine
            provides a direct download URL. If the image can't be addressed, we return
            (None, None).
        """
        path = model.storage.get_layer_path(image.storage)
        locations = self._default_storage_locations

        # Fall back to the locations recorded for this storage if the layer is not present in
        # the default ones.
        if not self._storage.exists(locations, path):
            locations = get_storage_locations(image.storage.uuid)
            if not locations or not self._storage.exists(locations, path):
                logger.warning('Could not find a valid location to download layer %s.%s out of %s',
                               image.docker_image_id, image.storage.uuid, locations)
                return None, None

        uri = self._storage.get_direct_download_url(locations, path)
        auth_header = None
        if uri is None:
            # Use the registry API instead, with a signed JWT giving access
            repo_name = image.repository.name
            namespace_name = image.repository.namespace_user.username
            repository_and_namespace = '/'.join([namespace_name, repo_name])

            # Generate the JWT which will authorize this
            audience = self._app.config['SERVER_HOSTNAME']
            context, subject = build_context_and_subject(None, None, None)
            access = [{
                'type': 'repository',
                'name': repository_and_namespace,
                'actions': ['pull'],
            }]

            auth_token = generate_bearer_token(audience, subject, context, access,
                                               TOKEN_VALIDITY_LIFETIME_S, self._instance_keys)
            auth_header = 'Bearer ' + auth_token

            # url_for needs a request context; a throwaway one is enough to build the path.
            with self._app.test_request_context('/'):
                relative_layer_url = url_for('v2.download_blob',
                                             repository=repository_and_namespace,
                                             digest=image.storage.content_checksum)
                uri = urljoin(get_app_url(self._config), relative_layer_url)

        return uri, auth_header

    def _new_analyze_request(self, image):
        """ Create the request body to submit the given image for analysis. If the image's URL
            cannot be found, returns None.
        """
        url, auth_header = self._get_image_url_and_auth(image)
        if url is None:
            return None

        layer_request = {
            'Name': _layer_id(image),
            'Path': url,
            'Format': 'Docker',
        }

        if auth_header is not None:
            layer_request['Headers'] = {
                'Authorization': auth_header,
            }

        # NOTE(review): the `is not None` guard protects against images without a parent
        # (presumably base layers); previously `image.parent` was dereferenced unconditionally.
        parent = image.parent
        if parent is not None and parent.docker_image_id and parent.storage.uuid:
            layer_request['ParentName'] = _layer_id(parent)

        return {
            'Layer': layer_request,
        }

    def delete_layers(self, layers):
        """ Deletes the given layers in the security scanner. """
        if self._config is None:
            # Not configured.
            return

        for layer in layers:
            self.delete_layer(_layer_id(layer))

    def delete_layer(self, layer_id):
        """ Calls DELETE on the layer with the given ID in the security scanner, removing it from
            its database. Returns True if the scanner answered with a 2xx status and False
            otherwise (including when the request itself failed).
        """
        try:
            response = self._call('DELETE', _API_METHOD_DELETE_LAYER % layer_id)
            # Floor division keeps the 2xx check correct regardless of the division semantics
            # in effect (true division would turn e.g. 204 into 2.04).
            return response.status_code // 100 == 2
        except requests.exceptions.RequestException:
            logger.exception('Failed to delete layer: %s', layer_id)
            return False

    def ping(self):
        """ Calls GET on the metrics endpoint of the security scanner to ensure it is running
            and properly configured. Returns the HTTP response.

            Raises a plain Exception with a descriptive message if the request times out, the
            connection fails, or any other request error occurs.
        """
        try:
            return self._call('GET', _API_METHOD_PING)
        except requests.exceptions.Timeout:
            logger.exception('Timeout when trying to connect to security scanner endpoint')
            raise Exception('Timeout when trying to connect to security scanner endpoint')
        except requests.exceptions.ConnectionError:
            logger.exception('Connection error when trying to connect to security scanner endpoint')
            raise Exception('Connection error when trying to connect to security scanner endpoint')
        except (requests.exceptions.RequestException, ValueError):
            logger.exception('Exception when trying to connect to security scanner endpoint')
            raise Exception('Exception when trying to connect to security scanner endpoint')

    def analyze_layer(self, layer):
        """ Posts the given layer to the security scanner for analysis, blocking until complete.
            Returns a tuple containing the analysis version (on success, None on failure) and
            whether the request should be retried.

            Raises AnalyzeLayerException if the scanner rejects the request with a 400.
        """
        request = self._new_analyze_request(layer)
        if not request:
            return None, False

        layer_name = request['Layer']['Name']
        logger.info('Analyzing layer %s', layer_name)
        try:
            response = self._call('POST', _API_METHOD_INSERT, body=request)
            json_response = response.json()
        except requests.exceptions.Timeout:
            logger.exception('Timeout when trying to post layer data response for %s', layer.id)
            return None, True
        except requests.exceptions.ConnectionError:
            logger.exception('Connection error when trying to post layer data response for %s',
                             layer.id)
            return None, True
        except (requests.exceptions.RequestException, ValueError):
            # ValueError covers a response body that is not valid JSON.
            logger.exception('Failed to post layer data response for %s', layer.id)
            return None, False

        # Handle any errors from the security scanner.
        if response.status_code != 201:
            # The error payload may lack an 'Error' entry; fall back to an empty message rather
            # than failing on None.
            error = json_response.get('Error') or {}
            message = error.get('Message', '')
            logger.warning('A warning event occurred when analyzing layer %s (status code %s): %s',
                           layer_name, response.status_code, message)

            # 400 means the layer could not be analyzed due to a bad request.
            if response.status_code == 400:
                logger.error('Bad request when calling security scanner for layer %s: %s',
                             layer_name, json_response)
                raise AnalyzeLayerException('Bad request to security scanner')

            # 422 means that the layer could not be analyzed:
            # - the layer could not be extracted (manifest?)
            # - the layer operating system / package manager is unsupported
            return None, response.status_code != 422

        api_version = json_response['Layer']['IndexedByVersion']
        return api_version, False

    def check_layer_vulnerable(self, layer_id, cve_name):
        """ Checks to see if the layer with the given ID is vulnerable to the specified CVE.
            Returns False when the layer (or its feature list) cannot be found.
        """
        layer_data = self._get_layer_data(layer_id, include_vulnerabilities=True)
        if layer_data is None:
            return False

        features = layer_data.get('Layer', {}).get('Features')
        if features is None:
            return False

        for feature in features:
            for vuln in feature.get('Vulnerabilities', []):
                if vuln['Name'] == cve_name:
                    return True

        return False

    def get_notification(self, notification_name, layer_limit=100, page=None):
        """ Gets the data for a specific notification, with optional page token.
            Returns a tuple of the data (None on failure) and whether to retry.
        """
        params = {
            'limit': layer_limit,
        }

        if page is not None:
            params['page'] = page

        try:
            response = self._call('GET', _API_METHOD_GET_NOTIFICATION % notification_name,
                                  params=params)
            json_response = response.json()
        except requests.exceptions.Timeout:
            logger.exception('Timeout when trying to get notification for %s', notification_name)
            return None, True
        except requests.exceptions.ConnectionError:
            logger.exception('Connection error when trying to get notification for %s',
                             notification_name)
            return None, True
        except (requests.exceptions.RequestException, ValueError):
            logger.exception('Failed to get notification for %s', notification_name)
            return None, False

        if response.status_code != 200:
            # A 404 or 400 will not change on retry; any other failure status is retried.
            return None, response.status_code not in (404, 400)

        return json_response, False

    def mark_notification_read(self, notification_name):
        """ Marks a security scanner notification as read. Returns True if the scanner answered
            with a 2xx status and False otherwise (including when the request itself failed).
        """
        try:
            response = self._call('DELETE', _API_METHOD_MARK_NOTIFICATION_READ % notification_name)
            # Floor division: see delete_layer.
            return response.status_code // 100 == 2
        except requests.exceptions.RequestException:
            logger.exception('Failed to mark notification as read: %s', notification_name)
            return False

    def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False):
        """ Returns the layer data for the specified layer. Returns None if the scanner reports
            the layer as not found (404). Raises APIRequestFailure if the request fails.
        """
        return self._get_layer_data(_layer_id(layer), include_features, include_vulnerabilities)

    def _get_layer_data(self, layer_id, include_features=False, include_vulnerabilities=False):
        """ Fetches the scanner's data for the layer with the given ID.

            `include_features` and `include_vulnerabilities` add the `features` and
            `vulnerabilities` query parameters respectively.

            Returns the decoded JSON response, or None if the scanner answers with a 404.
            Raises APIRequestFailure on timeout, connection failure, any other request error or
            an undecodable response body.
        """
        # Accumulate the flags so that asking for both does not silently drop `features`.
        params = {}
        if include_features:
            params['features'] = True

        if include_vulnerabilities:
            params['vulnerabilities'] = True

        try:
            response = self._call('GET', _API_METHOD_GET_LAYER % layer_id, params=params)
            logger.debug('Got response %s for vulnerabilities for layer %s',
                         response.status_code, layer_id)
            json_response = response.json()
        except requests.exceptions.Timeout:
            raise APIRequestFailure('API call timed out')
        except requests.exceptions.ConnectionError:
            raise APIRequestFailure('Could not connect to security service')
        except (requests.exceptions.RequestException, ValueError):
            logger.exception('Failed to get layer data response for %s', layer_id)
            raise APIRequestFailure()

        if response.status_code == 404:
            return None

        return json_response

    def _call(self, method, relative_url, params=None, body=None):
        """ Issues an HTTP call to the sec API at the given relative URL.
            This function disconnects from the database while awaiting a response
            from the API server.

            `params` is sent as the query string and `body` as the JSON payload. Returns the
            HTTP client's response object. Raises an Exception if the security system is not
            configured.
        """
        if self._config is None:
            raise Exception('Cannot call unconfigured security system')

        client = self._client
        headers = {'Connection': 'close'}

        timeout = self._config.get('SECURITY_SCANNER_API_TIMEOUT_SECONDS', 10)
        endpoint = self._config['SECURITY_SCANNER_ENDPOINT']
        if method != 'GET':
            # Non-GET calls may be given their own (batch) timeout and endpoint; each falls back
            # to the regular value when not configured.
            timeout = self._config.get('SECURITY_SCANNER_API_BATCH_TIMEOUT_SECONDS', timeout)
            endpoint = self._config.get('SECURITY_SCANNER_ENDPOINT_BATCH') or endpoint

        # The trailing slash makes the next urljoin append to the version segment instead of
        # replacing it.
        api_url = urljoin(endpoint, '/' + self._config.get('SECURITY_SCANNER_API_VERSION', 'v1')) + '/'
        url = urljoin(api_url, relative_url)

        # All requests are routed through the proxy configured under JWTPROXY_SIGNER.
        signer_proxy_url = self._config.get('JWTPROXY_SIGNER', 'localhost:8080')
        proxies = {
            'https': 'https://' + signer_proxy_url,
            'http': 'http://' + signer_proxy_url,
        }

        with CloseForLongOperation(self._config):
            logger.debug('%sing security URL %s', method.upper(), url)
            return client.request(method, url, json=body, params=params, timeout=timeout,
                                  verify='/conf/mitm.cert', headers=headers, proxies=proxies)