Fixes and added tests for the security notification worker

Fixes #1301

- Ensures that the worker uses pagination properly
- Ensures that the worker handles failure as expected
- Moves marking the notification as read to after the worker processes it
- Increases the number of layers requested to 100
This commit is contained in:
Joseph Schorr 2016-03-18 20:28:06 -04:00
parent e8a511d526
commit aa5587c93c
4 changed files with 75 additions and 15 deletions

View file

@ -10,10 +10,12 @@ from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
from util.secscan.analyzer import LayerAnalyzer
from util.secscan.notifier import process_notification_data
from data import model
from workers.security_notification_worker import SecurityNotificationWorker
ADMIN_ACCESS_USER = 'devtable'
SIMPLE_REPO = 'simple'
COMPLEX_REPO = 'complex'
_PORT_NUMBER = 5001
@ -462,5 +464,63 @@ class TestSecurityScanner(unittest.TestCase):
notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0})
self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
def test_notification_worker(self):
pages_called = []
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE')
def delete_notification(url, request):
pages_called.append('DELETE')
return {'status_code': 201, 'content': ''}
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET')
def get_notification(url, request):
if url.query.find('page=nextpage') >= 0:
pages_called.append('GET-2')
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod')
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
data = {
'Notification': self._get_notification_data([layer_id], [layer_id]),
}
return json.dumps(data)
else:
pages_called.append('GET-1')
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
notification_data = self._get_notification_data([layer_id], [layer_id])
notification_data['NextPage'] = 'nextpage'
data = {
'Notification': notification_data,
}
return json.dumps(data)
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)')
def unknown_notification(url, request):
return {'status_code': 404, 'content': 'Unknown notification'}
# Test with an unknown notification.
with HTTMock(get_notification, unknown_notification):
worker = SecurityNotificationWorker(None)
self.assertFalse(worker.process_queue_item({
'Name': 'unknownnotification'
}))
# Test with a known notification with pages.
data = {
'Name': 'somenotification'
}
with HTTMock(get_notification, delete_notification, unknown_notification):
worker = SecurityNotificationWorker(None)
self.assertTrue(worker.process_queue_item(data))
self.assertEquals(['GET-1', 'GET-2', 'DELETE'], pages_called)
if __name__ == '__main__':
unittest.main()