commit
ce0ba3f68f
4 changed files with 113 additions and 39 deletions
|
@ -1,10 +1,12 @@
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
from data.database import QueueItem, db, db_for_update, db_random_func
|
from data.database import QueueItem, db, db_for_update, db_random_func
|
||||||
from util.morecollections import AttrDict
|
from util.morecollections import AttrDict
|
||||||
|
|
||||||
|
|
||||||
MINIMUM_EXTENSION = timedelta(seconds=20)
|
MINIMUM_EXTENSION = timedelta(seconds=20)
|
||||||
|
DEFAULT_BATCH_SIZE = 1000
|
||||||
|
|
||||||
|
|
||||||
class NoopWith:
|
class NoopWith:
|
||||||
|
@ -160,18 +162,45 @@ class WorkQueue(object):
|
||||||
except QueueItem.DoesNotExist:
|
except QueueItem.DoesNotExist:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
|
def _queue_body(self, canonical_name_list, message, available_after, retries_remaining):
|
||||||
"""
|
return dict(
|
||||||
Put an item, if it shouldn't be processed for some number of seconds,
|
|
||||||
specify that amount as available_after. Returns the ID of the queue item added.
|
|
||||||
"""
|
|
||||||
item = QueueItem.create(
|
|
||||||
queue_name=self._canonical_name([self._queue_name] + canonical_name_list),
|
queue_name=self._canonical_name([self._queue_name] + canonical_name_list),
|
||||||
body=message,
|
body=message,
|
||||||
retries_remaining=retries_remaining,
|
retries_remaining=retries_remaining,
|
||||||
available_after=datetime.utcnow() + timedelta(seconds=available_after or 0),
|
available_after=datetime.utcnow() + timedelta(seconds=available_after or 0),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def batch_insert(self, batch_size=DEFAULT_BATCH_SIZE):
|
||||||
|
items_to_insert = []
|
||||||
|
def batch_put(canonical_name_list, message, available_after=0, retries_remaining=5):
|
||||||
|
"""
|
||||||
|
Put an item, if it shouldn't be processed for some number of seconds,
|
||||||
|
specify that amount as available_after. Returns the ID of the queue item added.
|
||||||
|
"""
|
||||||
|
items_to_insert.append(self._queue_body(canonical_name_list, message, available_after,
|
||||||
|
retries_remaining))
|
||||||
|
|
||||||
|
yield batch_put
|
||||||
|
|
||||||
|
# Chunk the inserted items into batch_size chunks and insert_many
|
||||||
|
remaining = list(items_to_insert)
|
||||||
|
while remaining:
|
||||||
|
QueueItem.insert_many(remaining[0:batch_size]).execute()
|
||||||
|
remaining = remaining[batch_size:]
|
||||||
|
|
||||||
|
if self._metric_queue:
|
||||||
|
self._metric_queue.put_deprecated('Added', len(items_to_insert),
|
||||||
|
dimensions={'queue': self._queue_name})
|
||||||
|
|
||||||
|
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
|
||||||
|
"""
|
||||||
|
Put an item, if it shouldn't be processed for some number of seconds,
|
||||||
|
specify that amount as available_after. Returns the ID of the queue item added.
|
||||||
|
"""
|
||||||
|
item = QueueItem.create(**self._queue_body(canonical_name_list, message, available_after,
|
||||||
|
retries_remaining))
|
||||||
|
|
||||||
if self._metric_queue:
|
if self._metric_queue:
|
||||||
self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})
|
self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,16 @@
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
from app import app, notification_queue
|
from app import app, notification_queue
|
||||||
from data import model
|
from data import model
|
||||||
from auth.auth_context import get_authenticated_user, get_validated_oauth_token
|
from auth.auth_context import get_authenticated_user, get_validated_oauth_token
|
||||||
|
|
||||||
|
|
||||||
def build_event_data(repo, extra_data={}, subpage=None):
|
DEFAULT_BATCH_SIZE = 1000
|
||||||
|
|
||||||
|
|
||||||
|
def build_event_data(repo, extra_data=None, subpage=None):
|
||||||
repo_string = '%s/%s' % (repo.namespace_name, repo.name)
|
repo_string = '%s/%s' % (repo.namespace_name, repo.name)
|
||||||
homepage = '%s://%s/repository/%s' % (app.config['PREFERRED_URL_SCHEME'],
|
homepage = '%s://%s/repository/%s' % (app.config['PREFERRED_URL_SCHEME'],
|
||||||
app.config['SERVER_HOSTNAME'],
|
app.config['SERVER_HOSTNAME'],
|
||||||
|
@ -25,7 +30,7 @@ def build_event_data(repo, extra_data={}, subpage=None):
|
||||||
'homepage': homepage,
|
'homepage': homepage,
|
||||||
}
|
}
|
||||||
|
|
||||||
event_data.update(extra_data)
|
event_data.update(extra_data or {})
|
||||||
return event_data
|
return event_data
|
||||||
|
|
||||||
def build_notification_data(notification, event_data, performer_data=None):
|
def build_notification_data(notification, event_data, performer_data=None):
|
||||||
|
@ -50,14 +55,30 @@ def build_notification_data(notification, event_data, performer_data=None):
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def spawn_notification(repo, event_name, extra_data={}, subpage=None, pathargs=[],
|
@contextmanager
|
||||||
performer_data=None):
|
def notification_batch(batch_size=DEFAULT_BATCH_SIZE):
|
||||||
event_data = build_event_data(repo, extra_data=extra_data, subpage=subpage)
|
"""
|
||||||
|
Context manager implementation which returns a target callable with the same signature
|
||||||
|
as spawn_notification. When the the context block exits the notifications generated by
|
||||||
|
the callable will be bulk inserted into the queue with the specified batch size.
|
||||||
|
"""
|
||||||
|
with notification_queue.batch_insert(batch_size) as queue_put:
|
||||||
|
def spawn_notification_batch(repo, event_name, extra_data=None, subpage=None, pathargs=None,
|
||||||
|
performer_data=None):
|
||||||
|
event_data = build_event_data(repo, extra_data=extra_data, subpage=subpage)
|
||||||
|
|
||||||
notifications = model.notification.list_repo_notifications(repo.namespace_name,
|
notifications = model.notification.list_repo_notifications(repo.namespace_name,
|
||||||
repo.name,
|
repo.name,
|
||||||
event_name=event_name)
|
event_name=event_name)
|
||||||
for notification in list(notifications):
|
path = [repo.namespace_name, repo.name, event_name] + (pathargs or [])
|
||||||
notification_data = build_notification_data(notification, event_data, performer_data)
|
for notification in list(notifications):
|
||||||
path = [repo.namespace_name, repo.name, event_name] + pathargs
|
notification_data = build_notification_data(notification, event_data, performer_data)
|
||||||
notification_queue.put(path, json.dumps(notification_data))
|
queue_put(path, json.dumps(notification_data))
|
||||||
|
|
||||||
|
yield spawn_notification_batch
|
||||||
|
|
||||||
|
|
||||||
|
def spawn_notification(repo, event_name, extra_data=None, subpage=None, pathargs=None,
|
||||||
|
performer_data=None):
|
||||||
|
with notification_batch(1) as batch_spawn:
|
||||||
|
batch_spawn(repo, event_name, extra_data, subpage, pathargs, performer_data)
|
||||||
|
|
|
@ -67,8 +67,9 @@ class TestQueue(QueueTestCase):
|
||||||
self.assertEqual(self.reporter.running_count, None)
|
self.assertEqual(self.reporter.running_count, None)
|
||||||
self.assertEqual(self.reporter.total, None)
|
self.assertEqual(self.reporter.total, None)
|
||||||
|
|
||||||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
|
id_1 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1))
|
||||||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1)
|
id_2 = int(self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1))
|
||||||
|
self.assertEqual(id_1 + 1, id_2)
|
||||||
self.assertEqual(self.reporter.currently_processing, False)
|
self.assertEqual(self.reporter.currently_processing, False)
|
||||||
self.assertEqual(self.reporter.running_count, 0)
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
self.assertEqual(self.reporter.total, 1)
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
@ -202,6 +203,28 @@ class TestQueue(QueueTestCase):
|
||||||
msg = str(json_body['data'])
|
msg = str(json_body['data'])
|
||||||
self.assertIn(msg, seen)
|
self.assertIn(msg, seen)
|
||||||
|
|
||||||
|
def test_bulk_insert(self):
|
||||||
|
self.assertEqual(self.reporter.currently_processing, None)
|
||||||
|
self.assertEqual(self.reporter.running_count, None)
|
||||||
|
self.assertEqual(self.reporter.total, None)
|
||||||
|
|
||||||
|
with self.queue.batch_insert() as queue_put:
|
||||||
|
queue_put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
|
||||||
|
queue_put(['abc', 'def'], self.TEST_MESSAGE_2, available_after=-1)
|
||||||
|
|
||||||
|
self.queue.update_metrics()
|
||||||
|
self.assertEqual(self.reporter.currently_processing, False)
|
||||||
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
|
self.assertEqual(self.reporter.total, 1)
|
||||||
|
|
||||||
|
with self.queue.batch_insert() as queue_put:
|
||||||
|
queue_put(['abd', 'def'], self.TEST_MESSAGE_1, available_after=-1)
|
||||||
|
queue_put(['abd', 'ghi'], self.TEST_MESSAGE_2, available_after=-1)
|
||||||
|
|
||||||
|
self.queue.update_metrics()
|
||||||
|
self.assertEqual(self.reporter.currently_processing, False)
|
||||||
|
self.assertEqual(self.reporter.running_count, 0)
|
||||||
|
self.assertEqual(self.reporter.total, 3)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -7,7 +7,7 @@ from app import secscan_api
|
||||||
from data.model.tag import filter_tags_have_repository_event, get_matching_tags
|
from data.model.tag import filter_tags_have_repository_event, get_matching_tags
|
||||||
from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repository,
|
from data.database import (Image, ImageStorage, ExternalNotificationEvent, Repository,
|
||||||
RepositoryTag)
|
RepositoryTag)
|
||||||
from endpoints.notificationhelper import spawn_notification
|
from endpoints.notificationhelper import notification_batch
|
||||||
from util.secscan import PRIORITY_LEVELS
|
from util.secscan import PRIORITY_LEVELS
|
||||||
from util.secscan.api import APIRequestFailure
|
from util.secscan.api import APIRequestFailure
|
||||||
from util.morecollections import AttrDict
|
from util.morecollections import AttrDict
|
||||||
|
@ -89,25 +89,26 @@ def process_notification_data(notification_data):
|
||||||
repository_map[tag.repository_id] = tag.repository
|
repository_map[tag.repository_id] = tag.repository
|
||||||
|
|
||||||
# For each of the tags found, issue a notification.
|
# For each of the tags found, issue a notification.
|
||||||
for repository_id in tag_map:
|
with notification_batch() as spawn_notification:
|
||||||
tags = tag_map[repository_id]
|
for repository_id in tag_map:
|
||||||
event_data = {
|
tags = tag_map[repository_id]
|
||||||
'tags': list(tags),
|
event_data = {
|
||||||
'vulnerability': {
|
'tags': list(tags),
|
||||||
'id': cve_id,
|
'vulnerability': {
|
||||||
'description': new_vuln.get('Description', None),
|
'id': cve_id,
|
||||||
'link': new_vuln.get('Link', None),
|
'description': new_vuln.get('Description', None),
|
||||||
'priority': new_severity['title'],
|
'link': new_vuln.get('Link', None),
|
||||||
'has_fix': 'FixedIn' in new_vuln,
|
'priority': new_severity['title'],
|
||||||
},
|
'has_fix': 'FixedIn' in new_vuln,
|
||||||
}
|
},
|
||||||
|
}
|
||||||
|
|
||||||
# TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
|
# TODO(jzelinskie): remove when more endpoints have been converted to using interfaces
|
||||||
repository = AttrDict({
|
repository = AttrDict({
|
||||||
'namespace_name': repository_map[repository_id].namespace_user.username,
|
'namespace_name': repository_map[repository_id].namespace_user.username,
|
||||||
'name': repository_map[repository_id].name,
|
'name': repository_map[repository_id].name,
|
||||||
})
|
})
|
||||||
spawn_notification(repository, 'vulnerability_found', event_data)
|
spawn_notification(repository, 'vulnerability_found', event_data)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
Reference in a new issue