Merge pull request #2238 from coreos-inc/fake-clair
Add a fake security scanner class for easier testing
This commit is contained in:
commit
9fa16679f8
6 changed files with 522 additions and 383 deletions
|
@ -159,11 +159,10 @@ class SecurityScannerAPI(object):
|
|||
except requests.exceptions.ConnectionError:
|
||||
logger.exception('Connection error when trying to post layer data response for %s', layer.id)
|
||||
return None, True
|
||||
except (requests.exceptions.RequestException, ValueError):
|
||||
except (requests.exceptions.RequestException, ValueError) as re:
|
||||
logger.exception('Failed to post layer data response for %s', layer.id)
|
||||
return None, False
|
||||
|
||||
|
||||
# Handle any errors from the security scanner.
|
||||
if response.status_code != 201:
|
||||
message = json_response.get('Error').get('Message', '')
|
||||
|
|
264
util/secscan/fake.py
Normal file
264
util/secscan/fake.py
Normal file
|
@ -0,0 +1,264 @@
|
|||
import json
|
||||
import copy
|
||||
import uuid
|
||||
import urlparse
|
||||
|
||||
from contextlib import contextmanager
|
||||
from httmock import urlmatch, HTTMock, all_requests
|
||||
|
||||
@contextmanager
|
||||
def fake_security_scanner(hostname='fakesecurityscanner'):
|
||||
""" Context manager which yields a fake security scanner. All requests made to the given
|
||||
hostname (default: fakesecurityscanner) will be handled by the fake.
|
||||
"""
|
||||
scanner = FakeSecurityScanner(hostname)
|
||||
with HTTMock(*(scanner.get_endpoints())):
|
||||
yield scanner
|
||||
|
||||
|
||||
class FakeSecurityScanner(object):
|
||||
""" Implements a fake security scanner (with somewhat real responses) for testing API calls and
|
||||
responses.
|
||||
"""
|
||||
def __init__(self, hostname, index_version=1):
|
||||
self.hostname = hostname
|
||||
self.index_version = index_version
|
||||
self.layers = {}
|
||||
self.notifications = {}
|
||||
self.layer_vulns = {}
|
||||
|
||||
self.fail_layer_id = None
|
||||
self.internal_error_layer_id = None
|
||||
|
||||
def set_fail_layer_id(self, fail_layer_id):
|
||||
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 422
|
||||
to be raised.
|
||||
"""
|
||||
self.fail_layer_id = fail_layer_id
|
||||
|
||||
def set_internal_error_layer_id(self, internal_error_layer_id):
|
||||
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 500
|
||||
to be raised.
|
||||
"""
|
||||
self.internal_error_layer_id = internal_error_layer_id
|
||||
|
||||
def has_layer(self, layer_id):
|
||||
""" Returns true if the layer with the given ID has been analyzed. """
|
||||
return layer_id in self.layers
|
||||
|
||||
def has_notification(self, notification_id):
|
||||
""" Returns whether a notification with the given ID is found in the scanner. """
|
||||
return notification_id in self.notifications
|
||||
|
||||
def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln):
|
||||
""" Adds a new notification over the given sets of layer IDs and vulnerability information,
|
||||
returning the structural data of the notification created.
|
||||
"""
|
||||
notification_id = str(uuid.uuid4())
|
||||
self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids,
|
||||
new_layer_ids=new_layer_ids,
|
||||
old_vuln=old_vuln,
|
||||
new_vuln=new_vuln)
|
||||
|
||||
return self._get_notification_data(notification_id, 0, 100)
|
||||
|
||||
def layer_id(self, layer):
|
||||
""" Returns the Quay Security Scanner layer ID for the given layer (Image row). """
|
||||
return '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
|
||||
|
||||
def add_layer(self, layer_id):
|
||||
""" Adds a layer to the security scanner, with no features or vulnerabilities. """
|
||||
self.layers[layer_id] = {
|
||||
"Name": layer_id,
|
||||
"Format": "Docker",
|
||||
"IndexedByVersion": self.index_version,
|
||||
}
|
||||
|
||||
def remove_layer(self, layer_id):
|
||||
""" Removes a layer from the security scanner. """
|
||||
self.layers.pop(layer_id, None)
|
||||
|
||||
def set_vulns(self, layer_id, vulns):
|
||||
""" Sets the vulnerabilities for the layer with the given ID to those given. """
|
||||
self.layer_vulns[layer_id] = vulns
|
||||
|
||||
# Since this call may occur before the layer is "anaylzed", we only add the data
|
||||
# to the layer itself if present.
|
||||
if self.layers.get(layer_id):
|
||||
layer = self.layers[layer_id]
|
||||
layer['Features'] = layer.get('Features', [])
|
||||
layer['Features'].append({
|
||||
"Name": 'somefeature',
|
||||
"Namespace": 'somenamespace',
|
||||
"Version": 'someversion',
|
||||
"Vulnerabilities": self.layer_vulns[layer_id],
|
||||
})
|
||||
|
||||
def _get_notification_data(self, notification_id, page, limit):
|
||||
""" Returns the structural data for the notification with the given ID, paginated using
|
||||
the given page and limit. """
|
||||
notification = self.notifications[notification_id]
|
||||
notification_data = {
|
||||
"Name": notification_id,
|
||||
"Created": "1456247389",
|
||||
"Notified": "1456246708",
|
||||
"Limit": limit,
|
||||
}
|
||||
|
||||
start_index = (page*limit)
|
||||
end_index = ((page+1)*limit)
|
||||
has_additional_page = False
|
||||
|
||||
if notification.get('old_vuln'):
|
||||
old_layer_ids = notification['old_layer_ids']
|
||||
old_layer_ids = old_layer_ids[start_index:end_index]
|
||||
has_additional_page = has_additional_page or bool(len(old_layer_ids[end_index-1:]))
|
||||
|
||||
notification_data['Old'] = {
|
||||
'Vulnerability': notification['old_vuln'],
|
||||
'LayersIntroducingVulnerability': old_layer_ids,
|
||||
}
|
||||
|
||||
if notification.get('new_vuln'):
|
||||
new_layer_ids = notification['new_layer_ids']
|
||||
new_layer_ids = new_layer_ids[start_index:end_index]
|
||||
has_additional_page = has_additional_page or bool(len(new_layer_ids[end_index-1:]))
|
||||
|
||||
notification_data['New'] = {
|
||||
'Vulnerability': notification['new_vuln'],
|
||||
'LayersIntroducingVulnerability': new_layer_ids,
|
||||
}
|
||||
|
||||
if has_additional_page:
|
||||
notification_data['NextPage'] = str(page+1)
|
||||
|
||||
return notification_data
|
||||
|
||||
def get_endpoints(self):
|
||||
""" Returns the HTTMock endpoint definitions for the fake security scanner. """
|
||||
|
||||
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET')
|
||||
def get_layer_mock(url, request):
|
||||
layer_id = url.path[len('/v1/layers/'):]
|
||||
if layer_id == self.internal_error_layer_id:
|
||||
return {
|
||||
'status_code': 500,
|
||||
'content': json.dumps({'Error': {'Message': 'Internal server error'}}),
|
||||
}
|
||||
|
||||
if not layer_id in self.layers:
|
||||
return {
|
||||
'status_code': 404,
|
||||
'content': json.dumps({'Error': {'Message': 'Unknown layer'}}),
|
||||
}
|
||||
|
||||
layer_data = copy.deepcopy(self.layers[layer_id])
|
||||
|
||||
has_vulns = request.url.find('vulnerabilities') > 0
|
||||
has_features = request.url.find('features') > 0
|
||||
if not has_vulns and not has_features:
|
||||
layer_data.pop('Features', None)
|
||||
|
||||
return {
|
||||
'status_code': 200,
|
||||
'content': json.dumps({'Layer': layer_data}),
|
||||
}
|
||||
|
||||
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers', method='POST')
|
||||
def post_layer_mock(_, request):
|
||||
body_data = json.loads(request.body)
|
||||
if not 'Layer' in body_data:
|
||||
return {'status_code': 400, 'content': 'Missing body'}
|
||||
|
||||
layer = body_data['Layer']
|
||||
if not 'Path' in layer:
|
||||
return {'status_code': 400, 'content': 'Missing Path'}
|
||||
|
||||
if not 'Name' in layer:
|
||||
return {'status_code': 400, 'content': 'Missing Name'}
|
||||
|
||||
if not 'Format' in layer:
|
||||
return {'status_code': 400, 'content': 'Missing Format'}
|
||||
|
||||
if layer['Name'] == self.internal_error_layer_id:
|
||||
return {
|
||||
'status_code': 500,
|
||||
'content': json.dumps({'Error': {'Message': 'Internal server error'}}),
|
||||
}
|
||||
|
||||
if layer['Name'] == self.fail_layer_id:
|
||||
return {
|
||||
'status_code': 422,
|
||||
'content': json.dumps({'Error': {'Message': 'Cannot analyze'}}),
|
||||
}
|
||||
|
||||
parent_id = layer.get('ParentName', None)
|
||||
parent_layer = None
|
||||
|
||||
if parent_id is not None:
|
||||
parent_layer = self.layers.get(parent_id, None)
|
||||
if parent_layer is None:
|
||||
return {
|
||||
'status_code': 400,
|
||||
'content': json.dumps({'Error': {'Message': 'Unknown parent'}}),
|
||||
}
|
||||
|
||||
self.add_layer(layer['Name'])
|
||||
if parent_layer is not None:
|
||||
self.layers[layer['Name']]['ParentName'] = parent_id
|
||||
|
||||
# If vulnerabilities have already been registered with this layer, call set_vulns to make sure
|
||||
# their data is added to the layer's data.
|
||||
if self.layer_vulns.get(layer['Name']):
|
||||
self.set_vulns(layer['Name'], self.layer_vulns[layer['Name']])
|
||||
|
||||
return {
|
||||
'status_code': 201,
|
||||
'content': json.dumps({
|
||||
"Layer": self.layers[layer['Name']],
|
||||
}),
|
||||
}
|
||||
|
||||
|
||||
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='DELETE')
|
||||
def delete_notification(url, _):
|
||||
notification_id = url.path[len('/v1/notifications/'):]
|
||||
if notification_id not in self.notifications:
|
||||
return {
|
||||
'status_code': 404,
|
||||
'content': json.dumps({'Error': {'Message': 'Unknown notification'}}),
|
||||
}
|
||||
|
||||
self.notifications.pop(notification_id)
|
||||
return {
|
||||
'status_code': 204,
|
||||
'content': '',
|
||||
}
|
||||
|
||||
|
||||
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='GET')
|
||||
def get_notification(url, _):
|
||||
notification_id = url.path[len('/v1/notifications/'):]
|
||||
if notification_id not in self.notifications:
|
||||
return {
|
||||
'status_code': 404,
|
||||
'content': json.dumps({'Error': {'Message': 'Unknown notification'}}),
|
||||
}
|
||||
|
||||
query_params = urlparse.parse_qs(url.query)
|
||||
limit = int(query_params.get('limit', [2])[0])
|
||||
page = int(query_params.get('page', [0])[0])
|
||||
|
||||
notification_data = self._get_notification_data(notification_id, page, limit)
|
||||
response = {'Notification': notification_data}
|
||||
return {
|
||||
'status_code': 200,
|
||||
'content': json.dumps(response),
|
||||
}
|
||||
|
||||
@all_requests
|
||||
def response_content(url, _):
|
||||
raise Exception('Unknown endpoint: ' + str(url))
|
||||
|
||||
return [get_layer_mock, post_layer_mock, get_notification, delete_notification,
|
||||
response_content]
|
Reference in a new issue