import json import logging import re import requests from flask_mail import Message from app import mail, app, OVERRIDE_CONFIG_DIRECTORY from data import model from util.config.validator import SSL_FILENAMES from workers.queueworker import JobException logger = logging.getLogger(__name__) METHOD_TIMEOUT = app.config.get('NOTIFICATION_SEND_TIMEOUT', 10) # Seconds class InvalidNotificationMethodException(Exception): pass class CannotValidateNotificationMethodException(Exception): pass class NotificationMethodPerformException(JobException): pass def _get_namespace_name_from(repository): # TODO Charlie 2017-07-14: This is hack for a bug in production # because in some places have started calling this method with # pre oci models and in some we have started calling with non pre oci models. We should # remove this when we have switched over to database interfaces. if hasattr(repository, 'namespace_name'): namespace_name = repository.namespace_name else: namespace_name = repository.namespace_user.username return namespace_name SSLClientCert = None if app.config['PREFERRED_URL_SCHEME'] == 'https': # TODO(jschorr): move this into the config provider library SSLClientCert = [OVERRIDE_CONFIG_DIRECTORY + f for f in SSL_FILENAMES] class NotificationMethod(object): def __init__(self): pass @classmethod def method_name(cls): """ Particular method implemented by subclasses. """ raise NotImplementedError def validate(self, namespace_name, repository_name, config_data): """ Validates that the notification can be created with the given data. Throws a CannotValidateNotificationMethodException on failure. """ raise NotImplementedError def perform(self, notification_obj, event_handler, notification_data): """ Performs the notification method. notification_obj: The noticication namedtuple. event_handler: The NotificationEvent handler. notification_data: The dict of notification data placed in the queue. """ raise NotImplementedError @classmethod def get_method(cls, methodname): for subc in cls.__subclasses__(): if subc.method_name() == methodname: return subc() raise InvalidNotificationMethodException('Unable to find method: %s' % methodname) class QuayNotificationMethod(NotificationMethod): @classmethod def method_name(cls): return 'quay_notification' def validate(self, namespace_name, repository_name, config_data): status, err_message, target_users = self.find_targets(namespace_name, repository_name, config_data) if err_message: raise CannotValidateNotificationMethodException(err_message) def find_targets(self, namespace_name, repository_name, config_data): target_info = config_data['target'] if target_info['kind'] == 'user': target = model.user.get_nonrobot_user(target_info['name']) if not target: # Just to be safe. return (True, 'Unknown user %s' % target_info['name'], []) return (True, None, [target]) elif target_info['kind'] == 'org': target = model.organization.get_organization(target_info['name']) if not target: # Just to be safe. return (True, 'Unknown organization %s' % target_info['name'], None) # Only repositories under the organization can cause notifications to that org. if target_info['name'] != _get_namespace_name_from(repository): return (False, 'Organization name must match repository namespace') return (True, None, [target]) elif target_info['kind'] == 'team': # Lookup the team. org_team = None try: org_team = model.team.get_organization_team(_get_namespace_name_from(repository), target_info['name']) except model.InvalidTeamException: # Probably deleted. return (True, 'Unknown team %s' % target_info['name'], None) # Lookup the team's members return (True, None, model.organization.get_organization_team_members(org_team.id)) def perform(self, notification_obj, event_handler, notification_data): repository = notification_obj.repository if not repository: # Probably deleted. return # Lookup the target user or team to which we'll send the notification. config_data = notification_obj.method_config_dict status, err_message, target_users = self.find_targets(_get_namespace_name_from(repository), repository.name, config_data) if not status: raise NotificationMethodPerformException(err_message) # For each of the target users, create a notification. for target_user in set(target_users or []): model.notification.create_notification(event_handler.event_name(), target_user, metadata=notification_data['event_data']) class EmailMethod(NotificationMethod): @classmethod def method_name(cls): return 'email' def validate(self, namespace_name, repository_name, config_data): email = config_data.get('email', '') if not email: raise CannotValidateNotificationMethodException('Missing e-mail address') record = model.repository.get_email_authorized_for_repo(_get_namespace_name_from(repository), repository.name, email) if not record or not record.confirmed: raise CannotValidateNotificationMethodException('The specified e-mail address ' 'is not authorized to receive ' 'notifications for this repository') def perform(self, notification_obj, event_handler, notification_data): config_data = notification_obj.method_config_dict email = config_data.get('email', '') if not email: return with app.app_context(): msg = Message(event_handler.get_summary(notification_data['event_data'], notification_data), recipients=[email]) msg.html = event_handler.get_message(notification_data['event_data'], notification_data) try: mail.send(msg) except Exception as ex: logger.exception('Email was unable to be sent: %s' % ex.message) raise NotificationMethodPerformException(ex.message) class WebhookMethod(NotificationMethod): @classmethod def method_name(cls): return 'webhook' def validate(self, namespace_name, repository_name, config_data): url = config_data.get('url', '') if not url: raise CannotValidateNotificationMethodException('Missing webhook URL') def perform(self, notification_obj, event_handler, notification_data): config_data = notification_obj.method_config_dict url = config_data.get('url', '') if not url: return payload = notification_data['event_data'] headers = {'Content-type': 'application/json'} try: resp = requests.post(url, data=json.dumps(payload), headers=headers, cert=SSLClientCert, timeout=METHOD_TIMEOUT) if resp.status_code / 100 != 2: error_message = '%s response for webhook to url: %s' % (resp.status_code, url) logger.error(error_message) logger.error(resp.content) raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Webhook was unable to be sent: %s' % ex.message) raise NotificationMethodPerformException(ex.message) class FlowdockMethod(NotificationMethod): """ Method for sending notifications to Flowdock via the Team Inbox API: https://www.flowdock.com/api/team-inbox """ @classmethod def method_name(cls): return 'flowdock' def validate(self, namespace_name, repository_name, config_data): token = config_data.get('flow_api_token', '') if not token: raise CannotValidateNotificationMethodException('Missing Flowdock API Token') def perform(self, notification_obj, event_handler, notification_data): config_data = notification_obj.method_config_dict token = config_data.get('flow_api_token', '') if not token: return owner = model.user.get_user_or_org(_get_namespace_name_from(notification_obj.repository)) if not owner: # Something went wrong. return url = 'https://api.flowdock.com/v1/messages/team_inbox/%s' % token headers = {'Content-type': 'application/json'} payload = { 'source': 'Quay', 'from_address': 'support@quay.io', 'subject': event_handler.get_summary(notification_data['event_data'], notification_data), 'content': event_handler.get_message(notification_data['event_data'], notification_data), 'from_name': owner.username, 'project': (_get_namespace_name_from(notification_obj.repository)+ ' ' + notification_obj.repository.name), 'tags': ['#' + event_handler.event_name()], 'link': notification_data['event_data']['homepage'] } try: resp = requests.post(url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT) if resp.status_code / 100 != 2: error_message = '%s response for flowdock to url: %s' % (resp.status_code, url) logger.error(error_message) logger.error(resp.content) raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Flowdock method was unable to be sent: %s' % ex.message) raise NotificationMethodPerformException(ex.message) class HipchatMethod(NotificationMethod): """ Method for sending notifications to Hipchat via the API: https://www.hipchat.com/docs/apiv2/method/send_room_notification """ @classmethod def method_name(cls): return 'hipchat' def validate(self, namespace_name, repository_name, config_data): if not config_data.get('notification_token', ''): raise CannotValidateNotificationMethodException('Missing Hipchat Room Notification Token') if not config_data.get('room_id', ''): raise CannotValidateNotificationMethodException('Missing Hipchat Room ID') def perform(self, notification_obj, event_handler, notification_data): config_data = notification_obj.method_config_dict token = config_data.get('notification_token', '') room_id = config_data.get('room_id', '') if not token or not room_id: return owner = model.user.get_user_or_org(_get_namespace_name_from(notification_obj.repository)) if not owner: # Something went wrong. return url = 'https://api.hipchat.com/v2/room/%s/notification?auth_token=%s' % (room_id, token) level = event_handler.get_level(notification_data['event_data'], notification_data) color = { 'info': 'gray', 'warning': 'yellow', 'error': 'red', 'success': 'green', 'primary': 'purple' }.get(level, 'gray') headers = {'Content-type': 'application/json'} payload = { 'color': color, 'message': event_handler.get_message(notification_data['event_data'], notification_data), 'notify': level == 'error', 'message_format': 'html', } try: resp = requests.post(url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT) if resp.status_code / 100 != 2: error_message = '%s response for hipchat to url: %s' % (resp.status_code, url) logger.error(error_message) logger.error(resp.content) raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Hipchat method was unable to be sent: %s' % ex.message) raise NotificationMethodPerformException(ex.message) from HTMLParser import HTMLParser class SlackAdjuster(HTMLParser): def __init__(self): self.reset() self.result = [] def handle_data(self, d): self.result.append(d) def get_attr(self, attrs, name): for attr in attrs: if attr[0] == name: return attr[1] return '' def handle_starttag(self, tag, attrs): if tag == 'a': self.result.append('<%s|' % (self.get_attr(attrs, 'href'),)) if tag == 'i': self.result.append('_') if tag == 'b' or tag == 'strong': self.result.append('*') if tag == 'img': self.result.append('') def handle_endtag(self, tag): if tag == 'a': self.result.append('>') if tag == 'b' or tag == 'strong': self.result.append('*') if tag == 'i': self.result.append('_') def get_data(self): return ''.join(self.result) def adjust_tags(html): s = SlackAdjuster() s.feed(html) return s.get_data() class SlackMethod(NotificationMethod): """ Method for sending notifications to Slack via the API: https://api.slack.com/docs/attachments """ @classmethod def method_name(cls): return 'slack' def validate(self, namespace_name, repository_name, config_data): if not config_data.get('url', ''): raise CannotValidateNotificationMethodException('Missing Slack Callback URL') def format_for_slack(self, message): message = message.replace('\n', '') message = re.sub(r'\s+', ' ', message) message = message.replace('
', '\n') return adjust_tags(message) def perform(self, notification_obj, event_handler, notification_data): config_data = notification_obj.method_config_dict url = config_data.get('url', '') if not url: return owner = model.user.get_user_or_org(_get_namespace_name_from(notification_obj.repository)) if not owner: # Something went wrong. return level = event_handler.get_level(notification_data['event_data'], notification_data) color = { 'info': '#ffffff', 'warning': 'warning', 'error': 'danger', 'success': 'good', 'primary': 'good' }.get(level, '#ffffff') summary = event_handler.get_summary(notification_data['event_data'], notification_data) message = event_handler.get_message(notification_data['event_data'], notification_data) headers = {'Content-type': 'application/json'} payload = { 'text': summary, 'username': 'quayiobot', 'attachments': [ { 'fallback': summary, 'text': self.format_for_slack(message), 'color': color, 'mrkdwn_in': ["text"] } ] } try: resp = requests.post(url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT) if resp.status_code / 100 != 2: error_message = '%s response for Slack to url: %s' % (resp.status_code, url) logger.error(error_message) logger.error(resp.content) raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Slack method was unable to be sent: %s', ex.message) raise NotificationMethodPerformException(ex.message)