This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/notificationmethod.py

242 lines
7.8 KiB
Python

import logging
import io
import os.path
import tarfile
import base64
import json
import requests
from flask.ext.mail import Message
from app import mail, app, get_app_url
from data import model
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class InvalidNotificationMethodException(Exception):
    """Raised when no notification method matches a requested method name."""
class CannotValidateNotificationMethodException(Exception):
    """Raised by validate() when a notification's configuration is rejected."""
class NotificationMethod(object):
    """Base class for the ways a notification can be delivered.

    Concrete methods subclass this type directly and override method_name(),
    validate() and perform(); get_method() locates them by name.
    """

    def __init__(self):
        pass

    @classmethod
    def method_name(cls):
        """Return the name identifying this method.

        Must be overridden by subclasses.
        """
        raise NotImplementedError()

    def validate(self, repository, config_data):
        """Check that the notification can be created with the given data.

        Implementations throw a CannotValidateNotificationMethodException on
        failure.
        """
        raise NotImplementedError()

    def perform(self, notification, event_handler, notification_data):
        """Carry out the notification method.

        notification: The notification record itself.
        event_handler: The NotificationEvent handler.
        notification_data: The dict of notification data placed in the queue.
        """
        raise NotImplementedError()

    @classmethod
    def get_method(cls, methodname):
        """Instantiate the direct subclass whose method_name() equals methodname.

        Raises InvalidNotificationMethodException when no subclass matches.
        """
        # Only direct subclasses are searched; the first match in
        # __subclasses__() order wins.
        matching = next(
            (candidate for candidate in cls.__subclasses__()
             if candidate.method_name() == methodname),
            None
        )
        if matching is None:
            raise InvalidNotificationMethodException('Unable to find method: %s' % methodname)
        return matching()
class QuayNotificationMethod(NotificationMethod):
    """Delivers notifications inside Quay to a user, an organization or a team."""

    @classmethod
    def method_name(cls):
        return 'quay_notification'

    def validate(self, repository, config_data):
        """Reject the configuration if its target cannot be resolved.

        Raises CannotValidateNotificationMethodException carrying the error
        message reported by find_targets().
        """
        _, err_message, _ = self.find_targets(repository, config_data)
        if err_message:
            raise CannotValidateNotificationMethodException(err_message)

    def find_targets(self, repository, config_data):
        """Resolve config_data['target'] to the accounts that should be notified.

        Always returns a (status, err_message, target_users) triple:
          status: False makes perform() report failure; True lets it proceed.
          err_message: None on success, otherwise why the lookup failed.
          target_users: the resolved targets; may be None or empty on error.
        """
        target_info = config_data['target']
        kind = target_info['kind']

        if kind == 'user':
            target = model.get_user(target_info['name'])
            if not target:
                # Just to be safe.
                return (True, 'Unknown user %s' % target_info['name'], [])
            return (True, None, [target])

        if kind == 'org':
            target = model.get_organization(target_info['name'])
            if not target:
                # Just to be safe.
                return (True, 'Unknown organization %s' % target_info['name'], None)

            # Only repositories under the organization can cause notifications to that org.
            if target_info['name'] != repository.namespace:
                # Must be a triple: validate() and perform() both unpack three values.
                return (False, 'Organization name must match repository namespace', None)
            return (True, None, [target])

        if kind == 'team':
            # Lookup the team.
            try:
                team = model.get_organization_team(repository.namespace, target_info['name'])
            except model.InvalidTeamException:
                # Probably deleted.
                return (True, 'Unknown team %s' % target_info['name'], None)

            # Lookup the team's members
            return (True, None, model.get_organization_team_members(team.id))

        # Unrecognized kind: report it explicitly rather than implicitly
        # returning None, which the callers cannot unpack.
        return (False, 'Unknown target kind %s' % kind, None)

    def perform(self, notification, event_handler, notification_data):
        """Create a notification for every resolved target user.

        Returns True when the repository is gone or the notifications were
        created, and False when find_targets() reports a failing status.
        """
        repository = notification.repository
        if not repository:
            # Probably deleted.
            return True

        # Lookup the target user or team to which we'll send the notification.
        config_data = json.loads(notification.config_json)
        status, _, target_users = self.find_targets(repository, config_data)
        if not status:
            return False

        # For each of the target users, create a notification.
        # set() drops duplicate targets; `or []` covers the None error results.
        for target_user in set(target_users or []):
            model.create_notification(event_handler.event_name(), target_user,
                                      metadata=notification_data['event_data'])
        return True
class EmailMethod(NotificationMethod):
    """Delivers notifications as an e-mail sent through the app's `mail` object."""

    @classmethod
    def method_name(cls):
        return 'email'

    def validate(self, repository, config_data):
        """Require a non-empty 'email' that is authorized and confirmed for the repository.

        Raises CannotValidateNotificationMethodException otherwise.
        """
        email = config_data.get('email', '')
        if not email:
            raise CannotValidateNotificationMethodException('Missing e-mail address')

        record = model.get_email_authorized_for_repo(repository.namespace, repository.name, email)
        if not record or not record.confirmed:
            raise CannotValidateNotificationMethodException('The specified e-mail address '
                                                            'is not authorized to receive '
                                                            'notifications for this repository')

    def perform(self, notification, event_handler, notification_data):
        """Send the event's summary and message to the configured address.

        Returns True once the message has been handed to mail.send(), and
        False when no address is configured or sending raised.
        """
        config_data = json.loads(notification.config_json)
        email = config_data.get('email', '')
        if not email:
            return False

        event_data = notification_data['event_data']
        msg = Message(event_handler.get_summary(event_data, notification_data),
                      sender='support@quay.io',
                      recipients=[email])
        msg.html = event_handler.get_message(event_data, notification_data)

        try:
            with app.app_context():
                mail.send(msg)
        except Exception as ex:
            # Deliberately broad: any delivery failure is logged and reported
            # as False. The exception is passed as a logging argument instead
            # of reading `ex.message`, which not every exception provides.
            logger.exception('Email was unable to be sent: %s', ex)
            return False
        return True
class WebhookMethod(NotificationMethod):
    """Delivers notifications by POSTing the event data as JSON to a URL."""

    @classmethod
    def method_name(cls):
        return 'webhook'

    def validate(self, repository, config_data):
        """Require a non-empty 'url' in the configuration.

        Raises CannotValidateNotificationMethodException when it is missing.
        """
        url = config_data.get('url', '')
        if not url:
            raise CannotValidateNotificationMethodException('Missing webhook URL')

    def perform(self, notification, event_handler, notification_data):
        """POST notification_data['event_data'] to the configured URL.

        Returns True for a 2xx response, and False when no URL is configured,
        the response status is not 2xx, or the request raised.
        """
        config_data = json.loads(notification.config_json)
        url = config_data.get('url', '')
        if not url:
            return False

        payload = notification_data['event_data']
        headers = {'Content-type': 'application/json'}

        try:
            # NOTE(review): no timeout is passed to requests.post; confirm the
            # caller tolerates an endpoint that never answers.
            resp = requests.post(url, data=json.dumps(payload), headers=headers)
            # Explicit range check: `status_code/100 != 2` is only right
            # under integer division.
            if not 200 <= resp.status_code < 300:
                logger.error('%s response for webhook to url: %s', resp.status_code, url)
                return False
        except requests.exceptions.RequestException as ex:
            # Pass the exception itself; `ex.message` is not always available.
            logger.exception('Webhook was unable to be sent: %s', ex)
            return False
        return True
class FlowdockMethod(NotificationMethod):
    """ Method for sending notifications to Flowdock via the Team Inbox API:
        https://www.flowdock.com/api/team-inbox
    """

    @classmethod
    def method_name(cls):
        return 'flowdock'

    def validate(self, repository, config_data):
        """Require a non-empty 'flow_api_token' in the configuration.

        Raises CannotValidateNotificationMethodException when it is missing.
        """
        token = config_data.get('flow_api_token', '')
        if not token:
            raise CannotValidateNotificationMethodException('Missing Flowdock API Token')

    def perform(self, notification, event_handler, notification_data):
        """POST the event to the Flowdock team inbox addressed by the token.

        Returns True for a 2xx response, and False when the token or the
        namespace owner is missing, the response status is not 2xx, or the
        request raised.
        """
        config_data = json.loads(notification.config_json)
        token = config_data.get('flow_api_token', '')
        if not token:
            return False

        owner = model.get_user(notification.repository.namespace)
        if not owner:
            # Something went wrong.
            return False

        url = 'https://api.flowdock.com/v1/messages/team_inbox/%s' % token
        headers = {'Content-type': 'application/json'}
        event_data = notification_data['event_data']
        payload = {
            'source': 'Quay',
            'from_address': 'support@quay.io',
            'subject': event_handler.get_summary(event_data, notification_data),
            'content': event_handler.get_message(event_data, notification_data),
            'from_name': owner.username,
            'project': notification.repository.namespace + ' ' + notification.repository.name,
            'tags': ['#' + event_handler.event_name()],
            'link': event_data['homepage']
        }

        try:
            resp = requests.post(url, data=json.dumps(payload), headers=headers)
            # Explicit range check: `status_code/100 != 2` is only right
            # under integer division.
            if not 200 <= resp.status_code < 300:
                # NOTE(review): url embeds the API token, so this line writes
                # the token to the log; confirm that is acceptable.
                logger.error('%s response for flowdock to url: %s', resp.status_code, url)
                logger.error(resp.content)
                return False
        except requests.exceptions.RequestException as ex:
            # Pass the exception itself; `ex.message` is not always available.
            logger.exception('Flowdock method was unable to be sent: %s', ex)
            return False
        return True