15041ac5ed
The FakeSecurityScanner mocks out all calls that Quay is expected to make to the security scanner API, and returns faked data that can be adjusted by the calling test case
76 lines
2.9 KiB
Python
76 lines
2.9 KiB
Python
import logging
|
|
import time
|
|
import json
|
|
|
|
import features
|
|
|
|
from app import secscan_notification_queue, secscan_api
|
|
from workers.queueworker import QueueWorker, JobException
|
|
from util.secscan.notifier import process_notification_data
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
_PROCESSING_SECONDS = 60 * 60 # 1 hour
|
|
_LAYER_LIMIT = 1000 # The number of layers to request on each page.
|
|
|
|
|
|
class SecurityNotificationWorker(QueueWorker):
|
|
def process_queue_item(self, data):
|
|
self.perform_notification_work(data)
|
|
|
|
def perform_notification_work(self, data, layer_limit=_LAYER_LIMIT):
|
|
""" Performs the work for handling a security notification as referenced by the given data
|
|
object. Returns True on successful handling, False on non-retryable failure and raises
|
|
a JobException on retryable failure.
|
|
"""
|
|
|
|
notification_name = data['Name']
|
|
current_page = data.get('page', None)
|
|
|
|
while True:
|
|
(response_data, should_retry) = secscan_api.get_notification(notification_name,
|
|
layer_limit=layer_limit,
|
|
page=current_page)
|
|
if response_data is None:
|
|
if should_retry:
|
|
raise JobException()
|
|
else:
|
|
# Remove the job from the API.
|
|
logger.error('Failed to handle security notification %s', notification_name)
|
|
secscan_api.mark_notification_read(notification_name)
|
|
|
|
# Return to mark the job as "complete", as we'll never be able to finish it.
|
|
return False
|
|
|
|
self.extend_processing(_PROCESSING_SECONDS, json.dumps(data))
|
|
notification_data = response_data['Notification']
|
|
if not process_notification_data(notification_data):
|
|
raise JobException()
|
|
|
|
# Check for a next page of results. If none, we're done.
|
|
if 'NextPage' not in notification_data:
|
|
# Mark the notification as read and processed.
|
|
if not secscan_api.mark_notification_read(notification_name):
|
|
# Return to mark the job as "complete", as we'll never be able to finish it.
|
|
logger.error('Failed to mark notification %s as read', notification_name)
|
|
return False
|
|
|
|
return True
|
|
|
|
# Otherwise, save the next page token into the queue item (so we can pick up from here if
|
|
# something goes wrong in the next loop iteration), and continue.
|
|
current_page = notification_data['NextPage']
|
|
data['page'] = current_page
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if not features.SECURITY_SCANNER or not features.SECURITY_NOTIFICATIONS:
|
|
logger.debug('Security scanner disabled; skipping SecurityNotificationWorker')
|
|
while True:
|
|
time.sleep(100000)
|
|
|
|
worker = SecurityNotificationWorker(secscan_notification_queue, poll_period_seconds=30,
|
|
reservation_seconds=30, retry_after_seconds=30)
|
|
worker.start()
|