405eca074c
Changes the security scanner code to raise exceptions now for non-successful operations. One of the new exceptions raised is MissingParentLayerException, which, when raised, will cause the security worker to perform a full rescan of all parent images for the current layer, before trying once more to scan the current layer. This should allow the system to be "self-healing" in the case where the security scanner engine somehow loses or corrupts a parent layer.
278 lines
9.5 KiB
Python
278 lines
9.5 KiB
Python
import json
|
|
import copy
|
|
import uuid
|
|
import urlparse
|
|
|
|
from contextlib import contextmanager
|
|
from httmock import urlmatch, HTTMock, all_requests
|
|
from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG
|
|
|
|
@contextmanager
def fake_security_scanner(hostname='fakesecurityscanner'):
    """ Context manager yielding a FakeSecurityScanner bound to `hostname`.

        While the context is active, the endpoints returned by the fake's
        get_endpoints() are installed via HTTMock; the yielded object is the
        fake itself.
    """
    fake = FakeSecurityScanner(hostname)
    endpoints = fake.get_endpoints()
    with HTTMock(*endpoints):
        yield fake
|
|
|
|
|
|
class FakeSecurityScanner(object):
    """ Implements a fake security scanner (with somewhat real responses) for testing API calls and
        responses.

        All state is held in memory on the instance:
          - layers: layer ID -> layer data dict for every layer that has been added/analyzed.
          - notifications: notification ID -> dict of old/new layer IDs and vulnerabilities.
          - layer_vulns: layer ID -> vulnerabilities registered through set_vulns.

        The HTTP surface is exposed as HTTMock handlers via get_endpoints().
    """

    def __init__(self, hostname, index_version=1):
        self.hostname = hostname
        self.index_version = index_version
        self.layers = {}
        self.notifications = {}
        self.layer_vulns = {}

        # Layer IDs that trigger canned error responses (see the setters below).
        self.fail_layer_id = None
        self.internal_error_layer_id = None
        self.error_layer_id = None

    def set_fail_layer_id(self, fail_layer_id):
        """ Sets a layer ID that, if encountered when the analyze (POST layer) call is made,
            causes a 422 response to be returned.
        """
        self.fail_layer_id = fail_layer_id

    def set_internal_error_layer_id(self, internal_error_layer_id):
        """ Sets a layer ID that, if encountered when the analyze (POST layer) call or the
            get-layer call is made, causes a 500 response to be returned.
        """
        self.internal_error_layer_id = internal_error_layer_id

    def set_error_layer_id(self, error_layer_id):
        """ Sets a layer ID that, if encountered when the analyze (POST layer) call is made,
            causes a 400 response to be returned.
        """
        self.error_layer_id = error_layer_id

    def has_layer(self, layer_id):
        """ Returns true if the layer with the given ID has been analyzed. """
        return layer_id in self.layers

    def has_notification(self, notification_id):
        """ Returns whether a notification with the given ID is found in the scanner. """
        return notification_id in self.notifications

    def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln):
        """ Adds a new notification over the given sets of layer IDs and vulnerability information,
            returning the structural data of the notification created (first page, limit 100).
        """
        notification_id = str(uuid.uuid4())
        self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids,
                                                   new_layer_ids=new_layer_ids,
                                                   old_vuln=old_vuln,
                                                   new_vuln=new_vuln)

        return self._get_notification_data(notification_id, 0, 100)

    def layer_id(self, layer):
        """ Returns the Quay Security Scanner layer ID for the given layer (Image row). """
        return '%s.%s' % (layer.docker_image_id, layer.storage.uuid)

    def add_layer(self, layer_id):
        """ Adds a layer to the security scanner, with no features or vulnerabilities.
            Any existing data stored under the same ID is replaced.
        """
        self.layers[layer_id] = {
            "Name": layer_id,
            "Format": "Docker",
            "IndexedByVersion": self.index_version,
        }

    def remove_layer(self, layer_id):
        """ Removes a layer from the security scanner. Unknown IDs are ignored. """
        self.layers.pop(layer_id, None)

    def set_vulns(self, layer_id, vulns):
        """ Sets the vulnerabilities for the layer with the given ID to those given. """
        self.layer_vulns[layer_id] = vulns

        # Since this call may occur before the layer is "analyzed", we only add the data
        # to the layer itself if present.
        if self.layers.get(layer_id):
            layer = self.layers[layer_id]
            layer['Features'] = layer.get('Features', [])
            layer['Features'].append({
                "Name": 'somefeature',
                "Namespace": 'somenamespace',
                "Version": 'someversion',
                "Vulnerabilities": self.layer_vulns[layer_id],
            })

    @staticmethod
    def _error_response(status_code, message):
        """ Builds an HTTMock response dict carrying a JSON `Error.Message` payload. """
        return {
            'status_code': status_code,
            'content': json.dumps({'Error': {'Message': message}}),
        }

    @staticmethod
    def _paginate(layer_ids, page, limit):
        """ Returns (IDs on the requested page, whether any IDs exist beyond that page). """
        start_index = page * limit
        end_index = (page + 1) * limit

        # Compare against the *full* list; a non-positive limit never advertises another page,
        # so a client following NextPage cannot loop forever.
        has_more = limit > 0 and len(layer_ids) > end_index
        return layer_ids[start_index:end_index], has_more

    def _get_notification_data(self, notification_id, page, limit):
        """ Returns the structural data for the notification with the given ID, paginated using
            the given page and limit.

            The `Old`/`New` sections are only present when the corresponding vulnerability was
            supplied. `NextPage` (the next page number, as a string) is only present when either
            section has layer IDs beyond the requested page. Raises KeyError if the notification
            ID is unknown.
        """
        notification = self.notifications[notification_id]
        notification_data = {
            "Name": notification_id,
            "Created": "1456247389",
            "Notified": "1456246708",
            "Limit": limit,
        }

        has_additional_page = False
        sections = (('Old', 'old_vuln', 'old_layer_ids'), ('New', 'new_vuln', 'new_layer_ids'))
        for section, vuln_key, ids_key in sections:
            if not notification.get(vuln_key):
                continue

            page_ids, has_more = self._paginate(notification[ids_key], page, limit)
            has_additional_page = has_additional_page or has_more
            notification_data[section] = {
                'Vulnerability': notification[vuln_key],
                'LayersIntroducingVulnerability': page_ids,
            }

        if has_additional_page:
            notification_data['NextPage'] = str(page + 1)

        return notification_data

    def get_endpoints(self):
        """ Returns the HTTMock endpoint definitions for the fake security scanner.

            The final entry is a catch-all that raises for any request the other handlers did
            not match, so it must remain last in the returned list.
        """

        @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET')
        def get_layer_mock(url, request):
            layer_id = url.path[len('/v1/layers/'):]
            if layer_id == self.internal_error_layer_id:
                return self._error_response(500, 'Internal server error')

            if layer_id not in self.layers:
                return self._error_response(404, 'Unknown layer')

            # Copy so that stripping features below never touches the stored layer.
            layer_data = copy.deepcopy(self.layers[layer_id])

            # Features are only reported when the request URL asks for them.
            has_vulns = request.url.find('vulnerabilities') > 0
            has_features = request.url.find('features') > 0
            if not has_vulns and not has_features:
                layer_data.pop('Features', None)

            return {
                'status_code': 200,
                'content': json.dumps({'Layer': layer_data}),
            }

        @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers', method='POST')
        def post_layer_mock(_, request):
            body_data = json.loads(request.body)
            if 'Layer' not in body_data:
                return {'status_code': 400, 'content': 'Missing body'}

            layer = body_data['Layer']
            for required_field in ('Path', 'Name', 'Format'):
                if required_field not in layer:
                    return {'status_code': 400, 'content': 'Missing ' + required_field}

            layer_name = layer['Name']
            if layer_name == self.internal_error_layer_id:
                return self._error_response(500, 'Internal server error')

            if layer_name == self.fail_layer_id:
                return self._error_response(422, 'Cannot analyze')

            if layer_name == self.error_layer_id:
                return self._error_response(400, 'Some sort of error')

            # A layer naming a parent can only be analyzed once that parent is known.
            parent_id = layer.get('ParentName', None)
            if parent_id is not None and self.layers.get(parent_id, None) is None:
                return self._error_response(400, UNKNOWN_PARENT_LAYER_ERROR_MSG)

            self.add_layer(layer_name)
            if parent_id is not None:
                self.layers[layer_name]['ParentName'] = parent_id

            # If vulnerabilities have already been registered with this layer, call set_vulns to
            # make sure their data is added to the layer's data.
            if self.layer_vulns.get(layer_name):
                self.set_vulns(layer_name, self.layer_vulns[layer_name])

            return {
                'status_code': 201,
                'content': json.dumps({
                    "Layer": self.layers[layer_name],
                }),
            }

        @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$',
                  method='DELETE')
        def delete_notification(url, _):
            notification_id = url.path[len('/v1/notifications/'):]
            if notification_id not in self.notifications:
                return self._error_response(404, 'Unknown notification')

            self.notifications.pop(notification_id)
            return {
                'status_code': 204,
                'content': '',
            }

        @urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$',
                  method='GET')
        def get_notification(url, _):
            notification_id = url.path[len('/v1/notifications/'):]
            if notification_id not in self.notifications:
                return self._error_response(404, 'Unknown notification')

            # parse_qs maps each key to a list of values; defaults are limit=2 and page=0.
            query_params = urlparse.parse_qs(url.query)
            limit = int(query_params.get('limit', [2])[0])
            page = int(query_params.get('page', [0])[0])

            notification_data = self._get_notification_data(notification_id, page, limit)
            response = {'Notification': notification_data}
            return {
                'status_code': 200,
                'content': json.dumps(response),
            }

        @all_requests
        def response_content(url, _):
            raise Exception('Unknown endpoint: ' + str(url))

        return [get_layer_mock, post_layer_mock, get_notification, delete_notification,
                response_content]
|