From e5da33578c24667ef9d0f1bcbea96e755320a590 Mon Sep 17 00:00:00 2001
From: Quentin Machu
Date: Wed, 17 Feb 2016 14:44:49 -0500
Subject: [PATCH] Adapt security worker for Clair v1.0 (except notifications)

---
 data/model/image.py       |  68 ++--------
 workers/securityworker.py | 262 +++++++++++++++++++++-----------------
 2 files changed, 160 insertions(+), 170 deletions(-)

diff --git a/data/model/image.py b/data/model/image.py
index 0665c5750..8078abd58 100644
--- a/data/model/image.py
+++ b/data/model/image.py
@@ -426,57 +426,16 @@ def ensure_image_locations(*names):
     data = [{'name': name} for name in insert_names]
     ImageStorageLocation.insert_many(data).execute()
 
-def get_secscan_candidates(engine_version, batch_size):
+def get_image_with_storage_and_parent_base():
   Parent = Image.alias()
   ParentImageStorage = ImageStorage.alias()
-  rimages = []
-
-  # Collect the images without parents.
-  candidates = list(Image
-                    .select(Image.id)
-                    .join(ImageStorage)
-                    .where(Image.security_indexed_engine < engine_version,
-                           Image.parent >> None,
-                           ImageStorage.uploading == False)
-                    .limit(batch_size*10))
-
-  if len(candidates) > 0:
-    images = (Image
-              .select(Image, ImageStorage)
-              .join(ImageStorage)
-              .where(Image.id << candidates)
-              .order_by(db_random_func())
-              .limit(batch_size))
-    rimages.extend(images)
-
-  # Collect the images with analyzed parents.
-  candidates = list(Image
-                    .select(Image.id)
-                    .join(Parent, on=(Image.parent == Parent.id))
-                    .switch(Image)
-                    .join(ImageStorage)
-                    .where(Image.security_indexed_engine < engine_version,
-                           Parent.security_indexed_engine == engine_version,
-                           ImageStorage.uploading == False)
-                    .limit(batch_size*10))
-
-  if len(candidates) > 0:
-    images = (Image
-              .select(Image, ImageStorage, Parent, ParentImageStorage)
-              .join(Parent, on=(Image.parent == Parent.id))
-              .join(ParentImageStorage, on=(ParentImageStorage.id == Parent.storage))
-              .switch(Image)
-              .join(ImageStorage)
-              .where(Image.id << candidates)
-              .order_by(db_random_func())
-              .limit(batch_size))
-    rimages.extend(images)
-
-  # Shuffle the images, otherwise the images without parents will always be on the top
-  random.shuffle(rimages)
-
-  return rimages
+  return (Image
+          .select(Image, ImageStorage, Parent, ParentImageStorage)
+          .join(ImageStorage)
+          .switch(Image)
+          .join(Parent, JOIN_LEFT_OUTER, on=(Image.parent == Parent.id))
+          .join(ParentImageStorage, JOIN_LEFT_OUTER, on=(ParentImageStorage.id == Parent.storage)))
 
 def set_secscan_status(image, indexed, version):
   query = (Image
@@ -487,12 +446,13 @@ def set_secscan_status(image, indexed, version):
   ids_to_update = [row.id for row in query]
 
   if not ids_to_update:
-    return
+    return False
 
-  (Image
-   .update(security_indexed=indexed, security_indexed_engine=version)
-   .where(Image.id << ids_to_update)
-   .execute())
+  return (Image
+          .update(security_indexed=indexed, security_indexed_engine=version)
+          .where(Image.id << ids_to_update)
+          .where((Image.security_indexed_engine != version) | (Image.security_indexed != indexed))
+          .execute()) != 0
 
 
 def find_or_create_derived_storage(source_image, transformation_name, preferred_location):
@@ -536,5 +496,3 @@ def delete_derived_storage_by_uuid(storage_uuid):
     return
 
   image_storage.delete_instance(recursive=True)
-
-
diff --git a/workers/securityworker.py b/workers/securityworker.py
index e6fe492a1..51998e068 100644
--- a/workers/securityworker.py
+++ b/workers/securityworker.py
@@ -5,24 +5,26 @@ import requests
 import features
 import time
 
-from endpoints.notificationhelper import spawn_notification
+from peewee import fn
 from collections import defaultdict
+
 from app import app, config_provider, storage, secscan_api
+from endpoints.notificationhelper import spawn_notification
 from workers.worker import Worker
 from data import model
+from data.database import (Image, UseThenDisconnect, ExternalNotificationEvent)
 from data.model.tag import filter_tags_have_repository_event, get_tags_for_image
-from data.model.image import get_secscan_candidates, set_secscan_status
+from data.model.image import set_secscan_status, get_image_with_storage_and_parent_base
 from data.model.storage import get_storage_locations
-from data.database import ExternalNotificationEvent
 from util.secscan.api import SecurityConfigValidator
-
-logger = logging.getLogger(__name__)
+from util.migrate.allocator import yield_random_entries
 
 BATCH_SIZE = 50
 INDEXING_INTERVAL = 30
 API_METHOD_INSERT = '/v1/layers'
-API_METHOD_VERSION = '/v1/versions/engine'
+API_METHOD_GET_WITH_VULNERABILITIES = '/v1/layers/%s?vulnerabilities'
 
+logger = logging.getLogger(__name__)
 
 class SecurityWorker(Worker):
   def __init__(self):
@@ -40,6 +42,26 @@ class SecurityWorker(Worker):
     else:
       logger.warning('Failed to validate security scan configuration')
 
+  def _new_request(self, image):
+    """ Create the request body to submit the given image for analysis. """
+    url = self._get_image_url(image)
+    if url is None:
+      return None
+
+    request = {
+      'Layer': {
+        'Name': '%s.%s' % (image.docker_image_id, image.storage.uuid),
+        'Path': url,
+        'Format': 'Docker'
+      }
+    }
+
+    if image.parent and image.parent.docker_image_id and image.parent.storage.uuid:
+      request['Layer']['ParentName'] = '%s.%s' % (image.parent.docker_image_id,
+                                                  image.parent.storage.uuid)
+
+    return request
+
   def _get_image_url(self, image):
     """ Gets the download URL for an image and if the storage doesn't exist,
         marks the image as unindexed. """
@@ -71,147 +93,157 @@ class SecurityWorker(Worker):
 
     return uri
 
-  def _new_request(self, image):
-    url = self._get_image_url(image)
-    if url is None:
-      return None
+  def _index_images(self):
+    def batch_query():
+      base_query = get_image_with_storage_and_parent_base()
+      return base_query.where(Image.security_indexed_engine < self._target_version)
 
-    request = {
-      'ID': '%s.%s' % (image.docker_image_id, image.storage.uuid),
-      'Path': url,
-    }
+    min_id = (Image
+              .select(fn.Min(Image.id))
+              .where(Image.security_indexed_engine < self._target_version)
+              .scalar())
+    max_id = Image.select(fn.Max(Image.id)).scalar()
 
-    if image.parent is not None:
-      request['ParentID'] = '%s.%s' % (image.parent.docker_image_id,
-                                       image.parent.storage.uuid)
+    with UseThenDisconnect(app.config):
+      for candidate, abt in yield_random_entries(batch_query, Image.id, BATCH_SIZE, max_id, min_id):
+        _, continue_batch = self._analyze_recursively(candidate)
+        if not continue_batch:
+          logger.info('Another worker pre-empted us for layer: %s', candidate.id)
+          abt.set()
 
-    return request
+  def _analyze_recursively(self, layer):
+    """ Analyzes a layer and all of its parents. """
+    if layer.parent_id and layer.parent.security_indexed_engine < self._target_version:
+      # The image has a parent that has not been analyzed yet with this engine.
+      # Fetch the parent, along with its own parent, and recurse on it first.
+      try:
+        base_query = get_image_with_storage_and_parent_base()
+        parent_layer = base_query.where(Image.id == layer.parent_id).get()
+      except Image.DoesNotExist:
+        logger.warning('Image %s references parent Image %s, which does not exist', layer.id,
+                       layer.parent_id)
 
-  def _analyze_image(self, image):
-    """ Analyzes an image by passing it to Clair. """
-    request = self._new_request(image)
+        return False, set_secscan_status(layer, False, self._target_version)
+
+      cont, _ = self._analyze_recursively(parent_layer)
+      if not cont:
+        # The analysis failed for some reason and did not mark the layer as failed,
+        # thus we should not try to analyze the children of that layer.
+        # Interrupt the recursive analysis and return as no-one pre-empted us.
+        return False, True
+
+    # Now we know all parents are analyzed.
+    return self._analyze(layer)
+
+  def _analyze(self, layer):
+    """ Analyzes a single layer.
+        Returns two bools: the first tells whether this layer's children should be evaluated,
+        the second is False when another worker pre-empted the candidate's analysis for us. """
+
+    # If the parent couldn't be analyzed with the target version or higher, we can't analyze
+    # this image. Mark it as failed with the current target version.
+    if (layer.parent_id and not layer.parent.security_indexed and
+        layer.parent.security_indexed_engine >= self._target_version):
+      return True, set_secscan_status(layer, False, self._target_version)
+
+    request = self._new_request(layer)
     if request is None:
-      return False
+      return False, True
 
     # Analyze the image.
    try:
-      logger.info('Analyzing %s', request['ID'])
+      logger.info('Analyzing layer %s', request['Layer']['Name'])
       # Using invalid certificates doesn't return proper errors because of
       # https://github.com/shazow/urllib3/issues/556
-      httpResponse = requests.post(self._api + API_METHOD_INSERT, json=request,
-                                   cert=self._keys, verify=self._cert)
-      jsonResponse = httpResponse.json()
+      http_response = requests.post(self._api + API_METHOD_INSERT, json=request,
+                                    cert=self._keys, verify=self._cert)
+      json_response = http_response.json()
     except (requests.exceptions.RequestException, ValueError):
-      logger.exception('An exception occurred when analyzing layer ID %s', request['ID'])
-      return False
+      logger.exception('An exception occurred when analyzing layer %s', request['Layer']['Name'])
+      return False, True
 
     # Handle any errors from the security scanner.
-    if httpResponse.status_code != 201:
-      message = jsonResponse.get('Message', '')
-      if 'OS and/or package manager are not supported' in message or 'could not extract' in message:
-        # The current engine could not index this layer or we tried to index a manifest.
-        logger.warning('A warning event occurred when analyzing layer ID %s : %s',
-                       request['ID'], jsonResponse['Message'])
+    if http_response.status_code != 201:
+      message = json_response.get('Error', {}).get('Message', '')
+      logger.warning('A warning event occurred when analyzing layer %s (status code %s): %s',
+                     request['Layer']['Name'], http_response.status_code, message)
 
-        # Hopefully, there is no version lower than the target one running
-        set_secscan_status(image, False, self._target_version)
-
-        return False
+      # 422 means that the layer could not be analyzed:
+      # - the layer could not be extracted (e.g. it is actually a manifest)
+      # - the layer's operating system / package manager is unsupported
+      # Set the layer as failed.
+      if http_response.status_code == 422:
+        return True, set_secscan_status(layer, False, self._target_version)
       else:
-        logger.warning('Got non-201 when analyzing layer ID %s: %s', request['ID'], jsonResponse)
-        return False
+        return False, True
 
     # Verify that the version matches.
-    api_version = jsonResponse['Version']
+    api_version = json_response['Layer']['IndexedByVersion']
     if api_version < self._target_version:
-      logger.warning('An engine runs on version %d but the target version is %d')
+      logger.warning('The scanner engine is running version %d but the target version is %d',
+                     api_version, self._target_version)
 
     # Mark the image as analyzed.
-    logger.debug('Layer %s analyzed successfully', image.id)
-    set_secscan_status(image, True, api_version)
+    logger.info('Analyzed layer %s successfully', request['Layer']['Name'])
+    set_status = set_secscan_status(layer, True, api_version)
 
-    return True
+    # If we were the first to finish this layer's analysis successfully, get the vulnerabilities
+    # and send notifications to the repositories that have a tag on that layer.
+    # TODO(josephschorr): Adapt this depending on the new notification format we adopt.
+    # if set_status:
+    #   # Get the tags of the layer we analyzed.
+    #   repository_map = defaultdict(list)
+    #   event = ExternalNotificationEvent.get(name='vulnerability_found')
+    #   matching = list(filter_tags_have_repository_event(get_tags_for_image(layer.id), event))
+    #
+    #   for tag in matching:
+    #     repository_map[tag.repository_id].append(tag)
+    #
+    #   # If there is at least one tag, look up the vulnerabilities for the image
+    #   # now that it is analyzed.
+    #   if len(repository_map) > 0:
+    #     logger.debug('Loading vulnerabilities for layer %s', layer.id)
+    #     sec_data = self._get_vulnerabilities(layer)
+    #
+    #     if sec_data is not None:
+    #       # Dispatch events for any detected vulnerabilities
+    #       logger.debug('Got vulnerabilities for layer %s: %s', layer.id, sec_data)
+    #
+    #       for repository_id in repository_map:
+    #         tags = repository_map[repository_id]
+    #
+    #         for vuln in sec_data['Vulnerabilities']:
+    #           event_data = {
+    #             'tags': [tag.name for tag in tags],
+    #             'vulnerability': {
+    #               'id': vuln['Name'],
+    #               'description': vuln['Description'],
+    #               'link': vuln['Link'],
+    #               'priority': vuln['Priority'],
+    #             },
+    #           }
+    #
+    #           spawn_notification(tags[0].repository, 'vulnerability_found', event_data)
 
-  def _get_vulnerabilities(self, image):
+    return True, set_status
+
+  def _get_vulnerabilities(self, layer):
     """ Returns the vulnerabilities detected (if any) or None on error. """
     try:
-      response = secscan_api.call('layers/%s/vulnerabilities', None,
-                                  '%s.%s' % (image.docker_image_id, image.storage.uuid))
+      response = secscan_api.call(self._api + API_METHOD_GET_WITH_VULNERABILITIES, None,
+                                  '%s.%s' % (layer.docker_image_id, layer.storage.uuid))
+
       logger.debug('Got response %s for vulnerabilities for layer %s',
-                   response.status_code, image.id)
+                   response.status_code, layer.id)
+
       if response.status_code == 404:
         return None
     except (requests.exceptions.RequestException, ValueError):
-      logger.exception('Failed to get vulnerability response for %s', image.id)
+      logger.exception('Failed to get vulnerability response for %s', layer.id)
       return None
 
     return response.json()
 
-  def _index_images(self):
-    logger.debug('Started indexing')
-    event = ExternalNotificationEvent.get(name='vulnerability_found')
-
-    while True:
-      # Lookup the images to index.
-      images = []
-      logger.debug('Looking up images to index')
-      images = get_secscan_candidates(self._target_version, BATCH_SIZE)
-
-      if not images:
-        logger.debug('No more images left to analyze')
-        return
-
-      logger.debug('Found %d images to index', len(images))
-      for image in images:
-        # If we couldn't analyze the parent, we can't analyze this image.
-        if (image.parent and not image.parent.security_indexed and
-            image.parent.security_indexed_engine >= self._target_version):
-          set_secscan_status(image, False, self._target_version)
-          continue
-
-        # Analyze the image.
-        analyzed = self._analyze_image(image)
-        if not analyzed:
-          continue
-
-        # Get the tags of the image we analyzed
-        matching = list(filter_tags_have_repository_event(get_tags_for_image(image.id), event))
-
-        repository_map = defaultdict(list)
-
-        for tag in matching:
-          repository_map[tag.repository_id].append(tag)
-
-        # If there is at least one tag,
-        # Lookup the vulnerabilities for the image, now that it is analyzed.
-        if len(repository_map) > 0:
-          logger.debug('Loading vulnerabilities for layer %s', image.id)
-          sec_data = self._get_vulnerabilities(image)
-
-          if sec_data is None:
-            continue
-
-          if not sec_data.get('Vulnerabilities'):
-            continue
-
-          # Dispatch events for any detected vulnerabilities
-          logger.debug('Got vulnerabilities for layer %s: %s', image.id, sec_data)
-
-          for repository_id in repository_map:
-            tags = repository_map[repository_id]
-
-            for vuln in sec_data['Vulnerabilities']:
-              event_data = {
-                'tags': [tag.name for tag in tags],
-                'vulnerability': {
-                  'id': vuln['ID'],
-                  'description': vuln['Description'],
-                  'link': vuln['Link'],
-                  'priority': vuln['Priority'],
-                },
-              }
-
-              spawn_notification(tags[0].repository, 'vulnerability_found', event_data)
 
 
 if __name__ == '__main__':
   if not features.SECURITY_SCANNER: