From e86a34286885414f064b9395da21f8503730e7d2 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Thu, 12 Nov 2015 15:46:31 -0500 Subject: [PATCH 1/3] create class for security config validation --- util/secscan/api.py | 72 ++++++++++++++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 20 deletions(-) diff --git a/util/secscan/api.py b/util/secscan/api.py index b0138eeef..331e45294 100644 --- a/util/secscan/api.py +++ b/util/secscan/api.py @@ -98,6 +98,54 @@ def get_priority_for_index(index): return 'Unknown' +class SecurityConfigValidator(object): + def __init__(self, app, config_provider): + self._config_provider = config_provider + + if not features.SECURITY_SCANNER: + return + + self._security_config = app.config['SECURITY_SCANNER'] + if self._security_config is None: + return + + self._certificate = self._get_filepath('CA_CERTIFICATE_FILENAME') or False + self._public_key = self._get_filepath('PUBLIC_KEY_FILENAME') + self._private_key = self._get_filepath('PRIVATE_KEY_FILENAME') + + if self._public_key and self._private_key: + self._keys = (self._public_key, self._private_key) + else: + self._keys = None + + def _get_filepath(self, key): + config = self._security_config + + if key in config: + with self._config_provider.get_volume_file(config[key]) as f: + return f.name + + return None + + def cert(self): + return self._certificate + + def keypair(self): + return self._keys + + def valid(self): + config = self._security_config + + if (not features.SECURITY_SCANNER + or not config + or not 'ENDPOINT' in config + or not 'ENGINE_VERSION_TARGET' in config + or not 'DISTRIBUTED_STORAGE_PREFERENCE' in config + or (self._certificate is False and self._keys is None)): + return False + + return True + class SecurityScannerAPI(object): """ Helper class for talking to the Security Scan service (Clair). """ @@ -105,28 +153,12 @@ class SecurityScannerAPI(object): self.app = app self.config_provider = config_provider - if not features.SECURITY_SCANNER: + config_validator = SecurityConfigValidator(app, config_provider) + if not config_validator.valid(): return - self.security_config = app.config['SECURITY_SCANNER'] - - self.certificate = self._getfilepath('CA_CERTIFICATE_FILENAME') or False - self.public_key = self._getfilepath('PUBLIC_KEY_FILENAME') - self.private_key = self._getfilepath('PRIVATE_KEY_FILENAME') - - if self.public_key and self.private_key: - self.keys = (self.public_key, self.private_key) - else: - self.keys = None - - def _getfilepath(self, config_key): - security_config = self.security_config - - if config_key in security_config: - with self.config_provider.get_volume_file(security_config[config_key]) as f: - return f.name - - return None + self.certificate = config_validator.cert() + self.keys = config_validator.keypair() def check_layer_vulnerable(self, layer_id, cve_id): """ Checks with Clair whether the given layer is vulnerable to the given CVE. """ From f6a34c5d0627f4478dd0c25f2684166a485f1a90 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 11 Nov 2015 15:41:46 -0500 Subject: [PATCH 2/3] refactor securityworker Fixes #772. --- workers/securityworker.py | 291 ++++++++++++++++++-------------------- 1 file changed, 137 insertions(+), 154 deletions(-) diff --git a/workers/securityworker.py b/workers/securityworker.py index 26d360754..1f4328741 100644 --- a/workers/securityworker.py +++ b/workers/securityworker.py @@ -6,17 +6,16 @@ import features import time import os import random -import json from endpoints.notificationhelper import spawn_notification from collections import defaultdict -from sys import exc_info from peewee import JOIN_LEFT_OUTER -from app import app, storage, OVERRIDE_CONFIG_DIRECTORY, secscan_api +from app import app, config_provider, storage, OVERRIDE_CONFIG_DIRECTORY, secscan_api from workers.worker import Worker from data.database import (Image, ImageStorage, ImageStorageLocation, ImageStoragePlacement, db_random_func, UseThenDisconnect, RepositoryTag, Repository, ExternalNotificationEvent, RepositoryNotification) +from util.secscan.api import SecurityConfigValidator logger = logging.getLogger(__name__) @@ -25,12 +24,12 @@ INDEXING_INTERVAL = 10 API_METHOD_INSERT = '/v1/layers' API_METHOD_VERSION = '/v1/versions/engine' -def _get_image_to_export(version): +def _get_images_to_export_list(version): Parent = Image.alias() ParentImageStorage = ImageStorage.alias() rimages = [] - # Without parent + # Collect the images without parents candidates = (Image .select(Image.id, Image.docker_image_id, ImageStorage.uuid) .join(ImageStorage) @@ -54,7 +53,7 @@ def _get_image_to_export(version): 'parent_docker_image_id': None, 'parent_storage_uuid': None}) - # With analyzed parent + # Collect the images with analyzed parents. candidates = (Image .select(Image.id, Image.docker_image_id, @@ -90,9 +89,8 @@ def _get_image_to_export(version): 'parent_docker_image_id': image[3], 'parent_storage_uuid': image[4]}) - # Re-shuffle, otherwise the images without parents will always be on the top + # Shuffle the images, otherwise the images without parents will always be on the top random.shuffle(rimages) - return rimages def _get_storage_locations(uuid): @@ -102,12 +100,8 @@ def _get_storage_locations(uuid): .switch(ImageStoragePlacement) .join(ImageStorage, JOIN_LEFT_OUTER) .where(ImageStorage.uuid == uuid)) - return query.get() - locations = list() - for location in query: - locations.append(location.location.name) - return locations + return [location.location.name for location in query] def _update_image(image, indexed, version): query = (Image @@ -116,174 +110,163 @@ def _update_image(image, indexed, version): .where(Image.docker_image_id == image['docker_image_id'], ImageStorage.uuid == image['storage_uuid'])) - updated_images = list() - for row in query: - updated_images.append(row.id) - (Image - .update(security_indexed=indexed, security_indexed_engine=version) - .where(Image.id << updated_images) - .execute()) + .update(security_indexed=indexed, security_indexed_engine=version) + .where(Image.id << [row.id for row in query]) + .execute()) + class SecurityWorker(Worker): def __init__(self): super(SecurityWorker, self).__init__() - if self._load_configuration(): + validator = SecurityConfigValidator(app.config, config_provider) + if validator.valid(): + secscan_config = app.config.get('SECURITY_SCANNER') + self._api = secscan_config['ENDPOINT'] + self._target_version = secscan_config['ENGINE_VERSION_TARGET'] + self._default_storage_locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE'] + self._cert = validator.cert() + self._keys = validator.keypair() + self.add_operation(self._index_images, INDEXING_INTERVAL) - def _load_configuration(self): - # Load configuration - config = app.config.get('SECURITY_SCANNER') + def _get_image_url(self, image): + """ Gets the download URL for an image and if the storage doesn't exist, + marks the image as unindexed. """ + path = storage.image_layer_path(image['storage_uuid']) + locations = self._default_storage_locations - if (not config - or not 'ENDPOINT' in config or not 'ENGINE_VERSION_TARGET' in config - or not 'DISTRIBUTED_STORAGE_PREFERENCE' in app.config): - logger.exception('No configuration found for the security worker') - return False - self._api = config['ENDPOINT'] - self._target_version = config['ENGINE_VERSION_TARGET'] - self._default_storage_locations = app.config['DISTRIBUTED_STORAGE_PREFERENCE'] + if not storage.exists(locations, path): + locations = _get_storage_locations(image['storage_uuid']) - self._ca = False - self._cert = None - if 'CA_CERTIFICATE_FILENAME' in config: - self._ca = os.path.join(OVERRIDE_CONFIG_DIRECTORY, config['CA_CERTIFICATE_FILENAME']) - if not os.path.isfile(self._ca): - logger.exception('Could not find configured CA file') - return False - if 'PRIVATE_KEY_FILENAME' in config and 'PUBLIC_KEY_FILENAME' in config: - self._cert = ( - os.path.join(OVERRIDE_CONFIG_DIRECTORY, config['PUBLIC_KEY_FILENAME']), - os.path.join(OVERRIDE_CONFIG_DIRECTORY, config['PRIVATE_KEY_FILENAME']), - ) - if not os.path.isfile(self._cert[0]) or not os.path.isfile(self._cert[1]): - logger.exception('Could not find configured key pair files') - return False + if not storage.exists(locations, path): + logger.warning('Could not find a valid location to download layer %s', + image['docker_image_id']+'.'+image['storage_uuid']) + try: + _update_image(image, False, self._target_version) + except: + logger.exception('Failed to update unindexed image') + return None - return True + uri = storage.get_direct_download_url(locations, path) + # Local storage hack + if uri is None: + uri = path + + return uri + + def _new_request(self, image): + url = self._get_image_url(image) + if url is None: + return None + + request = { + 'ID': image['docker_image_id']+'.'+image['storage_uuid'], + 'Path': url, + } + + if image['parent_docker_image_id'] is not None and image['parent_storage_uuid'] is not None: + request['ParentID'] = image['parent_docker_image_id']+'.'+image['parent_storage_uuid'] + + return request + + def _analyze_image(self, image): + request = self._new_request(image) + if request is None: + return None + + try: + logger.info('Analyzing %s', request['ID']) + # Using invalid certificates doesn't return proper errors because of + # https://github.com/shazow/urllib3/issues/556 + httpResponse = requests.post(self._api + API_METHOD_INSERT, json=request, + cert=self._keys, verify=self._cert) + jsonResponse = httpResponse.json() + except: + logger.exception('An exception occurred when analyzing layer ID %s', request['ID']) + return None + + # Handle any errors from the security scanner. + if httpResponse.status_code != 201: + if 'Message' in jsonResponse: + if 'OS and/or package manager are not supported' in jsonResponse['Message']: + # The current engine could not index this layer + logger.warning('A warning event occurred when analyzing layer ID %s : %s', + request['ID'], jsonResponse['Message']) + + # Hopefully, there is no version lower than the target one running + try: + _update_image(image, False, self._target_version) + except: + logger.exception('Failed to update image to be unindexed') + else: + logger.warning('Failed to handle JSON message "%s" when analyzing layer ID %s', + jsonResponse['Message'], request['ID']) + return None + else: + logger.warning('No message found in JSON response when analyzing layer ID %s', request['ID']) + return None + + api_version = jsonResponse['Version'] + if api_version < self._target_version: + logger.warning('An engine runs on version %d but the target version is %d') + + try: + _update_image(image, True, api_version) + logger.debug('Layer %s analyzed successfully', request['ID']) + except: + logger.exception('Failed to update image to be indexed') + + logger.debug('Loading vulnerabilities for layer %s', image['image_id']) + try: + response = secscan_api.call('layers/%s/vulnerabilities', None, request['ID']) + logger.debug('Got response %s for vulnerabilities for layer %s', + response.status_code, image['image_id']) + if response.status_code == 404: + return None + except: + logger.exception('Failed to get vulnerability response for %s', image['image_id']) + return None + + return response.json() def _index_images(self): - logger.debug('Starting indexing') + logger.debug('Started indexing') with UseThenDisconnect(app.config): while True: - logger.debug('Looking up images to index') - - # Get images to analyze + images = [] try: - images = _get_image_to_export(self._target_version) - if not images: - logger.debug('No more image to analyze') - return - + logger.debug('Looking up images to index') + images = _get_images_to_export_list(self._target_version) except Image.DoesNotExist: - logger.debug('No more image to analyze') + pass + + if not images: + logger.debug('No more images left to analyze') return + logger.debug('Found %d images to index' % len(images)) - logger.debug('Found images to index: %s', images) - for img in images: - # Get layer storage URL - path = storage.image_layer_path(img['storage_uuid']) - locations = self._default_storage_locations - - if not storage.exists(locations, path): - locations = _get_storage_locations(img['storage_uuid']) - - if not storage.exists(locations, path): - logger.warning('Could not find a valid location to download layer %s', - img['docker_image_id']+'.'+img['storage_uuid']) - _update_image(img, False, self._target_version) - continue - - uri = storage.get_direct_download_url(locations, path) - if uri == None: - # Local storage hack - uri = path - - # Forge request - request = { - 'ID': img['docker_image_id']+'.'+img['storage_uuid'], - 'Path': uri - } - if img['parent_docker_image_id'] is not None and img['parent_storage_uuid'] is not None: - request['ParentID'] = img['parent_docker_image_id']+'.'+img['parent_storage_uuid'] - - # Post request - try: - logger.info('Analyzing %s', request['ID']) - # Using invalid certificates doesn't return proper errors because of - # https://github.com/shazow/urllib3/issues/556 - httpResponse = requests.post(self._api + API_METHOD_INSERT, json=request, - cert=self._cert, verify=self._ca) - except: - logger.exception('An exception occurred when analyzing layer ID %s : %s', - request['ID'], exc_info()[0]) - return - try: - jsonResponse = httpResponse.json() - except: - logger.exception('An exception occurred when analyzing layer ID %s : the response is \ - not valid JSON (%s)', request['ID'], httpResponse.text) - return - - if httpResponse.status_code != 201: - if 'Message' in jsonResponse: - if 'OS and/or package manager are not supported' in jsonResponse['Message']: - # The current engine could not index this layer - logger.warning('A warning event occurred when analyzing layer ID %s : %s', - request['ID'], jsonResponse['Message']) - # Hopefully, there is no version lower than the target one running - _update_image(img, False, self._target_version) - else: - logger.exception('An exception occurred when analyzing layer ID %s : %d %s', - request['ID'], httpResponse.status_code, jsonResponse['Message']) - return - else: - logger.exception('An exception occurred when analyzing layer ID %s : %d', - request['ID'], httpResponse.status_code) - return - - # The layer has been successfully indexed - api_version = jsonResponse['Version'] - if api_version < self._target_version: - logger.warning('An engine runs on version %d but the target version is %d') - - logger.debug('Layer %s analyzed successfully', request['ID']) - _update_image(img, True, api_version) - - - # TODO(jschorr): Put this in a proper place, properly comment, unify with the - # callback code, etc. - try: - logger.debug('Loading vulnerabilities for layer %s', img['image_id']) - response = secscan_api.call('layers/%s/vulnerabilities', request['ID']) - except requests.exceptions.Timeout: - logger.debug('Timeout when calling Sec') + for image in images: + sec_data = self._analyze_image(image) + if sec_data is None: continue - except requests.exceptions.ConnectionError: - logger.debug('Connection error when calling Sec') - continue - - logger.debug('Got response %s for vulnerabilities for layer %s', response.status_code, img['image_id']) - if response.status_code == 404: - continue - - sec_data = json.loads(response.text) - logger.debug('Got response vulnerabilities for layer %s: %s', img['image_id'], sec_data) + logger.debug('Got response vulnerabilities for layer %s: %s', image['image_id'], sec_data) if not sec_data['Vulnerabilities']: continue + # Dispatch events for any detected vulnerabilities event = ExternalNotificationEvent.get(name='vulnerability_found') matching = (RepositoryTag - .select(RepositoryTag, Repository) - .distinct() - .join(Repository) - .join(RepositoryNotification) - .where(RepositoryNotification.event == event, - RepositoryTag.image == img['image_id'])) + .select(RepositoryTag, Repository) + .distinct() + .join(Repository) + .join(RepositoryNotification) + .where(RepositoryNotification.event == event, + RepositoryTag.image == image['image_id'])) - repository_map = defaultdict(list) + repository_map = defaultdict() for tag in matching: repository_map[tag.repository_id].append(tag) From 37ce84f6af86c9fa5e246d308ebdee9f24a9c11f Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Thu, 12 Nov 2015 17:02:18 -0500 Subject: [PATCH 3/3] tiny fixes to securityworker --- util/secscan/api.py | 26 +++++++++++++------------- workers/securityworker.py | 25 ++++++++++++++++++------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/util/secscan/api.py b/util/secscan/api.py index 331e45294..277fd3f6d 100644 --- a/util/secscan/api.py +++ b/util/secscan/api.py @@ -134,13 +134,11 @@ class SecurityConfigValidator(object): return self._keys def valid(self): - config = self._security_config - if (not features.SECURITY_SCANNER - or not config - or not 'ENDPOINT' in config - or not 'ENGINE_VERSION_TARGET' in config - or not 'DISTRIBUTED_STORAGE_PREFERENCE' in config + or not self._security_config + or not 'ENDPOINT' in self._security_config + or not 'ENGINE_VERSION_TARGET' in self._security_config + or not 'DISTRIBUTED_STORAGE_PREFERENCE' in self._security_config or (self._certificate is False and self._keys is None)): return False @@ -155,10 +153,12 @@ class SecurityScannerAPI(object): config_validator = SecurityConfigValidator(app, config_provider) if not config_validator.valid(): + logger.warning('Invalid config provided to SecurityScannerAPI') return - self.certificate = config_validator.cert() - self.keys = config_validator.keypair() + self._security_config = app.config.get('SECURITY_SCANNER') + self._certificate = config_validator.cert() + self._keys = config_validator.keypair() def check_layer_vulnerable(self, layer_id, cve_id): """ Checks with Clair whether the given layer is vulnerable to the given CVE. """ @@ -191,7 +191,7 @@ class SecurityScannerAPI(object): This function disconnects from the database while awaiting a response from the API server. """ - security_config = self.security_config + security_config = self._security_config api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/' url = urljoin(api_url, relative_url % args) @@ -201,8 +201,8 @@ class SecurityScannerAPI(object): with CloseForLongOperation(self.app.config): if body is not None: - return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self.keys, - verify=self.certificate) + return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self._keys, + verify=self._certificate) else: - return client.get(url, params=kwargs, timeout=timeout, cert=self.keys, - verify=self.certificate) + return client.get(url, params=kwargs, timeout=timeout, cert=self._keys, + verify=self._certificate) diff --git a/workers/securityworker.py b/workers/securityworker.py index 1f4328741..664e91472 100644 --- a/workers/securityworker.py +++ b/workers/securityworker.py @@ -129,6 +129,7 @@ class SecurityWorker(Worker): self._keys = validator.keypair() self.add_operation(self._index_images, INDEXING_INTERVAL) + logger.warning('Failed to validate security scan configuration') def _get_image_url(self, image): """ Gets the download URL for an image and if the storage doesn't exist, @@ -149,9 +150,18 @@ class SecurityWorker(Worker): return None uri = storage.get_direct_download_url(locations, path) - # Local storage hack if uri is None: - uri = path + # Handle local storage + local_storage_enabled = False + for storage_type, _ in app.config.get('DISTRIBUTED_STORAGE_CONFIG', {}).values(): + if storage_type == 'LocalStorage': + local_storage_enabled = True + + if local_storage_enabled: + uri = path + else: + logger.warning('Could not get image URL and local storage was not enabled') + return None return uri @@ -161,12 +171,13 @@ class SecurityWorker(Worker): return None request = { - 'ID': image['docker_image_id']+'.'+image['storage_uuid'], + 'ID': '%s.%s' % (image['docker_image_id'], image['storage_uuid']), 'Path': url, } if image['parent_docker_image_id'] is not None and image['parent_storage_uuid'] is not None: - request['ParentID'] = image['parent_docker_image_id']+'.'+image['parent_storage_uuid'] + request['ParentID'] = '%s.%s' % (image['parent_docker_image_id'], + image['parent_storage_uuid']) return request @@ -182,7 +193,7 @@ class SecurityWorker(Worker): httpResponse = requests.post(self._api + API_METHOD_INSERT, json=request, cert=self._keys, verify=self._cert) jsonResponse = httpResponse.json() - except: + except (requests.exceptions.RequestException, ValueError): logger.exception('An exception occurred when analyzing layer ID %s', request['ID']) return None @@ -245,18 +256,18 @@ class SecurityWorker(Worker): if not images: logger.debug('No more images left to analyze') return - logger.debug('Found %d images to index' % len(images)) + logger.debug('Found %d images to index', len(images)) for image in images: sec_data = self._analyze_image(image) if sec_data is None: continue - logger.debug('Got response vulnerabilities for layer %s: %s', image['image_id'], sec_data) if not sec_data['Vulnerabilities']: continue # Dispatch events for any detected vulnerabilities + logger.debug('Got response vulnerabilities for layer %s: %s', image['image_id'], sec_data) event = ExternalNotificationEvent.get(name='vulnerability_found') matching = (RepositoryTag .select(RepositoryTag, Repository)