This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/secscan/api.py
Joseph Schorr 405eca074c Security scanner flow changes and auto-retry
Changes the security scanner code to raise exceptions now for non-successful operations. One of the new exceptions raised is MissingParentLayerException, which, when raised, will cause the security worker to perform a full rescan of all parent images for the current layer, before trying once more to scan the current layer. This should allow the system to be "self-healing" in the case where the security scanner engine somehow loses or corrupts a parent layer.
2016-12-16 15:38:09 -05:00

320 lines
13 KiB
Python

import logging
import requests
from flask import url_for
from urlparse import urljoin
from data.database import CloseForLongOperation
from data import model
from data.model.storage import get_storage_locations
from util.secscan.validator import SecurityConfigValidator
from util.security.instancekeys import InstanceKeys
from util.security.registry_jwt import generate_bearer_token, build_context_and_subject
from util import get_app_url
# Lifetime, in seconds, passed to generate_bearer_token for the token that authorizes the layer
# download.
TOKEN_VALIDITY_LIFETIME_S = 60 # Amount of time the security scanner has to call the layer URL
# Error message which, when it accompanies a 400 response, makes analyze_layer raise
# MissingParentLayerException instead of a plain AnalyzeLayerException.
UNKNOWN_PARENT_LAYER_ERROR_MSG = 'worker: parent layer is unknown, it must be processed first'
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class AnalyzeLayerException(Exception):
    """ Raised when submitting a layer for analysis fails because of a problem with the request.

        Base class for the more specific, non-retryable analysis failures.
    """
    pass
class AnalyzeLayerRetryException(Exception):
    """ Raised when submitting a layer for analysis fails because of a problem with the request,
        and the caller should try the request again.

        Note that this derives directly from Exception, not from AnalyzeLayerException.
    """
    pass
class MissingParentLayerException(AnalyzeLayerException):
    """ Raised when the security scanner does not know the parent of the layer being analyzed. """
    pass
class InvalidLayerException(AnalyzeLayerException):
    """ Raised when the security scanner is unable to handle the layer itself. """
    pass
class APIRequestFailure(Exception):
    """ Raised when an API request to the security scanner could not be carried out. """
    pass
# Relative paths of the security scanner API methods. SecurityScannerAPI._call joins them onto
# the versioned API root; the '%s' placeholders are filled in with the layer or notification name.
_API_METHOD_INSERT = 'layers'
_API_METHOD_GET_LAYER = 'layers/%s'
# Marking a notification read and fetching it share a path; they differ only in the HTTP method
# used by the caller (DELETE vs. GET).
_API_METHOD_MARK_NOTIFICATION_READ = 'notifications/%s'
_API_METHOD_GET_NOTIFICATION = 'notifications/%s'
_API_METHOD_PING = 'metrics'
class SecurityScannerAPI(object):
    """ Helper class for talking to the Security Scan service (Clair).

        If the supplied configuration fails validation, the instance is left unconfigured and
        any attempt to issue an API call raises an Exception.
    """

    def __init__(self, app, config, storage, client=None, skip_validation=False):
        """ Creates the API helper.

            app: application object; its config supplies SERVER_HOSTNAME and it provides the
                 request context used to build registry blob URLs.
            config: configuration mapping holding the security scanner settings.
            storage: storage engine used to locate and address layer data.
            client: optional HTTP client; defaults to config['HTTPCLIENT'].
            skip_validation: when True, the configuration is used without being validated.
        """
        # Default every attribute first, so that an instance whose config failed validation is
        # still a well-formed object. `_call` relies on `self._config is None` to report an
        # unconfigured scanner; without these defaults it would die with an AttributeError.
        self._app = None
        self._config = None
        self._instance_keys = None
        self._client = None
        self._storage = None
        self._default_storage_locations = None
        self._target_version = None

        if not skip_validation:
            config_validator = SecurityConfigValidator(config)
            if not config_validator.valid():
                logger.warning('Invalid config provided to SecurityScannerAPI')
                return

        self._app = app
        self._config = config
        self._instance_keys = InstanceKeys(app)
        self._client = client or config['HTTPCLIENT']
        self._storage = storage
        self._default_storage_locations = config['DISTRIBUTED_STORAGE_PREFERENCE']
        self._target_version = config.get('SECURITY_SCANNER_ENGINE_VERSION_TARGET', 2)

    def _get_image_url_and_auth(self, image):
        """ Returns a tuple of the url and the auth header value that must be used
            to fetch the layer data itself. The auth header is None when the storage engine
            can hand out a direct download URL. If the image can't be addressed, returns
            (None, None).
        """
        path = model.storage.get_layer_path(image.storage)
        locations = self._default_storage_locations

        if not self._storage.exists(locations, path):
            # Not in the preferred locations: fall back to wherever the storage is recorded.
            locations = get_storage_locations(image.storage.uuid)

            if not locations or not self._storage.exists(locations, path):
                logger.warning('Could not find a valid location to download layer %s.%s out of %s',
                               image.docker_image_id, image.storage.uuid, locations)
                return None, None

        uri = self._storage.get_direct_download_url(locations, path)
        auth_header = None
        if uri is None:
            # Use the registry API instead, with a signed JWT giving access
            repo_name = image.repository.name
            namespace_name = image.repository.namespace_user.username
            repository_and_namespace = '/'.join([namespace_name, repo_name])

            # Generate the JWT which will authorize this
            audience = self._app.config['SERVER_HOSTNAME']
            context, subject = build_context_and_subject(None, None, None)
            access = [{
                'type': 'repository',
                'name': repository_and_namespace,
                'actions': ['pull'],
            }]

            auth_token = generate_bearer_token(audience, subject, context, access,
                                               TOKEN_VALIDITY_LIFETIME_S, self._instance_keys)
            auth_header = 'Bearer ' + auth_token

            # url_for needs a request context to build the blob URL.
            with self._app.test_request_context('/'):
                relative_layer_url = url_for('v2.download_blob',
                                             repository=repository_and_namespace,
                                             digest=image.storage.content_checksum)
                uri = urljoin(get_app_url(self._config), relative_layer_url)

        return uri, auth_header

    def _new_analyze_request(self, image):
        """ Create the request body to submit the given image for analysis. If the image's URL
            cannot be found, returns None.
        """
        url, auth_header = self._get_image_url_and_auth(image)
        if url is None:
            return None

        layer_request = {
            'Name': '%s.%s' % (image.docker_image_id, image.storage.uuid),
            'Path': url,
            'Format': 'Docker',
        }

        if auth_header is not None:
            layer_request['Headers'] = {
                'Authorization': auth_header,
            }

        # A base layer has no parent; only name the parent when there is one to name.
        parent = image.parent
        if parent is not None and parent.docker_image_id and parent.storage.uuid:
            layer_request['ParentName'] = '%s.%s' % (parent.docker_image_id, parent.storage.uuid)

        return {
            'Layer': layer_request,
        }

    def ping(self):
        """ Calls GET on the metrics endpoint of the security scanner to ensure it is running
            and properly configured. Returns the HTTP response. Raises an Exception if the
            request could not be made.
        """
        try:
            return self._call('GET', _API_METHOD_PING)
        except requests.exceptions.Timeout:
            logger.exception('Timeout when trying to connect to security scanner endpoint')
            raise Exception('Timeout when trying to connect to security scanner endpoint')
        except requests.exceptions.ConnectionError:
            logger.exception('Connection error when trying to connect to security scanner endpoint')
            raise Exception('Connection error when trying to connect to security scanner endpoint')
        except (requests.exceptions.RequestException, ValueError):
            logger.exception('Exception when trying to connect to security scanner endpoint')
            raise Exception('Exception when trying to connect to security scanner endpoint')

    def analyze_layer(self, layer):
        """ Posts the given layer to the security scanner for analysis, blocking until complete.
            Returns the analysis version on success.

            Raises AnalyzeLayerException (or one of its subclasses, MissingParentLayerException
            and InvalidLayerException) when the layer cannot be analyzed, and
            AnalyzeLayerRetryException when the request should be retried. Note that
            AnalyzeLayerRetryException does not derive from AnalyzeLayerException, so callers
            must handle both.
        """
        request = self._new_analyze_request(layer)
        if not request:
            raise AnalyzeLayerException

        logger.info('Analyzing layer %s', request['Layer']['Name'])
        try:
            response = self._call('POST', _API_METHOD_INSERT, body=request)
            json_response = response.json()
        except requests.exceptions.Timeout:
            logger.exception('Timeout when trying to post layer data response for %s', layer.id)
            raise AnalyzeLayerRetryException
        except requests.exceptions.ConnectionError:
            logger.exception('Connection error when trying to post layer data response for %s',
                             layer.id)
            raise AnalyzeLayerRetryException
        except (requests.exceptions.RequestException, ValueError) as ex:
            logger.exception('Failed to post layer data response for %s: %s', layer.id, ex)
            raise AnalyzeLayerException

        # Handle any errors from the security scanner.
        if response.status_code != 201:
            # The error body is not guaranteed to carry an 'Error' object; treat anything
            # unexpected as an empty message rather than failing while reporting the failure.
            error = json_response.get('Error') if isinstance(json_response, dict) else None
            message = error.get('Message', '') if isinstance(error, dict) else ''
            logger.warning('A warning event occurred when analyzing layer %s (status code %s): %s',
                           request['Layer']['Name'], response.status_code, message)

            # 400 means the layer could not be analyzed due to a bad request.
            if response.status_code == 400:
                if message == UNKNOWN_PARENT_LAYER_ERROR_MSG:
                    raise MissingParentLayerException('Bad request to security scanner: %s' %
                                                      message)
                raise AnalyzeLayerException('Bad request to security scanner: %s' % message)

            # 422 means that the layer could not be analyzed:
            # - the layer could not be extracted (might be a manifest or an invalid .tar.gz)
            # - the layer operating system / package manager is unsupported
            if response.status_code == 422:
                raise InvalidLayerException

            # Otherwise, it is some other error and we should retry.
            raise AnalyzeLayerRetryException

        # Return the parsed API version.
        try:
            return json_response['Layer']['IndexedByVersion']
        except (KeyError, TypeError):
            # Keep to the documented contract instead of leaking a KeyError/TypeError.
            logger.warning('Malformed success response when analyzing layer %s',
                           request['Layer']['Name'])
            raise AnalyzeLayerException('Malformed response from security scanner')

    def check_layer_vulnerable(self, layer_id, cve_name):
        """ Checks to see if the layer with the given ID is vulnerable to the specified CVE.
            Returns False when the layer is unknown or has no feature data.
        """
        layer_data = self._get_layer_data(layer_id, include_vulnerabilities=True)
        if layer_data is None or 'Layer' not in layer_data or 'Features' not in layer_data['Layer']:
            return False

        return any(vuln['Name'] == cve_name
                   for feature in layer_data['Layer']['Features']
                   for vuln in feature.get('Vulnerabilities', []))

    def get_notification(self, notification_name, layer_limit=100, page=None):
        """ Gets the data for a specific notification, with optional page token.
            Returns a tuple of the data (None on failure) and whether to retry.
        """
        params = {
            'limit': layer_limit
        }

        if page is not None:
            params['page'] = page

        try:
            response = self._call('GET', _API_METHOD_GET_NOTIFICATION % notification_name,
                                  params=params)
            json_response = response.json()
        except requests.exceptions.Timeout:
            logger.exception('Timeout when trying to get notification for %s', notification_name)
            return None, True
        except requests.exceptions.ConnectionError:
            logger.exception('Connection error when trying to get notification for %s',
                             notification_name)
            return None, True
        except (requests.exceptions.RequestException, ValueError):
            logger.exception('Failed to get notification for %s', notification_name)
            return None, False

        if response.status_code != 200:
            # 400 and 404 will not get better on a retry; anything else might.
            return None, response.status_code not in (400, 404)

        return json_response, False

    def mark_notification_read(self, notification_name):
        """ Marks a security scanner notification as read. Returns whether the call succeeded. """
        try:
            response = self._call('DELETE', _API_METHOD_MARK_NOTIFICATION_READ % notification_name)
        except requests.exceptions.RequestException:
            logger.exception('Failed to mark notification as read: %s', notification_name)
            return False

        # Floor division keeps this a check for "any 2xx status" regardless of whether `/`
        # performs integer or true division.
        return response.status_code // 100 == 2

    def get_layer_data(self, layer, include_features=False, include_vulnerabilities=False):
        """ Returns the layer data for the specified layer, or None if the security scanner
            responds with a 404. Raises APIRequestFailure if the request itself fails.
        """
        layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
        return self._get_layer_data(layer_id, include_features, include_vulnerabilities)

    @staticmethod
    def _layer_query_params(include_features, include_vulnerabilities):
        """ Builds the query parameters for a layer lookup from the two include flags. """
        params = {}
        if include_features:
            params['features'] = True

        if include_vulnerabilities:
            params['vulnerabilities'] = True

        return params

    def _get_layer_data(self, layer_id, include_features=False, include_vulnerabilities=False):
        """ Returns the parsed layer data for the layer with the given ID, or None if the
            security scanner responds with a 404. Raises APIRequestFailure if the request
            fails or the response cannot be parsed.
        """
        # Both flags are honored together; neither one overrides the other.
        params = self._layer_query_params(include_features, include_vulnerabilities)

        try:
            response = self._call('GET', _API_METHOD_GET_LAYER % layer_id, params=params)
            logger.debug('Got response %s for vulnerabilities for layer %s',
                         response.status_code, layer_id)
            json_response = response.json()
        except requests.exceptions.Timeout:
            raise APIRequestFailure('API call timed out')
        except requests.exceptions.ConnectionError:
            raise APIRequestFailure('Could not connect to security service')
        except (requests.exceptions.RequestException, ValueError):
            logger.exception('Failed to get layer data response for %s', layer_id)
            raise APIRequestFailure()

        if response.status_code == 404:
            return None

        return json_response

    def _call(self, method, relative_url, params=None, body=None):
        """ Issues an HTTP call to the sec API at the given relative URL.
            This function disconnects from the database while awaiting a response
            from the API server.

            method: HTTP method name. Non-GET calls use the batch timeout and batch endpoint
                    when those are configured.
            relative_url: path relative to the versioned API root.
            params: optional query parameters.
            body: optional JSON body.

            Returns the HTTP response. Raises an Exception if the scanner is unconfigured.
        """
        if self._config is None:
            raise Exception('Cannot call unconfigured security system')

        client = self._client
        headers = {'Connection': 'close'}

        timeout = self._config.get('SECURITY_SCANNER_API_TIMEOUT_SECONDS', 10)
        endpoint = self._config['SECURITY_SCANNER_ENDPOINT']
        if method != 'GET':
            timeout = self._config.get('SECURITY_SCANNER_API_BATCH_TIMEOUT_SECONDS', timeout)
            endpoint = self._config.get('SECURITY_SCANNER_ENDPOINT_BATCH') or endpoint

        # The trailing slash makes the second urljoin append to the version segment rather
        # than replace it.
        api_version = self._config.get('SECURITY_SCANNER_API_VERSION', 'v1')
        api_url = urljoin(endpoint, '/' + api_version) + '/'
        url = urljoin(api_url, relative_url)

        # Requests are routed through the configured JWT signer proxy.
        signer_proxy_url = self._config.get('JWTPROXY_SIGNER', 'localhost:8080')
        proxies = {
            'https': 'https://' + signer_proxy_url,
            'http': 'http://' + signer_proxy_url
        }

        with CloseForLongOperation(self._config):
            logger.debug('%sing security URL %s', method.upper(), url)
            return client.request(method, url, json=body, params=params, timeout=timeout,
                                  verify='/conf/mitm.cert', headers=headers, proxies=proxies)