diff --git a/app.py b/app.py
index 01c4ae6ab..964a79536 100644
--- a/app.py
+++ b/app.py
@@ -35,7 +35,7 @@ from util.saas.metricqueue import MetricQueue
 from util.config.provider import get_config_provider
 from util.config.configutil import generate_secret_key
 from util.config.superusermanager import SuperUserManager
-from util.secscan.secscanendpoint import SecurityScanEndpoint
+from util.secscan.api import SecurityScannerAPI
 
 OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/'
 OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml'
@@ -148,7 +148,9 @@ image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf)
 dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf,
                                    reporter=MetricQueueReporter(metric_queue))
 notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)
-secscan_endpoint = SecurityScanEndpoint(app, config_provider)
+secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf)
+
+secscan_api = SecurityScannerAPI(app, config_provider)
 
 database.configure(app.config)
 model.config.app_config = app.config
diff --git a/conf/init/service/security_notification_worker/log/run b/conf/init/service/security_notification_worker/log/run
new file mode 100644
index 000000000..262fed98e
--- /dev/null
+++ b/conf/init/service/security_notification_worker/log/run
@@ -0,0 +1,2 @@
+#!/bin/sh
+exec logger -i -t securitynotificationworker
diff --git a/conf/init/service/security_notification_worker/run b/conf/init/service/security_notification_worker/run
new file mode 100755
index 000000000..83c94e686
--- /dev/null
+++ b/conf/init/service/security_notification_worker/run
@@ -0,0 +1,8 @@
+#! /bin/bash
+
+echo 'Starting security scanner notification worker'
+
+cd /
+venv/bin/python -m workers.security_notification_worker 2>&1
+
+echo 'Security scanner notification worker exited'
diff --git a/conf/init/service/securityworker/run b/conf/init/service/securityworker/run
old mode 100644
new mode 100755
diff --git a/config.py b/config.py
index 3e03b951b..3629ecab4 100644
--- a/config.py
+++ b/config.py
@@ -131,6 +131,7 @@ class DefaultConfig(object):
   DIFFS_QUEUE_NAME = 'imagediff'
   DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild'
   REPLICATION_QUEUE_NAME = 'imagestoragereplication'
+  SECSCAN_NOTIFICATION_QUEUE_NAME = 'secscan_notification'
 
   # Super user config. Note: This MUST BE an empty list for the default config.
   SUPER_USERS = []
diff --git a/endpoints/api/sec.py b/endpoints/api/sec.py
index d080d2fe1..4e77750f9 100644
--- a/endpoints/api/sec.py
+++ b/endpoints/api/sec.py
@@ -5,7 +5,7 @@ import features
 import json
 import requests
 
-from app import secscan_endpoint
+from app import secscan_api
 from data import model
 from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
                            RepositoryParamResource, resource, nickname, show_if, parse_args,
@@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
 def _call_security_api(relative_url, *args, **kwargs):
   """ Issues an HTTP call to the sec API at the given relative URL. """
   try:
-    response = secscan_endpoint.call_api(relative_url, *args, **kwargs)
+    response = secscan_api.call(relative_url, *args, **kwargs)
   except requests.exceptions.Timeout:
     raise DownstreamIssue(payload=dict(message='API call timed out'))
   except requests.exceptions.ConnectionError:
diff --git a/endpoints/api/secscan.py b/endpoints/api/secscan.py
index 9a1773ccb..56fcea95d 100644
--- a/endpoints/api/secscan.py
+++ b/endpoints/api/secscan.py
@@ -5,7 +5,7 @@ import features
 import json
 import requests
 
-from app import secscan_endpoint
+from app import secscan_api
 from data import model
 from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
                            RepositoryParamResource, resource, nickname, show_if, parse_args,
@@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
 def _call_security_api(relative_url, *args, **kwargs):
   """ Issues an HTTP call to the sec API at the given relative URL. """
   try:
-    response = secscan_endpoint.call_api(relative_url, body=None, *args, **kwargs)
+    response = secscan_api.call(relative_url, None, *args, **kwargs)
   except requests.exceptions.Timeout:
     raise DownstreamIssue(payload=dict(message='API call timed out'))
   except requests.exceptions.ConnectionError:
diff --git a/endpoints/secscan.py b/endpoints/secscan.py
index 874aec243..7576318e8 100644
--- a/endpoints/secscan.py
+++ b/endpoints/secscan.py
@@ -1,14 +1,11 @@
 import logging
+import json
+
 import features
 
-from app import secscan_endpoint
+from app import secscan_notification_queue
 from flask import request, make_response, Blueprint
-from data import model
-from data.database import (RepositoryNotification, Repository, ExternalNotificationEvent,
-                           RepositoryTag, Image, ImageStorage)
 from endpoints.common import route_show_if
-from endpoints.notificationhelper import spawn_notification
-from collections import defaultdict
 
 logger = logging.getLogger(__name__)
 secscan = Blueprint('secscan', __name__)
@@ -19,70 +16,10 @@ def secscan_notification():
   data = request.get_json()
   logger.debug('Got notification from Clair: %s', data)
 
-  # Find all tags that contain the layer(s) introducing the vulnerability.
   content = data['Content']
   layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', []))
   if not layer_ids:
     return make_response('Okay')
 
-  # TODO(jzelinkskie): Write a queueitem for these layer ids, and do the rest of this
-  # in a worker.
-  cve_id = data['Name']
-  vulnerability = data['Content']['Vulnerability']
-  priority = vulnerability['Priority']
-
-  # Lookup the external event for when we have vulnerabilities.
-  event = ExternalNotificationEvent.get(name='vulnerability_found')
-
-  # For each layer, retrieving the matching tags and join with repository to determine which
-  # require new notifications.
-  tag_map = defaultdict(set)
-  repository_map = {}
-
-  for layer_id in layer_ids:
-    (docker_image_id, storage_uuid) = layer_id.split('.', 2)
-    tags = model.tag.get_matching_tags(docker_image_id, storage_uuid, RepositoryTag,
-                                       Repository, Image, ImageStorage)
-
-    # Additionally filter to tags only in repositories that have the event setup.
-    matching = (tags.switch(RepositoryTag)
-                    .join(Repository)
-                    .join(RepositoryNotification)
-                    .where(RepositoryNotification.event == event))
-
-    check_map = {}
-    for tag in matching:
-      # Verify that the tag's root image has the vulnerability.
-      tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
-      logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)
-
-      if not tag_layer_id in check_map:
-        is_vulerable = secscan_endpoint.check_layer_vulnerable(tag_layer_id, cve_id)
-        check_map[tag_layer_id] = is_vulerable
-
-      logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id,
-                   check_map[tag_layer_id])
-
-      if check_map[tag_layer_id]:
-        # Add the vulnerable tag to the list.
-        tag_map[tag.repository_id].add(tag.name)
-        repository_map[tag.repository_id] = tag.repository
-
-  # For each of the tags found, issue a notification.
-  for repository_id in tag_map:
-    tags = tag_map[repository_id]
-    event_data = {
-      'tags': list(tags),
-      'vulnerability': {
-        'id': data['Name'],
-        'description': vulnerability['Description'],
-        'link': vulnerability['Link'],
-        'priority': priority,
-      },
-    }
-
-    # TODO: only add this notification if the repository's event(s) defined meet the priority
-    # minimum.
-    spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data)
-
+  secscan_notification_queue.put(data['Name'], json.dumps(data))
   return make_response('Okay')
diff --git a/pylintrc b/pylintrc
index e1d21d338..7cecd0de7 100644
--- a/pylintrc
+++ b/pylintrc
@@ -9,7 +9,7 @@
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use"--disable=all --enable=classes
 # --disable=W"
-disable=missing-docstring
+disable=missing-docstring,invalid-name,too-many-locals
 
 
 [TYPECHECK]
diff --git a/util/secscan/secscanendpoint.py b/util/secscan/api.py
similarity index 74%
rename from util/secscan/secscanendpoint.py
rename to util/secscan/api.py
index 2cd24fab1..e03a19369 100644
--- a/util/secscan/secscanendpoint.py
+++ b/util/secscan/api.py
@@ -2,11 +2,12 @@ import features
 import logging
 import requests
 
+from data.database import CloseForLongOperation
 from urlparse import urljoin
 
 logger = logging.getLogger(__name__)
 
-class SecurityScanEndpoint(object):
+class SecurityScannerAPI(object):
   """ Helper class for talking to the Security Scan service (Clair). """
   def __init__(self, app, config_provider):
     self.app = app
@@ -41,7 +42,7 @@ class SecurityScanEndpoint(object):
       body = {
         'LayersIDs': [layer_id]
       }
-      response = self.call_api('vulnerabilities/%s/affected-layers', body, cve_id)
+      response = self.call('vulnerabilities/%s/affected-layers', body, cve_id)
     except requests.exceptions.RequestException:
       logger.exception('Got exception when trying to call Clair endpoint')
       return False
@@ -61,8 +62,13 @@ class SecurityScanEndpoint(object):
 
     return True
 
-  def call_api(self, relative_url, body=None, *args, **kwargs):
-    """ Issues an HTTP call to the sec API at the given relative URL. """
+  def call(self, relative_url, body=None, *args, **kwargs):
+    """ Issues an HTTP call to the sec API at the given relative URL.
+        The positional `args` are interpolated into `relative_url` and `kwargs` are sent
+        as query parameters. A non-None `body` is POSTed as JSON; otherwise a GET is issued.
+        This function disconnects from the database while awaiting a response
+        from the API server.
+    """
     security_config = self.security_config
     api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/'
     url = urljoin(api_url, relative_url % args)
@@ -71,9 +77,10 @@ class SecurityScanEndpoint(object):
     timeout = security_config.get('API_TIMEOUT_SECONDS', 1)
     logger.debug('Looking up sec information: %s', url)
 
-    if body is not None:
-      return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self.keys,
-                         verify=self.certificate)
-    else:
-      return client.get(url, params=kwargs, timeout=timeout, cert=self.keys,
-                        verify=self.certificate)
\ No newline at end of file
+    with CloseForLongOperation(self.app.config):
+      if body is not None:
+        return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self.keys,
+                           verify=self.certificate)
+      else:
+        return client.get(url, params=kwargs, timeout=timeout, cert=self.keys,
+                          verify=self.certificate)
diff --git a/workers/security_notification_worker.py b/workers/security_notification_worker.py
new file mode 100644
index 000000000..1679e80b6
--- /dev/null
+++ b/workers/security_notification_worker.py
@@ -0,0 +1,103 @@
+import json
+import logging
+import time
+
+from collections import defaultdict
+
+import features
+
+from app import secscan_notification_queue, secscan_api
+from data import model
+from data.database import (Image, ImageStorage, ExternalNotificationEvent,
+                           Repository, RepositoryNotification, RepositoryTag)
+from endpoints.notificationhelper import spawn_notification
+from workers.queueworker import QueueWorker
+
+
+logger = logging.getLogger(__name__)
+
+
+class SecurityNotificationWorker(QueueWorker):
+  """ Worker which consumes security scanner notifications from the queue and spawns
+      'vulnerability_found' notifications for the repositories with affected tags.
+  """
+  def process_queue_item(self, queueitem):
+    """ Processes a single queued notification. The queue item body is the JSON
+        document enqueued by the secscan notification endpoint.
+    """
+    data = json.loads(queueitem.body)
+
+    cve_id = data['Name']
+    vulnerability = data['Content']['Vulnerability']
+    priority = vulnerability['Priority']
+
+    # Lookup the external event for when we have vulnerabilities.
+    event = ExternalNotificationEvent.get(name='vulnerability_found')
+
+    # For each layer, retrieving the matching tags and join with repository to determine which
+    # require new notifications.
+    tag_map = defaultdict(set)
+    repository_map = {}
+
+    # Cache of vulnerability check results keyed by tag layer ID. It is shared across all
+    # introducing layers so the scanner is asked at most once per tag layer for this CVE.
+    check_map = {}
+
+    # Find all tags that contain the layer(s) introducing the vulnerability.
+    content = data['Content']
+    layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', []))
+    for layer_id in layer_ids:
+      # Layer IDs have the form 'docker_image_id.storage_uuid'; split on the first dot only.
+      (docker_image_id, storage_uuid) = layer_id.split('.', 1)
+      tags = model.tag.get_matching_tags(docker_image_id, storage_uuid, RepositoryTag,
+                                         Repository, Image, ImageStorage)
+
+      # Additionally filter to tags only in repositories that have the event setup.
+      matching = list(tags
+                      .switch(RepositoryTag)
+                      .join(Repository)
+                      .join(RepositoryNotification)
+                      .where(RepositoryNotification.event == event))
+
+      for tag in matching:
+        # Verify that the tag's root image has the vulnerability.
+        tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
+        logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)
+
+        if tag_layer_id not in check_map:
+          is_vulnerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
+          check_map[tag_layer_id] = is_vulnerable
+
+        logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id,
+                     check_map[tag_layer_id])
+
+        if check_map[tag_layer_id]:
+          # Add the vulnerable tag to the list.
+          tag_map[tag.repository_id].add(tag.name)
+          repository_map[tag.repository_id] = tag.repository
+
+    # For each of the tags found, issue a notification.
+    for repository_id in tag_map:
+      tags = tag_map[repository_id]
+      event_data = {
+        'tags': list(tags),
+        'vulnerability': {
+          'id': data['Name'],
+          'description': vulnerability['Description'],
+          'link': vulnerability['Link'],
+          'priority': priority,
+        },
+      }
+
+      spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data)
+
+
+if __name__ == '__main__':
+  if not features.SECURITY_SCANNER:
+    logger.debug('Security scanner disabled; skipping SecurityNotificationWorker')
+    while True:
+      time.sleep(100000)
+
+  worker = SecurityNotificationWorker(secscan_notification_queue, poll_period_seconds=30,
+                                      reservation_seconds=30, retry_after_seconds=30)
+  worker.start()
diff --git a/workers/securityworker.py b/workers/securityworker.py
index 1deba196a..26d360754 100644
--- a/workers/securityworker.py
+++ b/workers/securityworker.py
@@ -12,7 +12,7 @@ from endpoints.notificationhelper import spawn_notification
 from collections import defaultdict
 from sys import exc_info
 from peewee import JOIN_LEFT_OUTER
-from app import app, storage, OVERRIDE_CONFIG_DIRECTORY, secscan_endpoint
+from app import app, storage, OVERRIDE_CONFIG_DIRECTORY, secscan_api
 from workers.worker import Worker
 from data.database import (Image, ImageStorage, ImageStorageLocation, ImageStoragePlacement,
                            db_random_func, UseThenDisconnect, RepositoryTag, Repository,
@@ -256,7 +256,7 @@ class SecurityWorker(Worker):
           # callback code, etc.
           try:
             logger.debug('Loading vulnerabilities for layer %s', img['image_id'])
-            response = secscan_endpoint.call_api('layers/%s/vulnerabilities', request['ID'])
+            response = secscan_api.call('layers/%s/vulnerabilities', None, request['ID'])
           except requests.exceptions.Timeout:
             logger.debug('Timeout when calling Sec')
             continue
@@ -307,7 +307,7 @@ class SecurityWorker(Worker):
 
 if __name__ == '__main__':
   if not features.SECURITY_SCANNER:
-    logger.debug('Security scanner disabled; skipping')
+    logger.debug('Security scanner disabled; skipping SecurityWorker')
     while True:
       time.sleep(100000)
 