Adapt security worker for Clair v1.0 (except notifications)

This commit is contained in:
Quentin Machu 2016-02-17 14:44:49 -05:00
parent c8d825c232
commit e5da33578c
2 changed files with 160 additions and 170 deletions

View file

@ -426,57 +426,16 @@ def ensure_image_locations(*names):
data = [{'name': name} for name in insert_names] data = [{'name': name} for name in insert_names]
ImageStorageLocation.insert_many(data).execute() ImageStorageLocation.insert_many(data).execute()
def get_secscan_candidates(engine_version, batch_size): def get_image_with_storage_and_parent_base():
Parent = Image.alias() Parent = Image.alias()
ParentImageStorage = ImageStorage.alias() ParentImageStorage = ImageStorage.alias()
rimages = []
# Collect the images without parents. return (Image
candidates = list(Image
.select(Image.id)
.join(ImageStorage)
.where(Image.security_indexed_engine < engine_version,
Image.parent >> None,
ImageStorage.uploading == False)
.limit(batch_size*10))
if len(candidates) > 0:
images = (Image
.select(Image, ImageStorage)
.join(ImageStorage)
.where(Image.id << candidates)
.order_by(db_random_func())
.limit(batch_size))
rimages.extend(images)
# Collect the images with analyzed parents.
candidates = list(Image
.select(Image.id)
.join(Parent, on=(Image.parent == Parent.id))
.switch(Image)
.join(ImageStorage)
.where(Image.security_indexed_engine < engine_version,
Parent.security_indexed_engine == engine_version,
ImageStorage.uploading == False)
.limit(batch_size*10))
if len(candidates) > 0:
images = (Image
.select(Image, ImageStorage, Parent, ParentImageStorage) .select(Image, ImageStorage, Parent, ParentImageStorage)
.join(Parent, on=(Image.parent == Parent.id))
.join(ParentImageStorage, on=(ParentImageStorage.id == Parent.storage))
.switch(Image)
.join(ImageStorage) .join(ImageStorage)
.where(Image.id << candidates) .switch(Image)
.order_by(db_random_func()) .join(Parent, JOIN_LEFT_OUTER, on=(Image.parent == Parent.id))
.limit(batch_size)) .join(ParentImageStorage, JOIN_LEFT_OUTER, on=(ParentImageStorage.id == Parent.storage)))
rimages.extend(images)
# Shuffle the images, otherwise the images without parents will always be on the top
random.shuffle(rimages)
return rimages
def set_secscan_status(image, indexed, version): def set_secscan_status(image, indexed, version):
query = (Image query = (Image
@ -487,12 +446,13 @@ def set_secscan_status(image, indexed, version):
ids_to_update = [row.id for row in query] ids_to_update = [row.id for row in query]
if not ids_to_update: if not ids_to_update:
return return False
(Image return (Image
.update(security_indexed=indexed, security_indexed_engine=version) .update(security_indexed=indexed, security_indexed_engine=version)
.where(Image.id << ids_to_update) .where(Image.id << ids_to_update)
.execute()) .where((Image.security_indexed_engine != version) | (Image.security_indexed != indexed))
.execute()) != 0
def find_or_create_derived_storage(source_image, transformation_name, preferred_location): def find_or_create_derived_storage(source_image, transformation_name, preferred_location):
@ -536,5 +496,3 @@ def delete_derived_storage_by_uuid(storage_uuid):
return return
image_storage.delete_instance(recursive=True) image_storage.delete_instance(recursive=True)

View file

@ -5,24 +5,26 @@ import requests
import features import features
import time import time
from endpoints.notificationhelper import spawn_notification from peewee import fn
from collections import defaultdict from collections import defaultdict
from app import app, config_provider, storage, secscan_api from app import app, config_provider, storage, secscan_api
from endpoints.notificationhelper import spawn_notification
from workers.worker import Worker from workers.worker import Worker
from data import model from data import model
from data.database import (Image, UseThenDisconnect, ExternalNotificationEvent)
from data.model.tag import filter_tags_have_repository_event, get_tags_for_image from data.model.tag import filter_tags_have_repository_event, get_tags_for_image
from data.model.image import get_secscan_candidates, set_secscan_status from data.model.image import set_secscan_status, get_image_with_storage_and_parent_base
from data.model.storage import get_storage_locations from data.model.storage import get_storage_locations
from data.database import ExternalNotificationEvent
from util.secscan.api import SecurityConfigValidator from util.secscan.api import SecurityConfigValidator
from util.migrate.allocator import yield_random_entries
logger = logging.getLogger(__name__)
BATCH_SIZE = 50 BATCH_SIZE = 50
INDEXING_INTERVAL = 30 INDEXING_INTERVAL = 30
API_METHOD_INSERT = '/v1/layers' API_METHOD_INSERT = '/v1/layers'
API_METHOD_VERSION = '/v1/versions/engine' API_METHOD_GET_WITH_VULNERABILITIES = '/v1/layers/%s?vulnerabilities'
logger = logging.getLogger(__name__)
class SecurityWorker(Worker): class SecurityWorker(Worker):
def __init__(self): def __init__(self):
@ -40,6 +42,26 @@ class SecurityWorker(Worker):
else: else:
logger.warning('Failed to validate security scan configuration') logger.warning('Failed to validate security scan configuration')
def _new_request(self, image):
""" Create the request body to submit the given image for analysis. """
url = self._get_image_url(image)
if url is None:
return None
request = {
'Layer': {
'Name': '%s.%s' % (image.docker_image_id, image.storage.uuid),
'Path': url,
'Format': 'Docker'
}
}
if image.parent.docker_image_id and image.parent.storage.uuid:
request['Layer']['ParentName'] = '%s.%s' % (image.parent.docker_image_id,
image.parent.storage.uuid)
return request
def _get_image_url(self, image): def _get_image_url(self, image):
""" Gets the download URL for an image and if the storage doesn't exist, """ Gets the download URL for an image and if the storage doesn't exist,
marks the image as unindexed. """ marks the image as unindexed. """
@ -71,147 +93,157 @@ class SecurityWorker(Worker):
return uri return uri
def _new_request(self, image): def _index_images(self):
url = self._get_image_url(image) def batch_query():
if url is None: base_query = get_image_with_storage_and_parent_base()
return None return base_query.where(Image.security_indexed_engine < self._target_version)
request = { min_id = (Image
'ID': '%s.%s' % (image.docker_image_id, image.storage.uuid), .select(fn.Min(Image.id))
'Path': url, .where(Image.security_indexed_engine < self._target_version)
} .scalar())
max_id = Image.select(fn.Max(Image.id)).scalar()
if image.parent is not None: with UseThenDisconnect(app.config):
request['ParentID'] = '%s.%s' % (image.parent.docker_image_id, for candidate, abt in yield_random_entries(batch_query, Image.id, BATCH_SIZE, max_id, min_id):
image.parent.storage.uuid) _, continue_batch = self._analyze_recursively(candidate)
if not continue_batch:
logger.info('Another worker pre-empted us for layer: %s', candidate.id)
abt.set()
return request def _analyze_recursively(self, layer):
""" Analyzes a layer and all its parents """
if layer.parent_id and layer.parent.security_indexed_engine < self._target_version:
# The image has a parent that is not analyzed yet with this engine.
# Get the parent to get it's own parent and recurse.
try:
base_query = get_image_with_storage_and_parent_base()
parent_layer = base_query.where(Image.id == layer.parent_id).get()
except Image.DoesNotExist:
logger.warning("Image %s has Image %s as parent but doesn't exist.", layer.id,
layer.parent_id)
def _analyze_image(self, image): return False, set_secscan_status(layer, False, self._target_version)
""" Analyzes an image by passing it to Clair. """
request = self._new_request(image) cont, _ = self._analyze_recursively(parent_layer)
if not cont:
# The analysis failed for some reason and did not mark the layer as failed,
# thus we should not try to analyze the children of that layer.
# Interrupt the recursive analysis and return as no-one pre-empted us.
return False, True
# Now we know all parents are analyzed.
return self._analyze(layer)
def _analyze(self, layer):
""" Analyzes a single layer.
Return two bools, the first one tells us if we should evaluate its children, the second
one is set to False when another worker pre-empted the candidate's analysis for us. """
# If the parent couldn't be analyzed with the target version or higher, we can't analyze
# this image. Mark it as failed with the current target version.
if (layer.parent_id and not layer.parent.security_indexed and
layer.parent.security_indexed_engine >= self._target_version):
return True, set_secscan_status(layer, False, self._target_version)
request = self._new_request(layer)
if request is None: if request is None:
return False return False, True
# Analyze the image. # Analyze the image.
try: try:
logger.info('Analyzing %s', request['ID']) logger.info('Analyzing layer %s', request['Layer']['Name'])
# Using invalid certificates doesn't return proper errors because of # Using invalid certificates doesn't return proper errors because of
# https://github.com/shazow/urllib3/issues/556 # https://github.com/shazow/urllib3/issues/556
httpResponse = requests.post(self._api + API_METHOD_INSERT, json=request, http_response = requests.post(self._api + API_METHOD_INSERT, json=request,
cert=self._keys, verify=self._cert) cert=self._keys, verify=self._cert)
jsonResponse = httpResponse.json() json_response = http_response.json()
except (requests.exceptions.RequestException, ValueError): except (requests.exceptions.RequestException, ValueError):
logger.exception('An exception occurred when analyzing layer ID %s', request['ID']) logger.exception('An exception occurred when analyzing layer %s', request['Layer']['Name'])
return False return False, True
# Handle any errors from the security scanner. # Handle any errors from the security scanner.
if httpResponse.status_code != 201: if http_response.status_code != 201:
message = jsonResponse.get('Message', '') message = json_response.get('Error').get('Message', '')
if 'OS and/or package manager are not supported' in message or 'could not extract' in message: logger.warning('A warning event occurred when analyzing layer %s (status code %s): %s',
# The current engine could not index this layer or we tried to index a manifest. request['Layer']['Name'], http_response.status_code, message)
logger.warning('A warning event occurred when analyzing layer ID %s : %s',
request['ID'], jsonResponse['Message'])
# Hopefully, there is no version lower than the target one running # 422 means that the layer could not be analyzed:
set_secscan_status(image, False, self._target_version) # - the layer could not be extracted (manifest?)
# - the layer operating system / package manager is unsupported
return False # Set the layer as failed.
if http_response.status_code == 422:
return True, set_secscan_status(layer, False, self._target_version)
else: else:
logger.warning('Got non-201 when analyzing layer ID %s: %s', request['ID'], jsonResponse) return False, True
return False
# Verify that the version matches. # Verify that the version matches.
api_version = jsonResponse['Version'] api_version = json_response['Layer']['IndexedByVersion']
if api_version < self._target_version: if api_version < self._target_version:
logger.warning('An engine runs on version %d but the target version is %d') logger.warning('An engine runs on version %d but the target version is %d', api_version,
self._target_version)
# Mark the image as analyzed. # Mark the image as analyzed.
logger.debug('Layer %s analyzed successfully', image.id) logger.info('Analyzed layer %s successfully', request['Layer']['Name'])
set_secscan_status(image, True, api_version) set_status = set_secscan_status(layer, True, api_version)
return True # If we are the one who've done the job successfully first, get the vulnerabilities and
# send notifications to the repos that have a tag on that layer.
# TODO(josephschorr): Adapt this depending on the new notification format we adopt.
# if set_status:
# # Get the tags of the layer we analyzed.
# repository_map = defaultdict(list)
# event = ExternalNotificationEvent.get(name='vulnerability_found')
# matching = list(filter_tags_have_repository_event(get_tags_for_image(layer.id), event))
#
# for tag in matching:
# repository_map[tag.repository_id].append(tag)
#
# # If there is at least one tag,
# # Lookup the vulnerabilities for the image, now that it is analyzed.
# if len(repository_map) > 0:
# logger.debug('Loading vulnerabilities for layer %s', layer.id)
# sec_data = self._get_vulnerabilities(layer)
#
# if sec_data is not None:
# # Dispatch events for any detected vulnerabilities
# logger.debug('Got vulnerabilities for layer %s: %s', layer.id, sec_data)
#
# for repository_id in repository_map:
# tags = repository_map[repository_id]
#
# for vuln in sec_data['Vulnerabilities']:
# event_data = {
# 'tags': [tag.name for tag in tags],
# 'vulnerability': {
# 'id': vuln['Name'],
# 'description': vuln['Description'],
# 'link': vuln['Link'],
# 'priority': vuln['Priority'],
# },
# }
#
# spawn_notification(tags[0].repository, 'vulnerability_found', event_data)
def _get_vulnerabilities(self, image): return True, set_status
def _get_vulnerabilities(self, layer):
""" Returns the vulnerabilities detected (if any) or None on error. """ """ Returns the vulnerabilities detected (if any) or None on error. """
try: try:
response = secscan_api.call('layers/%s/vulnerabilities', None, response = secscan_api.call(self._api + API_METHOD_GET_WITH_VULNERABILITIES, None,
'%s.%s' % (image.docker_image_id, image.storage.uuid)) '%s.%s' % (layer.docker_image_id, layer.storage.uuid))
logger.debug('Got response %s for vulnerabilities for layer %s', logger.debug('Got response %s for vulnerabilities for layer %s',
response.status_code, image.id) response.status_code, layer.id)
if response.status_code == 404: if response.status_code == 404:
return None return None
except (requests.exceptions.RequestException, ValueError): except (requests.exceptions.RequestException, ValueError):
logger.exception('Failed to get vulnerability response for %s', image.id) logger.exception('Failed to get vulnerability response for %s', layer.id)
return None return None
return response.json() return response.json()
def _index_images(self):
logger.debug('Started indexing')
event = ExternalNotificationEvent.get(name='vulnerability_found')
while True:
# Lookup the images to index.
images = []
logger.debug('Looking up images to index')
images = get_secscan_candidates(self._target_version, BATCH_SIZE)
if not images:
logger.debug('No more images left to analyze')
return
logger.debug('Found %d images to index', len(images))
for image in images:
# If we couldn't analyze the parent, we can't analyze this image.
if (image.parent and not image.parent.security_indexed and
image.parent.security_indexed_engine >= self._target_version):
set_secscan_status(image, False, self._target_version)
continue
# Analyze the image.
analyzed = self._analyze_image(image)
if not analyzed:
continue
# Get the tags of the image we analyzed
matching = list(filter_tags_have_repository_event(get_tags_for_image(image.id), event))
repository_map = defaultdict(list)
for tag in matching:
repository_map[tag.repository_id].append(tag)
# If there is at least one tag,
# Lookup the vulnerabilities for the image, now that it is analyzed.
if len(repository_map) > 0:
logger.debug('Loading vulnerabilities for layer %s', image.id)
sec_data = self._get_vulnerabilities(image)
if sec_data is None:
continue
if not sec_data.get('Vulnerabilities'):
continue
# Dispatch events for any detected vulnerabilities
logger.debug('Got vulnerabilities for layer %s: %s', image.id, sec_data)
for repository_id in repository_map:
tags = repository_map[repository_id]
for vuln in sec_data['Vulnerabilities']:
event_data = {
'tags': [tag.name for tag in tags],
'vulnerability': {
'id': vuln['ID'],
'description': vuln['Description'],
'link': vuln['Link'],
'priority': vuln['Priority'],
},
}
spawn_notification(tags[0].repository, 'vulnerability_found', event_data)
if __name__ == '__main__': if __name__ == '__main__':
if not features.SECURITY_SCANNER: if not features.SECURITY_SCANNER: