This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/notificationevent.py

352 lines
11 KiB
Python
Raw Normal View History

import logging
import time
import json
import re
from datetime import datetime
from endpoints.notificationhelper import build_event_data
from util.jinjautil import get_template_env
from util.morecollections import AttrDict
from util.secscan import PRIORITY_LEVELS, get_priority_for_index
template_env = get_template_env("events")
logger = logging.getLogger(__name__)
class InvalidNotificationEventException(Exception):
    """ Exception raised when an event name cannot be resolved to a notification event. """
class NotificationEvent(object):
    """
    Base class for notification events.

    Concrete events subclass this and report their name through `event_name()`. A subclass
    whose `event_name()` returns None is treated as a grouping class: it never matches a name
    itself, but its own subclasses are searched when looking an event up.
    """

    def __init__(self):
        pass

    def get_level(self, event_data, notification_data):
        """
        Returns a 'level' representing the severity of the event.
        Valid values are: 'info', 'warning', 'error', 'primary', 'success'
        """
        raise NotImplementedError

    def get_summary(self, event_data, notification_data):
        """
        Returns a human readable one-line summary for the given notification data.
        """
        raise NotImplementedError

    def get_message(self, event_data, notification_data):
        """
        Returns a human readable HTML message for the given notification data.

        The message is rendered from the template named '<event_name>.html'.
        """
        return template_env.get_template(self.event_name() + '.html').render({
            'event_data': event_data,
            'notification_data': notification_data
        })

    def get_sample_data(self, notification):
        """
        Returns sample data for testing the raising of this notification, with an example
        notification.
        """
        raise NotImplementedError

    def should_perform(self, event_data, notification_data):
        """
        Whether a notification for this event should be performed. By default returns True.
        """
        return True

    @classmethod
    def event_name(cls):
        """
        Particular event implemented by subclasses.
        """
        raise NotImplementedError

    @classmethod
    def get_event(cls, eventname):
        """
        Returns a new instance of the event class registered under `eventname`, searching
        the subclasses of `cls`.

        Raises InvalidNotificationEventException if no subclass reports that name.
        """
        found = NotificationEvent._get_event(cls, eventname)
        if found is not None:
            return found

        # Wrap in a 1-tuple so that a tuple-valued eventname cannot break the formatting.
        raise InvalidNotificationEventException('Unable to find event: %s' % (eventname,))

    @staticmethod
    def _get_event(cls, eventname):
        """
        Depth-first search of the subclasses of `cls` for one whose `event_name()` equals
        `eventname`. Returns a new instance of the first match, or None if there is none.

        NOTE: this is a staticmethod that receives the class to search explicitly, which is
        what allows the recursive call to descend into an arbitrary subclass.
        """
        for subc in cls.__subclasses__():
            name = subc.event_name()
            if name is None:
                # Grouping class: it has no name of its own, so look at its subclasses.
                found = NotificationEvent._get_event(subc, eventname)
                if found is not None:
                    return found
            elif name == eventname:
                return subc()

        return None
class RepoPushEvent(NotificationEvent):
    """ Notification event named 'repo_push', reported at the 'primary' level. """

    @classmethod
    def event_name(cls):
        return 'repo_push'

    def get_level(self, event_data, notification_data):
        return 'primary'

    def get_summary(self, event_data, notification_data):
        """ Returns a one-line summary naming the updated repository. """
        repository = event_data['repository']
        return 'Repository %s updated' % repository

    def get_sample_data(self, notification):
        """
        Returns fake push event data (two updated tags, three pruned images) for the
        repository the given notification belongs to.
        """
        namespace_name = notification.repository.namespace_user.username
        repository_name = notification.repository.name

        # TODO(jzelinskie): remove when more endpoints have been converted to using
        # interfaces
        repo = AttrDict({
            'namespace_name': namespace_name,
            'name': repository_name,
        })

        sample_payload = {
            'updated_tags': {'latest': 'someimageid', 'foo': 'anotherimage'},
            'pruned_image_count': 3
        }
        return build_event_data(repo, sample_payload)
def _build_summary(event_data):
    """
    Returns the summary suffix shared by the build events: the repository followed by the
    first seven characters of the build id in square brackets.
    """
    repository = event_data['repository']
    short_build_id = event_data['build_id'][:7]
    return 'for repository %s [%s]' % (repository, short_build_id)
class VulnerabilityFoundEvent(NotificationEvent):
    """
    Notification event named 'vulnerability_found'. The notification's JSON event config
    carries a 'level' index that is used to filter which vulnerabilities are reported.
    """

    @classmethod
    def event_name(cls):
        return 'vulnerability_found'

    def get_level(self, event_data, notification_data):
        """
        Maps the vulnerability priority onto a notification level: 'Defcon1'/'Critical'
        give 'error', 'Medium'/'High' give 'warning' and anything else gives 'info'.
        """
        severity = event_data['vulnerability']['priority']
        if severity in ('Defcon1', 'Critical'):
            return 'error'

        if severity in ('Medium', 'High'):
            return 'warning'

        return 'info'

    def get_sample_data(self, notification):
        """
        Returns fake vulnerability event data for the notification's repository. The fake
        vulnerability's priority is looked up from the notification's configured level.
        """
        config = json.loads(notification.event_config_json)

        namespace_name = notification.repository.namespace_user.username
        repository_name = notification.repository.name

        # TODO(jzelinskie): remove when more endpoints have been converted to using
        # interfaces
        repo = AttrDict({
            'namespace_name': namespace_name,
            'name': repository_name,
        })

        fake_vulnerability = {
            'id': 'CVE-FAKE-CVE',
            'description': 'A futurist vulnerability',
            'link': 'https://security-tracker.debian.org/tracker/CVE-FAKE-CVE',
            'priority': get_priority_for_index(config['level'])
        }
        return build_event_data(repo, {
            'tags': ['latest', 'prod', 'foo', 'bar', 'baz'],
            'image': 'some-image-id',
            'vulnerability': fake_vulnerability,
        })

    def should_perform(self, event_data, notification_data):
        """
        Returns True only if the vulnerability's priority is known to PRIORITY_LEVELS and
        its index is less than or equal to the configured 'level' index.
        """
        config = json.loads(notification_data.event_config_json)
        threshold_index = int(config['level'])

        severity_entry = PRIORITY_LEVELS.get(event_data['vulnerability']['priority'])
        if severity_entry is None:
            # Unknown priority: never notify.
            return False

        return int(severity_entry['index']) <= threshold_index

    def get_summary(self, event_data, notification_data):
        """ Returns a one-line summary with the priority, repository and number of tags. """
        priority = event_data['vulnerability']['priority']
        repository = event_data['repository']
        tag_count = len(event_data['tags'])
        return '%s vulnerability detected in repository %s in %s tags' % (priority, repository,
                                                                          tag_count)
class BaseBuildEvent(NotificationEvent):
    """
    Common base class for the build events. It reports no event name of its own (None), so
    event lookup skips it and searches its subclasses instead.
    """

    @classmethod
    def event_name(cls):
        return None

    def should_perform(self, event_data, notification_data):
        """
        Decides whether a build notification should fire, honoring the optional 'ref-regex'
        filter found in the notification's JSON event config.

        Returns True when there is no event config or no (non-empty) 'ref-regex'. Otherwise
        returns whether the regex matches the start of the build's 'ref' from its trigger
        metadata; a missing ref or an unusable regex yields False.
        """
        if not notification_data.event_config_json:
            return True

        event_config = json.loads(notification_data.event_config_json)
        ref_regex = event_config.get('ref-regex') or None
        if ref_regex is None:
            return True

        # Lookup the ref. If none, this is a non-git build and we should not fire the event.
        # `or {}` also covers the key being present with a None value, which would otherwise
        # raise AttributeError on the chained lookup.
        trigger_metadata = event_data.get('trigger_metadata') or {}
        ref = trigger_metadata.get('ref', None)
        if ref is None:
            return False

        # Try parsing the regex string as a regular expression. If we fail, we fail to fire
        # the event.
        try:
            return bool(re.compile(str(ref_regex)).match(ref))
        except Exception:
            logger.warning('Regular expression error for build event filter: %s', ref_regex)
            return False
class BuildQueueEvent(BaseBuildEvent):
    """ Notification event named 'build_queued', reported at the 'info' level. """

    @classmethod
    def event_name(cls):
        return 'build_queued'

    def get_level(self, event_data, notification_data):
        return 'info'

    def get_sample_data(self, notification):
        """
        Returns fake event data describing a queued, GitHub-triggered build in the
        repository the given notification belongs to, including sample commit info.
        """
        build_uuid = 'fake-build-id'

        # TODO(jzelinskie): remove when more endpoints have been converted to using
        # interfaces
        repo = AttrDict({
            'namespace_name': notification.repository.namespace_user.username,
            'name': notification.repository.name,
        })

        return build_event_data(repo, {
            'is_manual': False,
            'build_id': build_uuid,
            'build_name': 'some-fake-build',
            'docker_tags': ['latest', 'foo', 'bar'],
            'trigger_id': '1245634',
            'trigger_kind': 'GitHub',
            'trigger_metadata': {
                "default_branch": "master",
                "ref": "refs/heads/somebranch",
                "commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
                "commit_info": {
                    'url': 'http://path/to/the/commit',
                    'message': 'Some commit message',
                    # The sample commit date is the current local time as a timestamp.
                    'date': time.mktime(datetime.now().timetuple()),
                    'author': {
                        'username': 'fakeauthor',
                        'url': 'http://path/to/fake/author/in/scm',
                        'avatar_url': 'http://www.gravatar.com/avatar/fakehash'
                    }
                }
            }
        }, subpage='/build/%s' % build_uuid)

    def get_summary(self, event_data, notification_data):
        """ Returns 'Build queued ' followed by the shared build summary. """
        return 'Build queued ' + _build_summary(event_data)
class BuildStartEvent(BaseBuildEvent):
    """ Notification event named 'build_start', reported at the 'info' level. """

    @classmethod
    def event_name(cls):
        return 'build_start'

    def get_level(self, event_data, notification_data):
        return 'info'

    def get_sample_data(self, notification):
        """
        Returns fake event data describing a started, GitHub-triggered build in the
        repository the given notification belongs to.
        """
        build_uuid = 'fake-build-id'

        # TODO(jzelinskie): remove when more endpoints have been converted to using
        # interfaces
        repo = AttrDict({
            'namespace_name': notification.repository.namespace_user.username,
            'name': notification.repository.name,
        })

        return build_event_data(repo, {
            'build_id': build_uuid,
            'build_name': 'some-fake-build',
            'docker_tags': ['latest', 'foo', 'bar'],
            'trigger_id': '1245634',
            'trigger_kind': 'GitHub',
            'trigger_metadata': {
                "default_branch": "master",
                "ref": "refs/heads/somebranch",
                "commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d"
            }
        }, subpage='/build/%s' % build_uuid)

    def get_summary(self, event_data, notification_data):
        """ Returns 'Build started ' followed by the shared build summary. """
        return 'Build started ' + _build_summary(event_data)
class BuildSuccessEvent(BaseBuildEvent):
    """ Notification event named 'build_success', reported at the 'success' level. """

    @classmethod
    def event_name(cls):
        return 'build_success'

    def get_level(self, event_data, notification_data):
        return 'success'

    def get_sample_data(self, notification):
        """
        Returns fake event data describing a successful, GitHub-triggered build (with a
        sample image id) in the repository the given notification belongs to.
        """
        build_uuid = 'fake-build-id'

        namespace_name = notification.repository.namespace_user.username
        repository_name = notification.repository.name

        # TODO(jzelinskie): remove when more endpoints have been converted to using
        # interfaces
        repo = AttrDict({
            'namespace_name': namespace_name,
            'name': repository_name,
        })

        trigger_metadata = {
            "default_branch": "master",
            "ref": "refs/heads/somebranch",
            "commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d"
        }
        sample_payload = {
            'build_id': build_uuid,
            'build_name': 'some-fake-build',
            'docker_tags': ['latest', 'foo', 'bar'],
            'trigger_id': '1245634',
            'trigger_kind': 'GitHub',
            'trigger_metadata': trigger_metadata,
            'image_id': '1245657346'
        }
        build_subpage = '/build/%s' % build_uuid
        return build_event_data(repo, sample_payload, subpage=build_subpage)

    def get_summary(self, event_data, notification_data):
        """ Returns 'Build succeeded ' followed by the shared build summary. """
        build_details = _build_summary(event_data)
        return 'Build succeeded ' + build_details
class BuildFailureEvent(BaseBuildEvent):
    """ Notification event named 'build_failure', reported at the 'error' level. """

    @classmethod
    def event_name(cls):
        return 'build_failure'

    def get_level(self, event_data, notification_data):
        return 'error'

    def get_sample_data(self, notification):
        """
        Returns fake event data describing a failed, GitHub-triggered build (with a sample
        error message and commit info) in the repository the given notification belongs to.
        """
        build_uuid = 'fake-build-id'

        # TODO(jzelinskie): remove when more endpoints have been converted to using
        # interfaces
        repo = AttrDict({
            'namespace_name': notification.repository.namespace_user.username,
            'name': notification.repository.name,
        })

        # NOTE: 'trigger_kind' used to be listed twice in this literal with the same value;
        # the redundant second entry was dropped, which leaves the resulting dict unchanged.
        return build_event_data(repo, {
            'build_id': build_uuid,
            'build_name': 'some-fake-build',
            'docker_tags': ['latest', 'foo', 'bar'],
            'trigger_kind': 'GitHub',
            'error_message': 'This is a fake error message',
            'trigger_id': '1245634',
            'trigger_metadata': {
                "default_branch": "master",
                "ref": "refs/heads/somebranch",
                "commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
                "commit_info": {
                    'url': 'http://path/to/the/commit',
                    'message': 'Some commit message',
                    # The sample commit date is the current local time as a timestamp.
                    'date': time.mktime(datetime.now().timetuple()),
                    'author': {
                        'username': 'fakeauthor',
                        'url': 'http://path/to/fake/author/in/scm',
                        'avatar_url': 'http://www.gravatar.com/avatar/fakehash'
                    }
                }
            }
        }, subpage='/build?current=%s' % build_uuid)

    def get_summary(self, event_data, notification_data):
        """ Returns 'Build failure ' followed by the shared build summary. """
        return 'Build failure ' + _build_summary(event_data)