Move notifications into its own package

This commit is contained in:
Joseph Schorr 2017-07-14 16:09:56 +03:00
parent be206a8b88
commit ce56031846
16 changed files with 73 additions and 70 deletions

View file

View file

@ -0,0 +1,15 @@
from collections import namedtuple
class Repository(namedtuple('Repository', ['namespace_name', 'name'])):
"""
Repository represents a repository.
"""
class Notification(
namedtuple('Notification', [
'uuid', 'event_name', 'method_name', 'event_config_dict', 'method_config_dict',
'repository'])):
"""
Notification represents a registered notification of some kind.
"""

View file

@ -0,0 +1,364 @@
import logging
import time
import re
from datetime import datetime
from notifications.notificationhelper import build_event_data
from util.jinjautil import get_template_env
from util.secscan import PRIORITY_LEVELS, get_priority_for_index
logger = logging.getLogger(__name__)
TEMPLATE_ENV = get_template_env("events")
class InvalidNotificationEventException(Exception):
pass
class NotificationEvent(object):
def __init__(self):
pass
def get_level(self, event_data, notification_data):
"""
Returns a 'level' representing the severity of the event.
Valid values are: 'info', 'warning', 'error', 'primary', 'success'
"""
raise NotImplementedError
def get_summary(self, event_data, notification_data):
"""
Returns a human readable one-line summary for the given notification data.
"""
raise NotImplementedError
def get_message(self, event_data, notification_data):
"""
Returns a human readable HTML message for the given notification data.
"""
return TEMPLATE_ENV.get_template(self.event_name() + '.html').render({
'event_data': event_data,
'notification_data': notification_data
})
def get_sample_data(self, notification):
"""
Returns sample data for testing the raising of this notification, with an example notification.
"""
raise NotImplementedError
def should_perform(self, event_data, notification_data):
"""
Whether a notification for this event should be performed. By default returns True.
"""
return True
@classmethod
def event_name(cls):
"""
Particular event implemented by subclasses.
"""
raise NotImplementedError
@classmethod
def get_event(cls, eventname):
found = NotificationEvent._get_event(cls, eventname)
if found is not None:
return found
raise InvalidNotificationEventException('Unable to find event: %s' % eventname)
@staticmethod
def _get_event(cls, eventname):
for subc in cls.__subclasses__():
if subc.event_name() is None:
found = NotificationEvent._get_event(subc, eventname)
if found is not None:
return found
elif subc.event_name() == eventname:
return subc()
class RepoPushEvent(NotificationEvent):
@classmethod
def event_name(cls):
return 'repo_push'
def get_level(self, event_data, notification_data):
return 'primary'
def get_summary(self, event_data, notification_data):
return 'Repository %s updated' % (event_data['repository'])
def get_sample_data(self, notification):
return build_event_data(notification.repository, {
'updated_tags': {'latest': 'someimageid', 'foo': 'anotherimage'},
'pruned_image_count': 3
})
def _build_summary(event_data):
""" Returns a summary string for the build data found in the event data block. """
summary = 'for repository %s [%s]' % (event_data['repository'], event_data['build_id'][0:7])
return summary
class VulnerabilityFoundEvent(NotificationEvent):
CONFIG_LEVEL = 'level'
PRIORITY_KEY = 'priority'
VULNERABILITY_KEY = 'vulnerability'
MULTIPLE_VULNERABILITY_KEY = 'vulnerabilities'
@classmethod
def event_name(cls):
return 'vulnerability_found'
def get_level(self, event_data, notification_data):
vuln_data = event_data[VulnerabilityFoundEvent.VULNERABILITY_KEY]
priority = vuln_data[VulnerabilityFoundEvent.PRIORITY_KEY]
if priority == 'Defcon1' or priority == 'Critical':
return 'error'
if priority == 'Medium' or priority == 'High':
return 'warning'
return 'info'
def get_sample_data(self, notification):
event_config = notification.event_config_dict
level = event_config.get(VulnerabilityFoundEvent.CONFIG_LEVEL, 'Critical')
return build_event_data(notification.repository, {
'tags': ['latest', 'prod', 'foo', 'bar', 'baz'],
'image': 'some-image-id',
'vulnerability': {
'id': 'CVE-FAKE-CVE',
'description': 'A futurist vulnerability',
'link': 'https://security-tracker.debian.org/tracker/CVE-FAKE-CVE',
'priority': get_priority_for_index(level)
},
})
def should_perform(self, event_data, notification_data):
event_config = notification_data.event_config_dict
if VulnerabilityFoundEvent.CONFIG_LEVEL not in event_config:
return True
if VulnerabilityFoundEvent.VULNERABILITY_KEY not in event_data:
return False
vuln_info = event_data.get(VulnerabilityFoundEvent.VULNERABILITY_KEY, {})
event_severity = PRIORITY_LEVELS.get(vuln_info.get('priority', 'Unknown'))
if event_severity is None:
return False
actual_level_index = int(event_severity['index'])
filter_level_index = int(event_config[VulnerabilityFoundEvent.CONFIG_LEVEL])
return actual_level_index <= filter_level_index
def get_summary(self, event_data, notification_data):
vuln_key = VulnerabilityFoundEvent.VULNERABILITY_KEY
priority_key = VulnerabilityFoundEvent.PRIORITY_KEY
multiple_vulns = event_data.get(VulnerabilityFoundEvent.MULTIPLE_VULNERABILITY_KEY)
if multiple_vulns is not None:
top_priority = multiple_vulns[0].get(priority_key, 'Unknown')
matching = [v for v in multiple_vulns if v.get(priority_key, 'Unknown') == top_priority]
msg = '%s %s' % (len(matching), top_priority)
if len(matching) < len(multiple_vulns):
msg += ' and %s more' % (len(multiple_vulns) - len(matching))
msg += ' vulnerabilities were detected in repository %s in %s tags'
return msg % (event_data['repository'], len(event_data['tags']))
else:
msg = '%s vulnerability detected in repository %s in %s tags'
return msg % (event_data[vuln_key][priority_key], event_data['repository'],
len(event_data['tags']))
class BaseBuildEvent(NotificationEvent):
@classmethod
def event_name(cls):
return None
def should_perform(self, event_data, notification_data):
if not notification_data.event_config_dict:
return True
event_config = notification_data.event_config_dict
ref_regex = event_config.get('ref-regex') or None
if ref_regex is None:
return True
# Lookup the ref. If none, this is a non-git build and we should not fire the event.
ref = event_data.get('trigger_metadata', {}).get('ref', None)
if ref is None:
return False
# Try parsing the regex string as a regular expression. If we fail, we fail to fire
# the event.
try:
return bool(re.compile(str(ref_regex)).match(ref))
except Exception:
logger.warning('Regular expression error for build event filter: %s', ref_regex)
return False
class BuildQueueEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return 'build_queued'
def get_level(self, event_data, notification_data):
return 'info'
def get_sample_data(self, notification):
build_uuid = 'fake-build-id'
return build_event_data(notification.repository, {
'is_manual': False,
'build_id': build_uuid,
'build_name': 'some-fake-build',
'docker_tags': ['latest', 'foo', 'bar'],
'trigger_id': '1245634',
'trigger_kind': 'GitHub',
'trigger_metadata': {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
"commit_info": {
'url': 'http://path/to/the/commit',
'message': 'Some commit message',
'date': time.mktime(datetime.now().timetuple()),
'author': {
'username': 'fakeauthor',
'url': 'http://path/to/fake/author/in/scm',
'avatar_url': 'http://www.gravatar.com/avatar/fakehash'
}
}
}
}, subpage='/build/%s' % build_uuid)
def get_summary(self, event_data, notification_data):
return 'Build queued ' + _build_summary(event_data)
class BuildStartEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return 'build_start'
def get_level(self, event_data, notification_data):
return 'info'
def get_sample_data(self, notification):
build_uuid = 'fake-build-id'
return build_event_data(notification.repository, {
'build_id': build_uuid,
'build_name': 'some-fake-build',
'docker_tags': ['latest', 'foo', 'bar'],
'trigger_id': '1245634',
'trigger_kind': 'GitHub',
'trigger_metadata': {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d"
}
}, subpage='/build/%s' % build_uuid)
def get_summary(self, event_data, notification_data):
return 'Build started ' + _build_summary(event_data)
class BuildSuccessEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return 'build_success'
def get_level(self, event_data, notification_data):
return 'success'
def get_sample_data(self, notification):
build_uuid = 'fake-build-id'
return build_event_data(notification.repository, {
'build_id': build_uuid,
'build_name': 'some-fake-build',
'docker_tags': ['latest', 'foo', 'bar'],
'trigger_id': '1245634',
'trigger_kind': 'GitHub',
'trigger_metadata': {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d"
},
'image_id': '1245657346'
}, subpage='/build/%s' % build_uuid)
def get_summary(self, event_data, notification_data):
return 'Build succeeded ' + _build_summary(event_data)
class BuildFailureEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return 'build_failure'
def get_level(self, event_data, notification_data):
return 'error'
def get_sample_data(self, notification):
build_uuid = 'fake-build-id'
return build_event_data(notification.repository, {
'build_id': build_uuid,
'build_name': 'some-fake-build',
'docker_tags': ['latest', 'foo', 'bar'],
'trigger_kind': 'GitHub',
'error_message': 'This is a fake error message',
'trigger_id': '1245634',
'trigger_kind': 'GitHub',
'trigger_metadata': {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d",
"commit_info": {
'url': 'http://path/to/the/commit',
'message': 'Some commit message',
'date': time.mktime(datetime.now().timetuple()),
'author': {
'username': 'fakeauthor',
'url': 'http://path/to/fake/author/in/scm',
'avatar_url': 'http://www.gravatar.com/avatar/fakehash'
}
}
}
}, subpage='/build?current=%s' % build_uuid)
def get_summary(self, event_data, notification_data):
return 'Build failure ' + _build_summary(event_data)
class BuildCancelledEvent(BaseBuildEvent):
@classmethod
def event_name(cls):
return 'build_cancelled'
def get_level(self, event_data, notification_data):
return 'info'
def get_sample_data(self, notification):
build_uuid = 'fake-build-id'
return build_event_data(notification.repository, {
'build_id': build_uuid,
'build_name': 'some-fake-build',
'docker_tags': ['latest', 'foo', 'bar'],
'trigger_id': '1245634',
'trigger_kind': 'GitHub',
'trigger_metadata': {
"default_branch": "master",
"ref": "refs/heads/somebranch",
"commit": "42d4a62c53350993ea41069e9f2cfdefb0df097d"
},
'image_id': '1245657346'
}, subpage='/build/%s' % build_uuid)
def get_summary(self, event_data, notification_data):
return 'Build cancelled ' + _build_summary(event_data)

View file

@ -0,0 +1,85 @@
import json
from contextlib import contextmanager
from app import app, notification_queue
from data import model
from auth.auth_context import get_authenticated_user, get_validated_oauth_token
from endpoints.notificationmethod import _get_namespace_name_from
DEFAULT_BATCH_SIZE = 1000
def build_event_data(repo, extra_data=None, subpage=None):
repo_string = '%s/%s' % (_get_namespace_name_from(repo), repo.name)
homepage = '%s://%s/repository/%s' % (app.config['PREFERRED_URL_SCHEME'],
app.config['SERVER_HOSTNAME'],
repo_string)
if subpage:
if not subpage.startswith('/'):
subpage = '/' + subpage
homepage = homepage + subpage
event_data = {
'repository': repo_string,
'namespace': _get_namespace_name_from(repo),
'name': repo.name,
'docker_url': '%s/%s' % (app.config['SERVER_HOSTNAME'], repo_string),
'homepage': homepage,
}
event_data.update(extra_data or {})
return event_data
def build_notification_data(notification, event_data, performer_data=None):
if not performer_data:
performer_data = {}
oauth_token = get_validated_oauth_token()
if oauth_token:
performer_data['oauth_token_id'] = oauth_token.id
performer_data['oauth_token_application_id'] = oauth_token.application.client_id
performer_data['oauth_token_application'] = oauth_token.application.name
performer_user = get_authenticated_user()
if performer_user:
performer_data['entity_id'] = performer_user.id
performer_data['entity_name'] = performer_user.username
return {
'notification_uuid': notification.uuid,
'event_data': event_data,
'performer_data': performer_data,
}
@contextmanager
def notification_batch(batch_size=DEFAULT_BATCH_SIZE):
"""
Context manager implementation which returns a target callable with the same signature
as spawn_notification. When the the context block exits the notifications generated by
the callable will be bulk inserted into the queue with the specified batch size.
"""
with notification_queue.batch_insert(batch_size) as queue_put:
def spawn_notification_batch(repo, event_name, extra_data=None, subpage=None, pathargs=None,
performer_data=None):
event_data = build_event_data(repo, extra_data=extra_data, subpage=subpage)
notifications = model.notification.list_repo_notifications(repo.namespace_name,
repo.name,
event_name=event_name)
path = [repo.namespace_name, repo.name, event_name] + (pathargs or [])
for notification in list(notifications):
notification_data = build_notification_data(notification, event_data, performer_data)
queue_put(path, json.dumps(notification_data))
yield spawn_notification_batch
def spawn_notification(repo, event_name, extra_data=None, subpage=None, pathargs=None,
performer_data=None):
with notification_batch(1) as batch_spawn:
batch_spawn(repo, event_name, extra_data, subpage, pathargs, performer_data)

View file

@ -0,0 +1,433 @@
import logging
import re
import json
import requests
from flask_mail import Message
from app import mail, app, OVERRIDE_CONFIG_DIRECTORY
from data import model
from util.config.validator import SSL_FILENAMES
from workers.queueworker import JobException
logger = logging.getLogger(__name__)
METHOD_TIMEOUT = app.config.get('NOTIFICATION_SEND_TIMEOUT', 10) # Seconds
class InvalidNotificationMethodException(Exception):
pass
class CannotValidateNotificationMethodException(Exception):
pass
class NotificationMethodPerformException(JobException):
pass
def _ssl_cert():
if app.config['PREFERRED_URL_SCHEME'] == 'https':
return [OVERRIDE_CONFIG_DIRECTORY + f for f in SSL_FILENAMES]
return None
class NotificationMethod(object):
def __init__(self):
pass
@classmethod
def method_name(cls):
"""
Particular method implemented by subclasses.
"""
raise NotImplementedError
def validate(self, repository, config_data):
"""
Validates that the notification can be created with the given data. Throws
a CannotValidateNotificationMethodException on failure.
"""
raise NotImplementedError
def perform(self, notification_obj, event_handler, notification_data):
"""
Performs the notification method.
notification_obj: The noticication namedtuple.
event_handler: The NotificationEvent handler.
notification_data: The dict of notification data placed in the queue.
"""
raise NotImplementedError
@classmethod
def get_method(cls, methodname):
for subc in cls.__subclasses__():
if subc.method_name() == methodname:
return subc()
raise InvalidNotificationMethodException('Unable to find method: %s' % methodname)
class QuayNotificationMethod(NotificationMethod):
@classmethod
def method_name(cls):
return 'quay_notification'
def validate(self, repository, config_data):
_, err_message, _ = self.find_targets(repository, config_data)
if err_message:
raise CannotValidateNotificationMethodException(err_message)
def find_targets(self, repository, config_data):
target_info = config_data['target']
if target_info['kind'] == 'user':
target = model.user.get_nonrobot_user(target_info['name'])
if not target:
# Just to be safe.
return (True, 'Unknown user %s' % target_info['name'], [])
return (True, None, [target])
elif target_info['kind'] == 'org':
target = model.organization.get_organization(target_info['name'])
if not target:
# Just to be safe.
return (True, 'Unknown organization %s' % target_info['name'], None)
# Only repositories under the organization can cause notifications to that org.
if target_info['name'] != repository.namespace_name:
return (False, 'Organization name must match repository namespace')
return (True, None, [target])
elif target_info['kind'] == 'team':
# Lookup the team.
org_team = None
try:
org_team = model.team.get_organization_team(repository.namespace_name, target_info['name'])
except model.InvalidTeamException:
# Probably deleted.
return (True, 'Unknown team %s' % target_info['name'], None)
# Lookup the team's members
return (True, None, model.organization.get_organization_team_members(org_team.id))
def perform(self, notification_obj, event_handler, notification_data):
repository = notification_obj.repository
if not repository:
# Probably deleted.
return
# Lookup the target user or team to which we'll send the notification.
config_data = notification_obj.method_config_dict
status, err_message, target_users = self.find_targets(repository, config_data)
if not status:
raise NotificationMethodPerformException(err_message)
# For each of the target users, create a notification.
for target_user in set(target_users or []):
model.notification.create_notification(event_handler.event_name(), target_user,
metadata=notification_data['event_data'])
class EmailMethod(NotificationMethod):
@classmethod
def method_name(cls):
return 'email'
def validate(self, repository, config_data):
email = config_data.get('email', '')
if not email:
raise CannotValidateNotificationMethodException('Missing e-mail address')
record = model.repository.get_email_authorized_for_repo(repository.namespace_name,
repository.name, email)
if not record or not record.confirmed:
raise CannotValidateNotificationMethodException('The specified e-mail address '
'is not authorized to receive '
'notifications for this repository')
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
email = config_data.get('email', '')
if not email:
return
with app.app_context():
msg = Message(event_handler.get_summary(notification_data['event_data'], notification_data),
recipients=[email])
msg.html = event_handler.get_message(notification_data['event_data'], notification_data)
try:
mail.send(msg)
except Exception as ex:
logger.exception('Email was unable to be sent')
raise NotificationMethodPerformException(ex.message)
class WebhookMethod(NotificationMethod):
@classmethod
def method_name(cls):
return 'webhook'
def validate(self, repository, config_data):
url = config_data.get('url', '')
if not url:
raise CannotValidateNotificationMethodException('Missing webhook URL')
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
url = config_data.get('url', '')
if not url:
return
payload = notification_data['event_data']
headers = {'Content-type': 'application/json'}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers, cert=_ssl_cert(),
timeout=METHOD_TIMEOUT)
if resp.status_code / 100 != 2:
error_message = '%s response for webhook to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Webhook was unable to be sent')
raise NotificationMethodPerformException(ex.message)
class FlowdockMethod(NotificationMethod):
""" Method for sending notifications to Flowdock via the Team Inbox API:
https://www.flowdock.com/api/team-inbox
"""
@classmethod
def method_name(cls):
return 'flowdock'
def validate(self, repository, config_data):
token = config_data.get('flow_api_token', '')
if not token:
raise CannotValidateNotificationMethodException('Missing Flowdock API Token')
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
token = config_data.get('flow_api_token', '')
if not token:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_name)
if not owner:
# Something went wrong.
return
url = 'https://api.flowdock.com/v1/messages/team_inbox/%s' % token
headers = {'Content-type': 'application/json'}
payload = {
'source': 'Quay',
'from_address': 'support@quay.io',
'subject': event_handler.get_summary(notification_data['event_data'], notification_data),
'content': event_handler.get_message(notification_data['event_data'], notification_data),
'from_name': owner.username,
'project': (notification_obj.repository.namespace_name + ' ' +
notification_obj.repository.name),
'tags': ['#' + event_handler.event_name()],
'link': notification_data['event_data']['homepage']
}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT)
if resp.status_code / 100 != 2:
error_message = '%s response for flowdock to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Flowdock method was unable to be sent')
raise NotificationMethodPerformException(ex.message)
class HipchatMethod(NotificationMethod):
""" Method for sending notifications to Hipchat via the API:
https://www.hipchat.com/docs/apiv2/method/send_room_notification
"""
@classmethod
def method_name(cls):
return 'hipchat'
def validate(self, repository, config_data):
if not config_data.get('notification_token', ''):
raise CannotValidateNotificationMethodException('Missing Hipchat Room Notification Token')
if not config_data.get('room_id', ''):
raise CannotValidateNotificationMethodException('Missing Hipchat Room ID')
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
token = config_data.get('notification_token', '')
room_id = config_data.get('room_id', '')
if not token or not room_id:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_name)
if not owner:
# Something went wrong.
return
url = 'https://api.hipchat.com/v2/room/%s/notification?auth_token=%s' % (room_id, token)
level = event_handler.get_level(notification_data['event_data'], notification_data)
color = {
'info': 'gray',
'warning': 'yellow',
'error': 'red',
'success': 'green',
'primary': 'purple'
}.get(level, 'gray')
headers = {'Content-type': 'application/json'}
payload = {
'color': color,
'message': event_handler.get_message(notification_data['event_data'], notification_data),
'notify': level == 'error',
'message_format': 'html',
}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT)
if resp.status_code / 100 != 2:
error_message = '%s response for hipchat to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Hipchat method was unable to be sent')
raise NotificationMethodPerformException(ex.message)
from HTMLParser import HTMLParser
class SlackAdjuster(HTMLParser):
def __init__(self):
self.reset()
self.result = []
def handle_data(self, d):
self.result.append(d)
def get_attr(self, attrs, name):
for attr in attrs:
if attr[0] == name:
return attr[1]
return ''
def handle_starttag(self, tag, attrs):
if tag == 'a':
self.result.append('<%s|' % (self.get_attr(attrs, 'href'),))
if tag == 'i':
self.result.append('_')
if tag == 'b' or tag == 'strong':
self.result.append('*')
if tag == 'img':
self.result.append('')
def handle_endtag(self, tag):
if tag == 'a':
self.result.append('>')
if tag == 'b' or tag == 'strong':
self.result.append('*')
if tag == 'i':
self.result.append('_')
def get_data(self):
return ''.join(self.result)
def adjust_tags(html):
s = SlackAdjuster()
s.feed(html)
return s.get_data()
class SlackMethod(NotificationMethod):
""" Method for sending notifications to Slack via the API:
https://api.slack.com/docs/attachments
"""
@classmethod
def method_name(cls):
return 'slack'
def validate(self, repository, config_data):
if not config_data.get('url', ''):
raise CannotValidateNotificationMethodException('Missing Slack Callback URL')
def format_for_slack(self, message):
message = message.replace('\n', '')
message = re.sub(r'\s+', ' ', message)
message = message.replace('<br>', '\n')
return adjust_tags(message)
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
url = config_data.get('url', '')
if not url:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_name)
if not owner:
# Something went wrong.
return
level = event_handler.get_level(notification_data['event_data'], notification_data)
color = {
'info': '#ffffff',
'warning': 'warning',
'error': 'danger',
'success': 'good',
'primary': 'good'
}.get(level, '#ffffff')
summary = event_handler.get_summary(notification_data['event_data'], notification_data)
message = event_handler.get_message(notification_data['event_data'], notification_data)
headers = {'Content-type': 'application/json'}
payload = {
'text': summary,
'username': 'quayiobot',
'attachments': [
{
'fallback': summary,
'text': self.format_for_slack(message),
'color': color,
'mrkdwn_in': ["text"]
}
]
}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT)
if resp.status_code / 100 != 2:
error_message = '%s response for Slack to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Slack method was unable to be sent: %s', ex.message)
raise NotificationMethodPerformException(ex.message)

View file

@ -0,0 +1,31 @@
from notifications.notificationevent import NotificationEvent
from util.morecollections import AttrDict
from test.fixtures import *
def test_all_notifications(app):
# Create a test notification.
test_notification = AttrDict({
'repository': AttrDict({
'namespace_name': AttrDict(dict(username='foo')),
'name': 'bar',
}),
'event_config_dict': {
'level': 'low',
},
})
for subc in NotificationEvent.__subclasses__():
if subc.event_name() is not None:
# Create the notification event.
found = NotificationEvent.get_event(subc.event_name())
sample_data = found.get_sample_data(test_notification)
# Make sure all calls succeed.
notification_data = {
'performer_data': {},
}
found.get_level(sample_data, notification_data)
found.get_summary(sample_data, notification_data)
found.get_message(sample_data, notification_data)