Merge pull request #842 from jzelinskie/goodhumor

add queue and worker for clair notifications
This commit is contained in:
Jimmy Zelinskie 2015-11-10 15:23:41 -05:00
commit 5926501e08
12 changed files with 136 additions and 87 deletions

6
app.py
View file

@ -35,7 +35,7 @@ from util.saas.metricqueue import MetricQueue
from util.config.provider import get_config_provider from util.config.provider import get_config_provider
from util.config.configutil import generate_secret_key from util.config.configutil import generate_secret_key
from util.config.superusermanager import SuperUserManager from util.config.superusermanager import SuperUserManager
from util.secscan.secscanendpoint import SecurityScanEndpoint from util.secscan.api import SecurityScannerAPI
OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/' OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/'
OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml' OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml'
@ -148,7 +148,9 @@ image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf)
dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf, dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf,
reporter=MetricQueueReporter(metric_queue)) reporter=MetricQueueReporter(metric_queue))
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf) notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)
secscan_endpoint = SecurityScanEndpoint(app, config_provider) secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf)
secscan_api = SecurityScannerAPI(app, config_provider)
database.configure(app.config) database.configure(app.config)
model.config.app_config = app.config model.config.app_config = app.config

View file

@ -0,0 +1,2 @@
#!/bin/sh
exec logger -i -t securitynotificationworker

View file

@ -0,0 +1,8 @@
#! /bin/bash
echo 'Starting security scanner notification worker'
cd /
venv/bin/python -m workers.security_notification_worker 2>&1
echo 'Security scanner notification worker exited'

0
conf/init/service/securityworker/run Normal file → Executable file
View file

View file

@ -131,6 +131,7 @@ class DefaultConfig(object):
DIFFS_QUEUE_NAME = 'imagediff' DIFFS_QUEUE_NAME = 'imagediff'
DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild' DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild'
REPLICATION_QUEUE_NAME = 'imagestoragereplication' REPLICATION_QUEUE_NAME = 'imagestoragereplication'
SECSCAN_NOTIFICATION_QUEUE_NAME = 'secscan_notification'
# Super user config. Note: This MUST BE an empty list for the default config. # Super user config. Note: This MUST BE an empty list for the default config.
SUPER_USERS = [] SUPER_USERS = []

View file

@ -5,7 +5,7 @@ import features
import json import json
import requests import requests
from app import secscan_endpoint from app import secscan_api
from data import model from data import model
from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param, from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
RepositoryParamResource, resource, nickname, show_if, parse_args, RepositoryParamResource, resource, nickname, show_if, parse_args,
@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
def _call_security_api(relative_url, *args, **kwargs): def _call_security_api(relative_url, *args, **kwargs):
""" Issues an HTTP call to the sec API at the given relative URL. """ """ Issues an HTTP call to the sec API at the given relative URL. """
try: try:
response = secscan_endpoint.call_api(relative_url, *args, **kwargs) response = secscan_api.call(relative_url, *args, **kwargs)
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
raise DownstreamIssue(payload=dict(message='API call timed out')) raise DownstreamIssue(payload=dict(message='API call timed out'))
except requests.exceptions.ConnectionError: except requests.exceptions.ConnectionError:

View file

@ -5,7 +5,7 @@ import features
import json import json
import requests import requests
from app import secscan_endpoint from app import secscan_api
from data import model from data import model
from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param, from endpoints.api import (require_repo_read, NotFound, DownstreamIssue, path_param,
RepositoryParamResource, resource, nickname, show_if, parse_args, RepositoryParamResource, resource, nickname, show_if, parse_args,
@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
def _call_security_api(relative_url, *args, **kwargs): def _call_security_api(relative_url, *args, **kwargs):
""" Issues an HTTP call to the sec API at the given relative URL. """ """ Issues an HTTP call to the sec API at the given relative URL. """
try: try:
response = secscan_endpoint.call_api(relative_url, body=None, *args, **kwargs) response = secscan_api.call(relative_url, body=None, *args, **kwargs)
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
raise DownstreamIssue(payload=dict(message='API call timed out')) raise DownstreamIssue(payload=dict(message='API call timed out'))
except requests.exceptions.ConnectionError: except requests.exceptions.ConnectionError:

View file

@ -1,14 +1,11 @@
import logging import logging
import json
import features import features
from app import secscan_endpoint from app import secscan_notification_queue
from flask import request, make_response, Blueprint from flask import request, make_response, Blueprint
from data import model
from data.database import (RepositoryNotification, Repository, ExternalNotificationEvent,
RepositoryTag, Image, ImageStorage)
from endpoints.common import route_show_if from endpoints.common import route_show_if
from endpoints.notificationhelper import spawn_notification
from collections import defaultdict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
secscan = Blueprint('secscan', __name__) secscan = Blueprint('secscan', __name__)
@ -19,70 +16,10 @@ def secscan_notification():
data = request.get_json() data = request.get_json()
logger.debug('Got notification from Clair: %s', data) logger.debug('Got notification from Clair: %s', data)
# Find all tags that contain the layer(s) introducing the vulnerability.
content = data['Content'] content = data['Content']
layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', [])) layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', []))
if not layer_ids: if not layer_ids:
return make_response('Okay') return make_response('Okay')
# TODO(jzelinkskie): Write a queueitem for these layer ids, and do the rest of this secscan_notification_queue.put(data['Name'], json.dumps(data))
# in a worker.
cve_id = data['Name']
vulnerability = data['Content']['Vulnerability']
priority = vulnerability['Priority']
# Lookup the external event for when we have vulnerabilities.
event = ExternalNotificationEvent.get(name='vulnerability_found')
# For each layer, retrieving the matching tags and join with repository to determine which
# require new notifications.
tag_map = defaultdict(set)
repository_map = {}
for layer_id in layer_ids:
(docker_image_id, storage_uuid) = layer_id.split('.', 2)
tags = model.tag.get_matching_tags(docker_image_id, storage_uuid, RepositoryTag,
Repository, Image, ImageStorage)
# Additionally filter to tags only in repositories that have the event setup.
matching = (tags.switch(RepositoryTag)
.join(Repository)
.join(RepositoryNotification)
.where(RepositoryNotification.event == event))
check_map = {}
for tag in matching:
# Verify that the tag's root image has the vulnerability.
tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)
if not tag_layer_id in check_map:
is_vulerable = secscan_endpoint.check_layer_vulnerable(tag_layer_id, cve_id)
check_map[tag_layer_id] = is_vulerable
logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id,
check_map[tag_layer_id])
if check_map[tag_layer_id]:
# Add the vulnerable tag to the list.
tag_map[tag.repository_id].add(tag.name)
repository_map[tag.repository_id] = tag.repository
# For each of the tags found, issue a notification.
for repository_id in tag_map:
tags = tag_map[repository_id]
event_data = {
'tags': list(tags),
'vulnerability': {
'id': data['Name'],
'description': vulnerability['Description'],
'link': vulnerability['Link'],
'priority': priority,
},
}
# TODO: only add this notification if the repository's event(s) defined meet the priority
# minimum.
spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data)
return make_response('Okay') return make_response('Okay')

View file

@ -9,7 +9,7 @@
# --enable=similarities". If you want to run only the classes checker, but have # --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes # no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W" # --disable=W"
disable=missing-docstring disable=missing-docstring,invalid-name,too-many-locals
[TYPECHECK] [TYPECHECK]

View file

@ -2,11 +2,13 @@ import features
import logging import logging
import requests import requests
from app import app
from database import CloseForLongOperation
from urlparse import urljoin from urlparse import urljoin
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SecurityScanEndpoint(object): class SecurityScannerAPI(object):
""" Helper class for talking to the Security Scan service (Clair). """ """ Helper class for talking to the Security Scan service (Clair). """
def __init__(self, app, config_provider): def __init__(self, app, config_provider):
self.app = app self.app = app
@ -41,7 +43,7 @@ class SecurityScanEndpoint(object):
body = { body = {
'LayersIDs': [layer_id] 'LayersIDs': [layer_id]
} }
response = self.call_api('vulnerabilities/%s/affected-layers', body, cve_id) response = self.call('vulnerabilities/%s/affected-layers', body, cve_id)
except requests.exceptions.RequestException: except requests.exceptions.RequestException:
logger.exception('Got exception when trying to call Clair endpoint') logger.exception('Got exception when trying to call Clair endpoint')
return False return False
@ -61,8 +63,11 @@ class SecurityScanEndpoint(object):
return True return True
def call_api(self, relative_url, body=None, *args, **kwargs): def call(self, relative_url, body=None, *args, **kwargs):
""" Issues an HTTP call to the sec API at the given relative URL. """ """ Issues an HTTP call to the sec API at the given relative URL.
This function disconnects from the database while awaiting a response
from the API server.
"""
security_config = self.security_config security_config = self.security_config
api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/' api_url = urljoin(security_config['ENDPOINT'], '/' + security_config['API_VERSION']) + '/'
url = urljoin(api_url, relative_url % args) url = urljoin(api_url, relative_url % args)
@ -71,9 +76,10 @@ class SecurityScanEndpoint(object):
timeout = security_config.get('API_TIMEOUT_SECONDS', 1) timeout = security_config.get('API_TIMEOUT_SECONDS', 1)
logger.debug('Looking up sec information: %s', url) logger.debug('Looking up sec information: %s', url)
if body is not None: with CloseForLongOperation(app.config):
return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self.keys, if body is not None:
verify=self.certificate) return client.post(url, json=body, params=kwargs, timeout=timeout, cert=self.keys,
else: verify=self.certificate)
return client.get(url, params=kwargs, timeout=timeout, cert=self.keys, else:
verify=self.certificate) return client.get(url, params=kwargs, timeout=timeout, cert=self.keys,
verify=self.certificate)

View file

@ -0,0 +1,93 @@
import json
import logging
import time
from collections import defaultdict
import features
from app import secscan_notification_queue, secscan_api
from data import model
from data.database import (Image, ImageStorage, ExternalNotificationEvent,
Repository, RepositoryNotification, RepositoryTag)
from endpoints.notificationhelper import spawn_notification
from workers.queueworker import QueueWorker
logger = logging.getLogger(__name__)
class SecurityNotificationWorker(QueueWorker):
def process_queue_item(self, queueitem):
data = json.loads(queueitem.body)
cve_id = data['Name']
vulnerability = data['Content']['Vulnerability']
priority = vulnerability['Priority']
# Lookup the external event for when we have vulnerabilities.
event = ExternalNotificationEvent.get(name='vulnerability_found')
# For each layer, retrieving the matching tags and join with repository to determine which
# require new notifications.
tag_map = defaultdict(set)
repository_map = {}
# Find all tags that contain the layer(s) introducing the vulnerability.
content = data['Content']
layer_ids = content.get('NewIntroducingLayersIDs', content.get('IntroducingLayersIDs', []))
for layer_id in layer_ids:
(docker_image_id, storage_uuid) = layer_id.split('.', 2)
tags = model.tag.get_matching_tags(docker_image_id, storage_uuid, RepositoryTag,
Repository, Image, ImageStorage)
# Additionally filter to tags only in repositories that have the event setup.
matching = list(tags
.switch(RepositoryTag)
.join(Repository)
.join(RepositoryNotification)
.where(RepositoryNotification.event == event))
check_map = {}
for tag in matching:
# Verify that the tag's root image has the vulnerability.
tag_layer_id = '%s.%s' % (tag.image.docker_image_id, tag.image.storage.uuid)
logger.debug('Checking if layer %s is vulnerable to %s', tag_layer_id, cve_id)
if not tag_layer_id in check_map:
is_vulerable = secscan_api.check_layer_vulnerable(tag_layer_id, cve_id)
check_map[tag_layer_id] = is_vulerable
logger.debug('Result of layer %s is vulnerable to %s check: %s', tag_layer_id, cve_id,
check_map[tag_layer_id])
if check_map[tag_layer_id]:
# Add the vulnerable tag to the list.
tag_map[tag.repository_id].add(tag.name)
repository_map[tag.repository_id] = tag.repository
# For each of the tags found, issue a notification.
for repository_id in tag_map:
tags = tag_map[repository_id]
event_data = {
'tags': list(tags),
'vulnerability': {
'id': data['Name'],
'description': vulnerability['Description'],
'link': vulnerability['Link'],
'priority': priority,
},
}
spawn_notification(repository_map[repository_id], 'vulnerability_found', event_data)
if __name__ == '__main__':
if not features.SECURITY_SCANNER:
logger.debug('Security scanner disabled; skipping SecurityNotificationWorker')
while True:
time.sleep(100000)
worker = SecurityNotificationWorker(secscan_notification_queue, poll_period_seconds=30,
reservation_seconds=30, retry_after_seconds=30)
worker.start()

View file

@ -12,7 +12,7 @@ from endpoints.notificationhelper import spawn_notification
from collections import defaultdict from collections import defaultdict
from sys import exc_info from sys import exc_info
from peewee import JOIN_LEFT_OUTER from peewee import JOIN_LEFT_OUTER
from app import app, storage, OVERRIDE_CONFIG_DIRECTORY, secscan_endpoint from app import app, storage, OVERRIDE_CONFIG_DIRECTORY, secscan_api
from workers.worker import Worker from workers.worker import Worker
from data.database import (Image, ImageStorage, ImageStorageLocation, ImageStoragePlacement, from data.database import (Image, ImageStorage, ImageStorageLocation, ImageStoragePlacement,
db_random_func, UseThenDisconnect, RepositoryTag, Repository, db_random_func, UseThenDisconnect, RepositoryTag, Repository,
@ -256,7 +256,7 @@ class SecurityWorker(Worker):
# callback code, etc. # callback code, etc.
try: try:
logger.debug('Loading vulnerabilities for layer %s', img['image_id']) logger.debug('Loading vulnerabilities for layer %s', img['image_id'])
response = secscan_endpoint.call_api('layers/%s/vulnerabilities', request['ID']) response = secscan_api.call('layers/%s/vulnerabilities', request['ID'])
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
logger.debug('Timeout when calling Sec') logger.debug('Timeout when calling Sec')
continue continue
@ -307,7 +307,7 @@ class SecurityWorker(Worker):
if __name__ == '__main__': if __name__ == '__main__':
if not features.SECURITY_SCANNER: if not features.SECURITY_SCANNER:
logger.debug('Security scanner disabled; skipping') logger.debug('Security scanner disabled; skipping SecurityWorker')
while True: while True:
time.sleep(100000) time.sleep(100000)