303 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			303 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging

import requests

from flask import url_for
from urlparse import urljoin

from data.database import CloseForLongOperation
from data import model
from data.model.storage import get_storage_locations
from util.secscan.validator import SecurityConfigValidator
from util.security.instancekeys import InstanceKeys
from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
from util import get_app_url
 | |
| 
 | |
| 
 | |
# Amount of time (in seconds) the security scanner has to call the layer URL
# before the bearer token issued for the download is no longer valid.
TOKEN_VALIDITY_LIFETIME_S = 60


logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
class AnalyzeLayerException(Exception):
  """ Exception raised when a layer fails to analyze due to a *client-side* issue. """
 | |
| 
 | |
class APIRequestFailure(Exception):
  """ Exception raised when there is a failure to conduct an API request. """
 | |
| 
 | |
| 
 | |
| _API_METHOD_INSERT = 'layers'
 | |
| _API_METHOD_GET_LAYER = 'layers/%s'
 | |
| _API_METHOD_MARK_NOTIFICATION_READ = 'notifications/%s'
 | |
| _API_METHOD_GET_NOTIFICATION = 'notifications/%s'
 | |
| _API_METHOD_PING = 'metrics'
 | |
| 
 | |
| 
 | |
class SecurityScannerAPI(object):
  """ Helper class for talking to the Security Scan service (Clair).

      If the supplied configuration fails validation, the instance is left
      unconfigured and any attempt to issue an API call raises an Exception.
  """

  # Class-level defaults so that an instance whose constructor returned early
  # (invalid config) still has every attribute defined. `_call` relies on
  # `_config` being None to detect the unconfigured state.
  _app = None
  _config = None
  _instance_keys = None
  _client = None
  _storage = None
  _default_storage_locations = None
  _target_version = None

  def __init__(self, app, config, storage, client=None, skip_validation=False):
    """ Creates the API helper.

        app: the application object; its config and request context are used when
             building registry download URLs.
        config: dict-like configuration for the security scanner.
        storage: storage engine used to locate and address layer data.
        client: optional HTTP client; defaults to config['HTTPCLIENT'].
        skip_validation: when True, the config is used without being validated.
    """
    if not skip_validation:
      config_validator = SecurityConfigValidator(config)
      if not config_validator.valid():
        # Leave the instance unconfigured; the class-level defaults remain in place.
        logger.warning('Invalid config provided to SecurityScannerAPI')
        return

    self._app = app
    self._config = config
    self._instance_keys = InstanceKeys(app)
    self._client = client or config['HTTPCLIENT']
    self._storage = storage
    self._default_storage_locations = config['DISTRIBUTED_STORAGE_PREFERENCE']
    self._target_version = config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 2)

  def _get_image_url_and_auth(self, image):
    """ Returns a tuple of the url and the auth header value that must be used
        to fetch the layer data itself. If the image can't be addressed, we return
        (None, None).
    """
    path = model.storage.get_layer_path(image.storage)
    locations = self._default_storage_locations

    if not self._storage.exists(locations, path):
      # Fall back to the locations recorded for this specific storage entry.
      locations = get_storage_locations(image.storage.uuid)
      if not locations or not self._storage.exists(locations, path):
        logger.warning('Could not find a valid location to download layer %s.%s out of %s',
                       image.docker_image_id, image.storage.uuid, locations)
        return None, None

    uri = self._storage.get_direct_download_url(locations, path)
    auth_header = None
    if uri is None:
      # Use the registry API instead, with a signed JWT giving access
      repo_name = image.repository.name
      namespace_name = image.repository.namespace_user.username
      repository_and_namespace = '/'.join([namespace_name, repo_name])

      # Generate the JWT which will authorize this
      audience = self._app.config['SERVER_HOSTNAME']
      context, subject = build_context_and_subject(None, None, None)
      access = [{
        'type': 'repository',
        'name': repository_and_namespace,
        'actions': ['pull'],
      }]

      auth_token = generate_bearer_token(audience, subject, context, access,
                                         TOKEN_VALIDITY_LIFETIME_S, self._instance_keys)
      auth_header = 'Bearer ' + auth_token

      # url_for requires a request context; a throwaway one is used to build the path.
      with self._app.test_request_context('/'):
        relative_layer_url = url_for('v2.download_blob', repository=repository_and_namespace,
                                     digest=image.storage.content_checksum)
      uri = urljoin(get_app_url(self._config), relative_layer_url)

    return uri, auth_header

  def _new_analyze_request(self, image):
    """ Create the request body to submit the given image for analysis. If the image's URL cannot
        be found, returns None.
    """
    url, auth_header = self._get_image_url_and_auth(image)
    if url is None:
      return None

    layer_request = {
      'Name': '%s.%s' % (image.docker_image_id, image.storage.uuid),
      'Path': url,
      'Format': 'Docker',
    }

    if auth_header is not None:
      layer_request['Headers'] = {
        'Authorization': auth_header,
      }

    # Base images have no parent; only reference one when it is fully addressable.
    parent = image.parent
    if parent is not None and parent.docker_image_id and parent.storage.uuid:
      layer_request['ParentName'] = '%s.%s' % (parent.docker_image_id, parent.storage.uuid)

    return {
      'Layer': layer_request,
    }

  def ping(self):
    """ Calls GET on the metrics endpoint of the security scanner to ensure it is running
        and properly configured. Returns the HTTP response. Raises an Exception if the
        request could not be completed.
    """
    try:
      return self._call('GET', _API_METHOD_PING)
    except requests.exceptions.Timeout:
      logger.exception('Timeout when trying to connect to security scanner endpoint')
      raise Exception('Timeout when trying to connect to security scanner endpoint')
    except requests.exceptions.ConnectionError:
      logger.exception('Connection error when trying to connect to security scanner endpoint')
      raise Exception('Connection error when trying to connect to security scanner endpoint')
    except (requests.exceptions.RequestException, ValueError):
      logger.exception('Exception when trying to connect to security scanner endpoint')
      raise Exception('Exception when trying to connect to security scanner endpoint')

  def analyze_layer(self, layer):
    """ Posts the given layer to the security scanner for analysis, blocking until complete.
        Returns a tuple containing the analysis version (on success, None on failure) and
        whether the request should be retried. Raises AnalyzeLayerException if the scanner
        rejects the request as malformed (HTTP 400).
    """
    request = self._new_analyze_request(layer)
    if not request:
      return None, False

    layer_name = request['Layer']['Name']
    logger.info('Analyzing layer %s', layer_name)
    try:
      response = self._call('POST', _API_METHOD_INSERT, body=request)
      json_response = response.json()
    except requests.exceptions.Timeout:
      logger.exception('Timeout when trying to post layer data response for %s', layer.id)
      return None, True
    except requests.exceptions.ConnectionError:
      logger.exception('Connection error when trying to post layer data response for %s', layer.id)
      return None, True
    except (requests.exceptions.RequestException, ValueError):
      logger.exception('Failed to post layer data response for %s', layer.id)
      return None, False

    # Handle any errors from the security scanner.
    if response.status_code != 201:
      # The error payload may be absent or null; never let that mask the real failure.
      message = (json_response.get('Error') or {}).get('Message', '')
      logger.warning('A warning event occurred when analyzing layer %s (status code %s): %s',
                     layer_name, response.status_code, message)

      # 400 means the layer could not be analyzed due to a bad request.
      if response.status_code == 400:
        logger.error('Bad request when calling security scanner for layer %s: %s',
                     layer_name, json_response)
        raise AnalyzeLayerException('Bad request to security scanner')

      # 422 means that the layer could not be analyzed:
      # - the layer could not be extracted (manifest?)
      # - the layer operating system / package manager is unsupported
      return None, response.status_code != 422

    api_version = json_response['Layer']['IndexedByVersion']
    return api_version, False

  def check_layer_vulnerable(self, layer_id, cve_name):
    """ Checks to see if the layer with the given ID is vulnerable to the specified CVE. """
    layer_data = self._get_layer_data(layer_id, include_vulnerabilities=True)
    if layer_data is None or 'Layer' not in layer_data or 'Features' not in layer_data['Layer']:
      return False

    # Tolerate features with a missing/null vulnerability list and unnamed entries.
    return any(vuln.get('Name') == cve_name
               for feature in layer_data['Layer']['Features']
               for vuln in (feature.get('Vulnerabilities') or []))

  def get_notification(self, notification_name, layer_limit=100, page=None):
    """ Gets the data for a specific notification, with optional page token.
        Returns a tuple of the data (None on failure) and whether to retry.
    """
    params = {
      'limit': layer_limit
    }

    if page is not None:
      params['page'] = page

    try:
      response = self._call('GET', _API_METHOD_GET_NOTIFICATION % notification_name, params=params)
      json_response = response.json()
    except requests.exceptions.Timeout:
      logger.exception('Timeout when trying to get notification for %s', notification_name)
      return None, True
    except requests.exceptions.ConnectionError:
      logger.exception('Connection error when trying to get notification for %s', notification_name)
      return None, True
    except (requests.exceptions.RequestException, ValueError):
      logger.exception('Failed to get notification for %s', notification_name)
      return None, False

    if response.status_code != 200:
      # 400 and 404 are not retried; any other failure status is.
      return None, response.status_code not in (400, 404)

    return json_response, False

  def mark_notification_read(self, notification_name):
    """ Marks a security scanner notification as read. Returns whether the call succeeded. """
    try:
      response = self._call('DELETE', _API_METHOD_MARK_NOTIFICATION_READ % notification_name)
      # Floor division keeps the 2xx check correct regardless of division semantics.
      return response.status_code // 100 == 2
    except requests.exceptions.RequestException:
      logger.exception('Failed to mark notification as read: %s', notification_name)
      return False

  def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False):
    """ Returns the layer data for the specified layer. On error, returns None. """
    layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
    return self._get_layer_data(layer_id, include_features, include_vulnerabilities)

  def _get_layer_data(self, layer_id, include_features=False, include_vulnerabilities=False):
    """ Returns the parsed layer data for the layer with the given ID, or None if the
        scanner reports the layer as not found (HTTP 404). Raises APIRequestFailure if
        the request could not be completed or the response could not be parsed.
    """
    # Build the query additively so that requesting both options sends both flags.
    params = {}
    if include_features:
      params['features'] = True

    if include_vulnerabilities:
      params['vulnerabilities'] = True

    try:
      response = self._call('GET', _API_METHOD_GET_LAYER % layer_id, params=params)
      logger.debug('Got response %s for vulnerabilities for layer %s',
                   response.status_code, layer_id)
      json_response = response.json()
    except requests.exceptions.Timeout:
      raise APIRequestFailure('API call timed out')
    except requests.exceptions.ConnectionError:
      raise APIRequestFailure('Could not connect to security service')
    except (requests.exceptions.RequestException, ValueError):
      logger.exception('Failed to get layer data response for %s', layer_id)
      raise APIRequestFailure()

    if response.status_code == 404:
      return None

    return json_response

  def _call(self, method, relative_url, params=None, body=None):
    """ Issues an HTTP call to the sec API at the given relative URL.
        This function disconnects from the database while awaiting a response
        from the API server. Raises an Exception if the instance is unconfigured.
    """
    if self._config is None:
      raise Exception('Cannot call unconfigured security system')

    client = self._client
    headers = {'Connection': 'close'}

    timeout = self._config.get('SECURITY_SCANNER_API_TIMEOUT_SECONDS', 10)
    endpoint = self._config['SECURITY_SCANNER_ENDPOINT']
    if method != 'GET':
      # Non-GET calls may use a separate timeout and endpoint when configured.
      timeout = self._config.get('SECURITY_SCANNER_API_BATCH_TIMEOUT_SECONDS', timeout)
      endpoint = self._config.get('SECURITY_SCANNER_ENDPOINT_BATCH') or endpoint

    api_url = urljoin(endpoint, '/' + self._config.get('SECURITY_SCANNER_API_VERSION', 'v1')) + '/'
    url = urljoin(api_url, relative_url)
    signer_proxy_url = self._config.get('JWTPROXY_SIGNER', 'localhost:8080')

    with CloseForLongOperation(self._config):
      logger.debug('%sing security URL %s', method.upper(), url)
      return client.request(method, url, json=body, params=params, timeout=timeout,
                            verify='/conf/mitm.cert', headers=headers,
                            proxies={
                              'https': 'https://' + signer_proxy_url,
                              'http': 'http://' + signer_proxy_url
                            })
 |