Merge pull request #1303 from coreos-inc/secnotifierfixes

Fixes and added tests for the security notification worker
This commit is contained in:
Jake Moshenko 2016-03-21 10:42:07 -04:00
commit 64986add0f
4 changed files with 75 additions and 15 deletions

View file

@ -19,14 +19,5 @@ def secscan_notification():
abort(400)
notification = data['Notification']
# Queue the notification to be processed.
item_id = secscan_notification_queue.put(['named', notification['Name']],
json.dumps(notification))
# Mark the notification as read.
if not secscan_api.mark_notification_read(notification['Name']):
secscan_notification_queue.cancel(item_id)
abort(400)
secscan_notification_queue.put(['named', notification['Name']], json.dumps(notification))
return make_response('Okay')

View file

@ -10,10 +10,12 @@ from util.secscan.api import SecurityScannerAPI, AnalyzeLayerException
from util.secscan.analyzer import LayerAnalyzer
from util.secscan.notifier import process_notification_data
from data import model
from workers.security_notification_worker import SecurityNotificationWorker
ADMIN_ACCESS_USER = 'devtable'
SIMPLE_REPO = 'simple'
COMPLEX_REPO = 'complex'
_PORT_NUMBER = 5001
@ -462,5 +464,63 @@ class TestSecurityScanner(unittest.TestCase):
notification = model.notification.create_repo_notification(repo, 'vulnerability_found', 'quay_notification', {}, {'level': 0})
self.assertFalse(VulnerabilityFoundEvent().should_perform(event_data, notification))
def test_notification_worker(self):
pages_called = []
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='DELETE')
def delete_notification(url, request):
pages_called.append('DELETE')
return {'status_code': 201, 'content': ''}
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/somenotification$', method='GET')
def get_notification(url, request):
if url.query.find('page=nextpage') >= 0:
pages_called.append('GET-2')
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, COMPLEX_REPO, 'prod')
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
data = {
'Notification': self._get_notification_data([layer_id], [layer_id]),
}
return json.dumps(data)
else:
pages_called.append('GET-1')
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, SIMPLE_REPO, 'latest')
layer_id = '%s.%s' % (layer.docker_image_id, layer.storage.uuid)
notification_data = self._get_notification_data([layer_id], [layer_id])
notification_data['NextPage'] = 'nextpage'
data = {
'Notification': notification_data,
}
return json.dumps(data)
@urlmatch(netloc=r'(.*\.)?mockclairservice', path=r'/v1/notifications/(.*)')
def unknown_notification(url, request):
return {'status_code': 404, 'content': 'Unknown notification'}
# Test with an unknown notification.
with HTTMock(get_notification, unknown_notification):
worker = SecurityNotificationWorker(None)
self.assertFalse(worker.process_queue_item({
'Name': 'unknownnotification'
}))
# Test with a known notification with pages.
data = {
'Name': 'somenotification'
}
with HTTMock(get_notification, delete_notification, unknown_notification):
worker = SecurityNotificationWorker(None)
self.assertTrue(worker.process_queue_item(data))
self.assertEquals(['GET-1', 'GET-2', 'DELETE'], pages_called)
if __name__ == '__main__':
unittest.main()

View file

@ -160,7 +160,7 @@ class SecurityScannerAPI(object):
return False
def get_notification(self, notification_name, layer_limit=10, page=None):
def get_notification(self, notification_name, layer_limit=100, page=None):
""" Gets the data for a specific notification, with optional page token.
Returns a tuple of the data (None on failure) and whether to retry.
"""
@ -194,7 +194,7 @@ class SecurityScannerAPI(object):
""" Marks a security scanner notification as read. """
try:
response = self._call('DELETE', _API_METHOD_MARK_NOTIFICATION_READ % notification_name)
return response.status_code == 200
return response.status_code / 100 == 2
except requests.exceptions.RequestException:
logger.exception('Failed to mark notification as read: %s', notification_name)
return False

View file

@ -11,6 +11,7 @@ from util.secscan.notifier import process_notification_data
logger = logging.getLogger(__name__)
_EXTENDED_SECONDS = 600
_LAYER_LIMIT = 100 # The number of layers to request on each page.
class SecurityNotificationWorker(QueueWorker):
def process_queue_item(self, data):
@ -18,14 +19,16 @@ class SecurityNotificationWorker(QueueWorker):
current_page = data.get('page', None)
while True:
(response_data, should_retry) = secscan_api.get_notification(notification_name)
(response_data, should_retry) = secscan_api.get_notification(notification_name,
layer_limit=_LAYER_LIMIT,
page=current_page)
if response_data is None:
if should_retry:
raise JobException()
else:
# Return to mark the job as "complete", as we'll never be able to finish it.
logger.error('Failed to handle security notification %s', notification_name)
return
return False
notification_data = response_data['Notification']
if not process_notification_data(notification_data):
@ -33,7 +36,13 @@ class SecurityNotificationWorker(QueueWorker):
# Check for a next page of results. If none, we're done.
if 'NextPage' not in notification_data:
return
# Mark the notification as read and processed.
if not secscan_api.mark_notification_read(notification_name):
# Return to mark the job as "complete", as we'll never be able to finish it.
logger.error('Failed to mark notification %s as read', notification_name)
return False
return True
# Otherwise, save the next page token into the queue item (so we can pick up from here if
# something goes wrong in the next loop iteration), and continue.