58 lines
1.9 KiB
Python
58 lines
1.9 KiB
Python
import logging
|
|
|
|
from flask import request, make_response, Blueprint
|
|
from data import model
|
|
from data.database import RepositoryNotification, Repository, ExternalNotificationEvent, RepositoryTag, Image
|
|
from endpoints.notificationhelper import spawn_notification
|
|
from collections import defaultdict
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
# Flask blueprint named 'sec'; the routes defined in this module are registered on it.
sec = Blueprint('sec', __name__)
|
|
|
|
@sec.route('/notification', methods=['POST'])
def sec_notification():
    """Handle a vulnerability notification POSTed to ``/notification``.

    Reads the JSON request body, looks up the tags matching the layers listed
    in ``Content['IntroducingLayersIDs']`` and, for every repository that has
    a ``vulnerability_found`` notification configured, spawns one notification
    listing that repository's matching tags.

    Returns:
        A 400 response when the body is not a JSON object carrying an
        object-valued ``Content`` member; otherwise an ``'Okay'`` response,
        including when there are no introducing layers to act on.

    Raises:
        KeyError: if at least one repository matches but the payload has no
            ``Name`` member.
    """
    data = request.get_json()

    # get_json() can hand back None (or a non-object JSON value) depending on
    # the request and the Flask version; indexing it blindly would surface as
    # a 500, so reject malformed payloads explicitly as a client error.
    content = data.get('Content') if isinstance(data, dict) else None
    if not isinstance(content, dict):
        logger.warning('Rejecting malformed security notification payload')
        return make_response('Invalid notification payload', 400)

    # Find all tags that contain the layer(s) introducing the vulnerability.
    # TODO: remove this check once fixed.
    # A missing, null or empty IntroducingLayersIDs all mean "nothing to do".
    introducing_ids = content.get('IntroducingLayersIDs') or []

    # TODO: fix this for the image_id.storage thing properly.
    # Only the part of each ID before the first '.' is used for the lookup.
    layer_ids = [full_id.split('.')[0] for full_id in introducing_ids]
    if not layer_ids:
        return make_response('Okay')

    tags = model.tag.get_matching_tags(layer_ids, RepositoryTag, Repository, Image)

    # For any repository that has a notification setup, issue a notification.
    event = ExternalNotificationEvent.get(name='vulnerability_found')
    matching = (tags.switch(RepositoryTag)
                .join(Repository)
                .join(RepositoryNotification)
                .where(RepositoryNotification.event == event))

    # Group the matching tags by repository so that each repository receives
    # a single notification covering all of its tags.
    tags_by_repository = defaultdict(list)
    for tag in matching:
        tags_by_repository[tag.repository_id].append(tag)

    for repository_tags in tags_by_repository.values():
        # TODO(jschorr): Pull out the other metadata once added.
        # NOTE: description, link and priority are hard-coded placeholders.
        event_data = {
            'tags': [tag.name for tag in repository_tags],
            'vulnerability': {
                'id': data['Name'],
                'description': 'Some description',
                'link': 'https://security-tracker.debian.org/tracker/CVE-FAKE-CVE',
                'priority': 'High',
            },
        }

        # Every tag in the group belongs to the same repository, so the first
        # one is enough to reach it.
        spawn_notification(repository_tags[0].repository, 'vulnerability_found', event_data)

    return make_response('Okay')
|