This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/secscan/fake.py
2019-11-12 11:09:47 -05:00

348 lines
12 KiB
Python

import json
import copy
import uuid
import urlparse
from contextlib import contextmanager
from httmock import urlmatch, HTTMock, all_requests
from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG, compute_layer_id
@contextmanager
def fake_security_scanner(hostname='fakesecurityscanner'):
""" Context manager which yields a fake security scanner. All requests made to the given
hostname (default: fakesecurityscanner) will be handled by the fake.
"""
scanner = FakeSecurityScanner(hostname)
with HTTMock(*(scanner.get_endpoints())):
yield scanner
class FakeSecurityScanner(object):
""" Implements a fake security scanner (with somewhat real responses) for testing API calls and
responses.
"""
def __init__(self, hostname, index_version=1):
self.hostname = hostname
self.index_version = index_version
self.layers = {}
self.notifications = {}
self.layer_vulns = {}
self.ok_layer_id = None
self.fail_layer_id = None
self.internal_error_layer_id = None
self.error_layer_id = None
self.unexpected_status_layer_id = None
def set_ok_layer_id(self, ok_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 200
to be immediately returned.
"""
self.ok_layer_id = ok_layer_id
def set_fail_layer_id(self, fail_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 422
to be raised.
"""
self.fail_layer_id = fail_layer_id
def set_internal_error_layer_id(self, internal_error_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 500
to be raised.
"""
self.internal_error_layer_id = internal_error_layer_id
def set_error_layer_id(self, error_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 400
to be raised.
"""
self.error_layer_id = error_layer_id
def set_unexpected_status_layer_id(self, layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes an HTTP 600
to be raised. This is useful in testing the robustness of the to unknown status codes.
"""
self.unexpected_status_layer_id = layer_id
def has_layer(self, layer_id):
""" Returns true if the layer with the given ID has been analyzed. """
return layer_id in self.layers
def has_notification(self, notification_id):
""" Returns whether a notification with the given ID is found in the scanner. """
return notification_id in self.notifications
def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln, max_per_page=100,
indexed_old_layer_ids=None, indexed_new_layer_ids=None):
""" Adds a new notification over the given sets of layer IDs and vulnerability information,
returning the structural data of the notification created.
"""
notification_id = str(uuid.uuid4())
if old_vuln is None:
old_vuln = dict(new_vuln)
self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids,
new_layer_ids=new_layer_ids,
old_vuln=old_vuln,
new_vuln=new_vuln,
max_per_page=max_per_page,
indexed_old_layer_ids=indexed_old_layer_ids,
indexed_new_layer_ids=indexed_new_layer_ids)
return self._get_notification_data(notification_id, 0, 100)
def layer_id(self, layer):
""" Returns the Quay Security Scanner layer ID for the given layer (Image row). """
return compute_layer_id(layer)
def add_layer(self, layer_id):
""" Adds a layer to the security scanner, with no features or vulnerabilities. """
self.layers[layer_id] = {
"Name": layer_id,
"Format": "Docker",
"IndexedByVersion": self.index_version,
}
def remove_layer(self, layer_id):
""" Removes a layer from the security scanner. """
self.layers.pop(layer_id, None)
def set_vulns(self, layer_id, vulns):
""" Sets the vulnerabilities for the layer with the given ID to those given. """
self.layer_vulns[layer_id] = vulns
# Since this call may occur before the layer is "anaylzed", we only add the data
# to the layer itself if present.
if self.layers.get(layer_id):
layer = self.layers[layer_id]
layer['Features'] = layer.get('Features', [])
layer['Features'].append({
"Name": 'somefeature',
"Namespace": 'somenamespace',
"Version": 'someversion',
"Vulnerabilities": self.layer_vulns[layer_id],
})
def _get_notification_data(self, notification_id, page, limit):
""" Returns the structural data for the notification with the given ID, paginated using
the given page and limit. """
notification = self.notifications[notification_id]
limit = min(limit, notification['max_per_page'])
notification_data = {
"Name": notification_id,
"Created": "1456247389",
"Notified": "1456246708",
"Limit": limit,
}
start_index = (page*limit)
end_index = ((page+1)*limit)
has_additional_page = False
if notification.get('old_vuln'):
old_layer_ids = notification['old_layer_ids']
old_layer_ids = old_layer_ids[start_index:end_index]
has_additional_page = has_additional_page or bool(len(old_layer_ids[end_index-1:]))
notification_data['Old'] = {
'Vulnerability': notification['old_vuln'],
'LayersIntroducingVulnerability': old_layer_ids,
}
if notification.get('indexed_old_layer_ids', None):
indexed_old_layer_ids = notification['indexed_old_layer_ids'][start_index:end_index]
notification_data['Old']['OrderedLayersIntroducingVulnerability'] = indexed_old_layer_ids
if notification.get('new_vuln'):
new_layer_ids = notification['new_layer_ids']
new_layer_ids = new_layer_ids[start_index:end_index]
has_additional_page = has_additional_page or bool(len(new_layer_ids[end_index-1:]))
notification_data['New'] = {
'Vulnerability': notification['new_vuln'],
'LayersIntroducingVulnerability': new_layer_ids,
}
if notification.get('indexed_new_layer_ids', None):
indexed_new_layer_ids = notification['indexed_new_layer_ids'][start_index:end_index]
notification_data['New']['OrderedLayersIntroducingVulnerability'] = indexed_new_layer_ids
if has_additional_page:
notification_data['NextPage'] = str(page+1)
return notification_data
def get_endpoints(self):
""" Returns the HTTMock endpoint definitions for the fake security scanner. """
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET')
def get_layer_mock(url, request):
layer_id = url.path[len('/v1/layers/'):]
if layer_id == self.ok_layer_id:
return {
'status_code': 200,
'content': json.dumps({'Layer': {}}),
}
if layer_id == self.internal_error_layer_id:
return {
'status_code': 500,
'content': json.dumps({'Error': {'Message': 'Internal server error'}}),
}
if not layer_id in self.layers:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown layer'}}),
}
layer_data = copy.deepcopy(self.layers[layer_id])
has_vulns = request.url.find('vulnerabilities') > 0
has_features = request.url.find('features') > 0
if not has_vulns and not has_features:
layer_data.pop('Features', None)
return {
'status_code': 200,
'content': json.dumps({'Layer': layer_data}),
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='DELETE')
def remove_layer_mock(url, _):
layer_id = url.path[len('/v1/layers/'):]
if not layer_id in self.layers:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown layer'}}),
}
self.layers.pop(layer_id)
return {
'status_code': 204, 'content': '',
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers', method='POST')
def post_layer_mock(_, request):
body_data = json.loads(request.body)
if not 'Layer' in body_data:
return {'status_code': 400, 'content': 'Missing body'}
layer = body_data['Layer']
if not 'Path' in layer:
return {'status_code': 400, 'content': 'Missing Path'}
if not 'Name' in layer:
return {'status_code': 400, 'content': 'Missing Name'}
if not 'Format' in layer:
return {'status_code': 400, 'content': 'Missing Format'}
if layer['Name'] == self.internal_error_layer_id:
return {
'status_code': 500,
'content': json.dumps({'Error': {'Message': 'Internal server error'}}),
}
if layer['Name'] == self.fail_layer_id:
return {
'status_code': 422,
'content': json.dumps({'Error': {'Message': 'Cannot analyze'}}),
}
if layer['Name'] == self.error_layer_id:
return {
'status_code': 400,
'content': json.dumps({'Error': {'Message': 'Some sort of error'}}),
}
if layer['Name'] == self.unexpected_status_layer_id:
return {
'status_code': 600,
'content': json.dumps({'Error': {'Message': 'Some sort of error'}}),
}
parent_id = layer.get('ParentName', None)
parent_layer = None
if parent_id is not None:
parent_layer = self.layers.get(parent_id, None)
if parent_layer is None:
return {
'status_code': 400,
'content': json.dumps({'Error': {'Message': UNKNOWN_PARENT_LAYER_ERROR_MSG}}),
}
self.add_layer(layer['Name'])
if parent_layer is not None:
self.layers[layer['Name']]['ParentName'] = parent_id
# If vulnerabilities have already been registered with this layer, call set_vulns to make sure
# their data is added to the layer's data.
if self.layer_vulns.get(layer['Name']):
self.set_vulns(layer['Name'], self.layer_vulns[layer['Name']])
return {
'status_code': 201,
'content': json.dumps({
"Layer": self.layers[layer['Name']],
}),
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='DELETE')
def delete_notification(url, _):
notification_id = url.path[len('/v1/notifications/'):]
if notification_id not in self.notifications:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown notification'}}),
}
self.notifications.pop(notification_id)
return {
'status_code': 204,
'content': '',
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='GET')
def get_notification(url, _):
notification_id = url.path[len('/v1/notifications/'):]
if notification_id not in self.notifications:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown notification'}}),
}
query_params = urlparse.parse_qs(url.query)
limit = int(query_params.get('limit', [2])[0])
page = int(query_params.get('page', [0])[0])
notification_data = self._get_notification_data(notification_id, page, limit)
response = {'Notification': notification_data}
return {
'status_code': 200,
'content': json.dumps(response),
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/metrics$', method='GET')
def metrics(url, _):
return {
'status_code': 200,
'content': json.dumps({'fake': True}),
}
@all_requests
def response_content(url, _):
return {
'status_code': 500,
'content': json.dumps({'Error': {'Message': 'Unknown endpoint %s' % url.path}}),
}
return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification,
delete_notification, metrics, response_content]