This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/secscan/fake.py

334 lines
12 KiB
Python

import json
import copy
import uuid
import urlparse
from contextlib import contextmanager
from httmock import urlmatch, HTTMock, all_requests
from util.secscan.api import UNKNOWN_PARENT_LAYER_ERROR_MSG, compute_layer_id
@contextmanager
def fake_security_scanner(hostname='fakesecurityscanner'):
""" Context manager which yields a fake security scanner. All requests made to the given
hostname (default: fakesecurityscanner) will be handled by the fake.
"""
scanner = FakeSecurityScanner(hostname)
with HTTMock(*(scanner.get_endpoints())):
yield scanner
class FakeSecurityScanner(object):
""" Implements a fake security scanner (with somewhat real responses) for testing API calls and
responses.
"""
def __init__(self, hostname, index_version=1):
self.hostname = hostname
self.index_version = index_version
self.layers = {}
self.notifications = {}
self.layer_vulns = {}
self.ok_layer_id = None
self.fail_layer_id = None
self.internal_error_layer_id = None
self.error_layer_id = None
def set_ok_layer_id(self, ok_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 200
to be immediately returned.
"""
self.ok_layer_id = ok_layer_id
def set_fail_layer_id(self, fail_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 422
to be raised.
"""
self.fail_layer_id = fail_layer_id
def set_internal_error_layer_id(self, internal_error_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 500
to be raised.
"""
self.internal_error_layer_id = internal_error_layer_id
def set_error_layer_id(self, error_layer_id):
""" Sets a layer ID that, if encountered when the analyze call is made, causes a 400
to be raised.
"""
self.error_layer_id = error_layer_id
def has_layer(self, layer_id):
""" Returns true if the layer with the given ID has been analyzed. """
return layer_id in self.layers
def has_notification(self, notification_id):
""" Returns whether a notification with the given ID is found in the scanner. """
return notification_id in self.notifications
def add_notification(self, old_layer_ids, new_layer_ids, old_vuln, new_vuln, max_per_page=100,
indexed_old_layer_ids=None, indexed_new_layer_ids=None):
""" Adds a new notification over the given sets of layer IDs and vulnerability information,
returning the structural data of the notification created.
"""
notification_id = str(uuid.uuid4())
if old_vuln is None:
old_vuln = dict(new_vuln)
self.notifications[notification_id] = dict(old_layer_ids=old_layer_ids,
new_layer_ids=new_layer_ids,
old_vuln=old_vuln,
new_vuln=new_vuln,
max_per_page=max_per_page,
indexed_old_layer_ids=indexed_old_layer_ids,
indexed_new_layer_ids=indexed_new_layer_ids)
return self._get_notification_data(notification_id, 0, 100)
def layer_id(self, layer):
""" Returns the Quay Security Scanner layer ID for the given layer (Image row). """
return compute_layer_id(layer)
def add_layer(self, layer_id):
""" Adds a layer to the security scanner, with no features or vulnerabilities. """
self.layers[layer_id] = {
"Name": layer_id,
"Format": "Docker",
"IndexedByVersion": self.index_version,
}
def remove_layer(self, layer_id):
""" Removes a layer from the security scanner. """
self.layers.pop(layer_id, None)
def set_vulns(self, layer_id, vulns):
""" Sets the vulnerabilities for the layer with the given ID to those given. """
self.layer_vulns[layer_id] = vulns
# Since this call may occur before the layer is "anaylzed", we only add the data
# to the layer itself if present.
if self.layers.get(layer_id):
layer = self.layers[layer_id]
layer['Features'] = layer.get('Features', [])
layer['Features'].append({
"Name": 'somefeature',
"Namespace": 'somenamespace',
"Version": 'someversion',
"Vulnerabilities": self.layer_vulns[layer_id],
})
def _get_notification_data(self, notification_id, page, limit):
""" Returns the structural data for the notification with the given ID, paginated using
the given page and limit. """
notification = self.notifications[notification_id]
limit = min(limit, notification['max_per_page'])
notification_data = {
"Name": notification_id,
"Created": "1456247389",
"Notified": "1456246708",
"Limit": limit,
}
start_index = (page*limit)
end_index = ((page+1)*limit)
has_additional_page = False
if notification.get('old_vuln'):
old_layer_ids = notification['old_layer_ids']
old_layer_ids = old_layer_ids[start_index:end_index]
has_additional_page = has_additional_page or bool(len(old_layer_ids[end_index-1:]))
notification_data['Old'] = {
'Vulnerability': notification['old_vuln'],
'LayersIntroducingVulnerability': old_layer_ids,
}
if notification.get('indexed_old_layer_ids', None):
indexed_old_layer_ids = notification['indexed_old_layer_ids'][start_index:end_index]
notification_data['Old']['OrderedLayersIntroducingVulnerability'] = indexed_old_layer_ids
if notification.get('new_vuln'):
new_layer_ids = notification['new_layer_ids']
new_layer_ids = new_layer_ids[start_index:end_index]
has_additional_page = has_additional_page or bool(len(new_layer_ids[end_index-1:]))
notification_data['New'] = {
'Vulnerability': notification['new_vuln'],
'LayersIntroducingVulnerability': new_layer_ids,
}
if notification.get('indexed_new_layer_ids', None):
indexed_new_layer_ids = notification['indexed_new_layer_ids'][start_index:end_index]
notification_data['New']['OrderedLayersIntroducingVulnerability'] = indexed_new_layer_ids
if has_additional_page:
notification_data['NextPage'] = str(page+1)
return notification_data
def get_endpoints(self):
""" Returns the HTTMock endpoint definitions for the fake security scanner. """
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='GET')
def get_layer_mock(url, request):
layer_id = url.path[len('/v1/layers/'):]
if layer_id == self.ok_layer_id:
return {
'status_code': 200,
'content': json.dumps({'Layer': {}}),
}
if layer_id == self.internal_error_layer_id:
return {
'status_code': 500,
'content': json.dumps({'Error': {'Message': 'Internal server error'}}),
}
if not layer_id in self.layers:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown layer'}}),
}
layer_data = copy.deepcopy(self.layers[layer_id])
has_vulns = request.url.find('vulnerabilities') > 0
has_features = request.url.find('features') > 0
if not has_vulns and not has_features:
layer_data.pop('Features', None)
return {
'status_code': 200,
'content': json.dumps({'Layer': layer_data}),
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers/(.+)', method='DELETE')
def remove_layer_mock(url, _):
layer_id = url.path[len('/v1/layers/'):]
if not layer_id in self.layers:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown layer'}}),
}
self.layers.pop(layer_id)
return {
'status_code': 204, 'content': '',
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/layers', method='POST')
def post_layer_mock(_, request):
body_data = json.loads(request.body)
if not 'Layer' in body_data:
return {'status_code': 400, 'content': 'Missing body'}
layer = body_data['Layer']
if not 'Path' in layer:
return {'status_code': 400, 'content': 'Missing Path'}
if not 'Name' in layer:
return {'status_code': 400, 'content': 'Missing Name'}
if not 'Format' in layer:
return {'status_code': 400, 'content': 'Missing Format'}
if layer['Name'] == self.internal_error_layer_id:
return {
'status_code': 500,
'content': json.dumps({'Error': {'Message': 'Internal server error'}}),
}
if layer['Name'] == self.fail_layer_id:
return {
'status_code': 422,
'content': json.dumps({'Error': {'Message': 'Cannot analyze'}}),
}
if layer['Name'] == self.error_layer_id:
return {
'status_code': 400,
'content': json.dumps({'Error': {'Message': 'Some sort of error'}}),
}
parent_id = layer.get('ParentName', None)
parent_layer = None
if parent_id is not None:
parent_layer = self.layers.get(parent_id, None)
if parent_layer is None:
return {
'status_code': 400,
'content': json.dumps({'Error': {'Message': UNKNOWN_PARENT_LAYER_ERROR_MSG}}),
}
self.add_layer(layer['Name'])
if parent_layer is not None:
self.layers[layer['Name']]['ParentName'] = parent_id
# If vulnerabilities have already been registered with this layer, call set_vulns to make sure
# their data is added to the layer's data.
if self.layer_vulns.get(layer['Name']):
self.set_vulns(layer['Name'], self.layer_vulns[layer['Name']])
return {
'status_code': 201,
'content': json.dumps({
"Layer": self.layers[layer['Name']],
}),
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='DELETE')
def delete_notification(url, _):
notification_id = url.path[len('/v1/notifications/'):]
if notification_id not in self.notifications:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown notification'}}),
}
self.notifications.pop(notification_id)
return {
'status_code': 204,
'content': '',
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/notifications/(.+)$', method='GET')
def get_notification(url, _):
notification_id = url.path[len('/v1/notifications/'):]
if notification_id not in self.notifications:
return {
'status_code': 404,
'content': json.dumps({'Error': {'Message': 'Unknown notification'}}),
}
query_params = urlparse.parse_qs(url.query)
limit = int(query_params.get('limit', [2])[0])
page = int(query_params.get('page', [0])[0])
notification_data = self._get_notification_data(notification_id, page, limit)
response = {'Notification': notification_data}
return {
'status_code': 200,
'content': json.dumps(response),
}
@urlmatch(netloc=r'(.*\.)?' + self.hostname, path=r'/v1/metrics$', method='GET')
def metrics(url, _):
return {
'status_code': 200,
'content': json.dumps({'fake': True}),
}
@all_requests
def response_content(url, _):
return {
'status_code': 500,
'content': json.dumps({'Error': {'Message': 'Unknown endpoint %s' % url.path}}),
}
return [get_layer_mock, post_layer_mock, remove_layer_mock, get_notification,
delete_notification, metrics, response_content]