This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/notificationmethod.py

419 lines
14 KiB
Python

import logging
import json
import requests
import re
from flask.ext.mail import Message
from app import mail, app
from data import model
from workers.worker import JobException
logger = logging.getLogger(__name__)
class InvalidNotificationMethodException(Exception):
pass
class CannotValidateNotificationMethodException(Exception):
pass
class NotificationMethodPerformException(JobException):
pass
class NotificationMethod(object):
def __init__(self):
pass
@classmethod
def method_name(cls):
"""
Particular method implemented by subclasses.
"""
raise NotImplementedError
def validate(self, repository, config_data):
"""
Validates that the notification can be created with the given data. Throws
a CannotValidateNotificationMethodException on failure.
"""
raise NotImplementedError
def perform(self, notification_obj, event_handler, notification_data):
"""
Performs the notification method.
notification_obj: The noticication record itself.
event_handler: The NotificationEvent handler.
notification_data: The dict of notification data placed in the queue.
"""
raise NotImplementedError
@classmethod
def get_method(cls, methodname):
for subc in cls.__subclasses__():
if subc.method_name() == methodname:
return subc()
raise InvalidNotificationMethodException('Unable to find method: %s' % methodname)
class QuayNotificationMethod(NotificationMethod):
@classmethod
def method_name(cls):
return 'quay_notification'
def validate(self, repository, config_data):
status, err_message, target_users = self.find_targets(repository, config_data)
if err_message:
raise CannotValidateNotificationMethodException(err_message)
def find_targets(self, repository, config_data):
target_info = config_data['target']
if target_info['kind'] == 'user':
target = model.user.get_nonrobot_user(target_info['name'])
if not target:
# Just to be safe.
return (True, 'Unknown user %s' % target_info['name'], [])
return (True, None, [target])
elif target_info['kind'] == 'org':
target = model.organization.get_organization(target_info['name'])
if not target:
# Just to be safe.
return (True, 'Unknown organization %s' % target_info['name'], None)
# Only repositories under the organization can cause notifications to that org.
if target_info['name'] != repository.namespace_user.username:
return (False, 'Organization name must match repository namespace')
return (True, None, [target])
elif target_info['kind'] == 'team':
# Lookup the team.
org_team = None
try:
org_team = model.team.get_organization_team(repository.namespace_user.username,
target_info['name'])
except model.InvalidTeamException:
# Probably deleted.
return (True, 'Unknown team %s' % target_info['name'], None)
# Lookup the team's members
return (True, None, model.organization.get_organization_team_members(org_team.id))
def perform(self, notification_obj, event_handler, notification_data):
repository = notification_obj.repository
if not repository:
# Probably deleted.
return
# Lookup the target user or team to which we'll send the notification.
config_data = json.loads(notification_obj.config_json)
status, err_message, target_users = self.find_targets(repository, config_data)
if not status:
raise NotificationMethodPerformException(err_message)
# For each of the target users, create a notification.
for target_user in set(target_users or []):
model.notification.create_notification(event_handler.event_name(), target_user,
metadata=notification_data['event_data'])
class EmailMethod(NotificationMethod):
@classmethod
def method_name(cls):
return 'email'
def validate(self, repository, config_data):
email = config_data.get('email', '')
if not email:
raise CannotValidateNotificationMethodException('Missing e-mail address')
record = model.repository.get_email_authorized_for_repo(repository.namespace_user.username,
repository.name, email)
if not record or not record.confirmed:
raise CannotValidateNotificationMethodException('The specified e-mail address '
'is not authorized to receive '
'notifications for this repository')
def perform(self, notification_obj, event_handler, notification_data):
config_data = json.loads(notification_obj.config_json)
email = config_data.get('email', '')
if not email:
return
msg = Message(event_handler.get_summary(notification_data['event_data'], notification_data),
sender='support@quay.io',
recipients=[email])
msg.html = event_handler.get_message(notification_data['event_data'], notification_data)
try:
with app.app_context():
mail.send(msg)
except Exception as ex:
logger.exception('Email was unable to be sent: %s' % ex.message)
raise NotificationMethodPerformException(ex.message)
class WebhookMethod(NotificationMethod):
@classmethod
def method_name(cls):
return 'webhook'
def validate(self, repository, config_data):
url = config_data.get('url', '')
if not url:
raise CannotValidateNotificationMethodException('Missing webhook URL')
def perform(self, notification_obj, event_handler, notification_data):
config_data = json.loads(notification_obj.config_json)
url = config_data.get('url', '')
if not url:
return
payload = notification_data['event_data']
headers = {'Content-type': 'application/json'}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
error_message = '%s response for webhook to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Webhook was unable to be sent: %s' % ex.message)
raise NotificationMethodPerformException(ex.message)
class FlowdockMethod(NotificationMethod):
""" Method for sending notifications to Flowdock via the Team Inbox API:
https://www.flowdock.com/api/team-inbox
"""
@classmethod
def method_name(cls):
return 'flowdock'
def validate(self, repository, config_data):
token = config_data.get('flow_api_token', '')
if not token:
raise CannotValidateNotificationMethodException('Missing Flowdock API Token')
def perform(self, notification_obj, event_handler, notification_data):
config_data = json.loads(notification_obj.config_json)
token = config_data.get('flow_api_token', '')
if not token:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_user.username)
if not owner:
# Something went wrong.
return
url = 'https://api.flowdock.com/v1/messages/team_inbox/%s' % token
headers = {'Content-type': 'application/json'}
payload = {
'source': 'Quay',
'from_address': 'support@quay.io',
'subject': event_handler.get_summary(notification_data['event_data'], notification_data),
'content': event_handler.get_message(notification_data['event_data'], notification_data),
'from_name': owner.username,
'project': (notification_obj.repository.namespace_user.username + ' ' +
notification_obj.repository.name),
'tags': ['#' + event_handler.event_name()],
'link': notification_data['event_data']['homepage']
}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
error_message = '%s response for flowdock to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Flowdock method was unable to be sent: %s' % ex.message)
raise NotificationMethodPerformException(ex.message)
class HipchatMethod(NotificationMethod):
""" Method for sending notifications to Hipchat via the API:
https://www.hipchat.com/docs/apiv2/method/send_room_notification
"""
@classmethod
def method_name(cls):
return 'hipchat'
def validate(self, repository, config_data):
if not config_data.get('notification_token', ''):
raise CannotValidateNotificationMethodException('Missing Hipchat Room Notification Token')
if not config_data.get('room_id', ''):
raise CannotValidateNotificationMethodException('Missing Hipchat Room ID')
def perform(self, notification_obj, event_handler, notification_data):
config_data = json.loads(notification_obj.config_json)
token = config_data.get('notification_token', '')
room_id = config_data.get('room_id', '')
if not token or not room_id:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_user.username)
if not owner:
# Something went wrong.
return
url = 'https://api.hipchat.com/v2/room/%s/notification?auth_token=%s' % (room_id, token)
level = event_handler.get_level(notification_data['event_data'], notification_data)
color = {
'info': 'gray',
'warning': 'yellow',
'error': 'red',
'success': 'green',
'primary': 'purple'
}.get(level, 'gray')
headers = {'Content-type': 'application/json'}
payload = {
'color': color,
'message': event_handler.get_message(notification_data['event_data'], notification_data),
'notify': level == 'error',
'message_format': 'html',
}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
error_message = '%s response for hipchat to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Hipchat method was unable to be sent: %s' % ex.message)
raise NotificationMethodPerformException(ex.message)
from HTMLParser import HTMLParser
class SlackAdjuster(HTMLParser):
def __init__(self):
self.reset()
self.result = []
def handle_data(self, d):
self.result.append(d)
def get_attr(self, attrs, name):
for attr in attrs:
if attr[0] == name:
return attr[1]
return ''
def handle_starttag(self, tag, attrs):
if tag == 'a':
self.result.append('<%s|' % (self.get_attr(attrs, 'href'), ))
if tag == 'i':
self.result.append('_')
if tag == 'b' or tag == 'strong':
self.result.append('*')
if tag == 'img':
self.result.append('')
def handle_endtag(self, tag):
if tag == 'a':
self.result.append('>')
if tag == 'b' or tag == 'strong':
self.result.append('*')
if tag == 'i':
self.result.append('_')
def get_data(self):
return ''.join(self.result)
def adjust_tags(html):
s = SlackAdjuster()
s.feed(html)
return s.get_data()
class SlackMethod(NotificationMethod):
""" Method for sending notifications to Slack via the API:
https://api.slack.com/docs/attachments
"""
@classmethod
def method_name(cls):
return 'slack'
def validate(self, repository, config_data):
if not config_data.get('url', ''):
raise CannotValidateNotificationMethodException('Missing Slack Callback URL')
def format_for_slack(self, message):
message = message.replace('\n', '')
message = re.sub(r'\s+', ' ', message)
message = message.replace('<br>', '\n')
return adjust_tags(message)
def perform(self, notification_obj, event_handler, notification_data):
config_data = json.loads(notification_obj.config_json)
url = config_data.get('url', '')
if not url:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_user.username)
if not owner:
# Something went wrong.
return
level = event_handler.get_level(notification_data['event_data'], notification_data)
color = {
'info': '#ffffff',
'warning': 'warning',
'error': 'danger',
'success': 'good',
'primary': 'good'
}.get(level, '#ffffff')
summary = event_handler.get_summary(notification_data['event_data'], notification_data)
message = event_handler.get_message(notification_data['event_data'], notification_data)
headers = {'Content-type': 'application/json'}
payload = {
'text': summary,
'username': 'quayiobot',
'attachments': [
{
'fallback': summary,
'text': self.format_for_slack(message),
'color': color,
'mrkdwn_in': ["text"]
}
]
}
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
error_message = '%s response for Slack to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Slack method was unable to be sent: %s', ex.message)
raise NotificationMethodPerformException(ex.message)