158 lines
5.4 KiB
Python
158 lines
5.4 KiB
Python
import json
|
|
|
|
from peewee import JOIN_LEFT_OUTER
|
|
|
|
from data.model import InvalidNotificationException, db_transaction
|
|
from data.database import (Notification, NotificationKind, User, Team, TeamMember, TeamRole,
|
|
RepositoryNotification, ExternalNotificationEvent, Repository,
|
|
ExternalNotificationMethod, Namespace)
|
|
|
|
|
|
def create_notification(kind_name, target, metadata=None):
    """Create and return a notification of the named kind for ``target``.

    Args:
        kind_name: Name used to look up the ``NotificationKind`` row.
        target: Value stored as the notification's ``target``.
        metadata: JSON-serializable payload stored in ``metadata_json``.
            When omitted (or ``None``) an empty mapping is stored.

    Returns:
        The newly created ``Notification``.
    """
    # ``None`` replaces the former ``{}`` default so that no mutable object
    # is shared between calls.
    if metadata is None:
        metadata = {}

    kind_ref = NotificationKind.get(name=kind_name)
    return Notification.create(kind=kind_ref, target=target,
                               metadata_json=json.dumps(metadata))
|
|
|
|
|
|
def create_unique_notification(kind_name, target, metadata=None):
    """Create a notification unless ``list_notifications`` already finds one.

    The existence check and the insert both run inside ``db_transaction()``.
    The check calls ``list_notifications`` with its default arguments, so
    dismissed notifications are not counted.

    Args:
        kind_name: Name of the notification kind.
        target: Target the notification is addressed to.
        metadata: JSON-serializable payload for the new notification.
            When omitted (or ``None``) an empty mapping is used.
    """
    # ``None`` replaces the former ``{}`` default so that no mutable object
    # is shared between calls; normalise here so the callee always gets a
    # real mapping.
    if metadata is None:
        metadata = {}

    with db_transaction():
        if list_notifications(target, kind_name, limit=1).count() == 0:
            create_notification(kind_name, target, metadata)
|
|
|
|
|
|
def lookup_notification(user, uuid):
    """Return the notification with ``uuid`` as listed for ``user``, or None.

    Dismissed notifications are included in the search.
    """
    matches = list(list_notifications(user, id_filter=uuid, include_dismissed=True, limit=1))
    return matches[0] if matches else None
|
|
|
|
|
|
def list_notifications(user, kind_name=None, id_filter=None, include_dismissed=False,
                       page=None, limit=None):
    """Build a query for the notifications visible to ``user``, newest first.

    A notification is selected when its target is ``user``, or when ``user``
    is a member of a team whose role is named ``admin`` in the organization
    (a ``User`` alias) that is the notification's target.

    Args:
        user: The user whose notifications are listed.
        kind_name: When truthy, only notifications of this kind name.
        id_filter: When truthy, only the notification with this uuid.
        include_dismissed: When false, dismissed notifications are excluded.
        page: When truthy, the query is paginated with ``paginate(page, limit)``.
        limit: Page size when ``page`` is given; otherwise, when truthy, the
            maximum number of rows.

    Returns:
        The (not yet executed) peewee select query.
    """
    Org = User.alias()
    AdminTeam = Team.alias()
    AdminTeamMember = TeamMember.alias()
    AdminUser = User.alias()

    query = (Notification
             .select()
             .join(User)
             .switch(Notification)
             .join(Org, JOIN_LEFT_OUTER, on=(Org.id == Notification.target))
             .join(AdminTeam, JOIN_LEFT_OUTER, on=(Org.id == AdminTeam.organization))
             .join(TeamRole, JOIN_LEFT_OUTER, on=(AdminTeam.role == TeamRole.id))
             .switch(AdminTeam)
             .join(AdminTeamMember, JOIN_LEFT_OUTER, on=(AdminTeam.id == AdminTeamMember.team))
             .join(AdminUser, JOIN_LEFT_OUTER, on=(AdminTeamMember.user == AdminUser.id))
             .where((Notification.target == user) |
                    ((AdminUser.id == user) & (TeamRole.name == 'admin')))
             # The descending marker must be applied to the column. It was
             # previously called on the query object, which does not change
             # the ORDER BY direction.
             .order_by(Notification.created.desc()))

    if not include_dismissed:
        # ``== False`` is required here: peewee builds the SQL expression
        # from the overloaded comparison operator.
        query = query.switch(Notification).where(Notification.dismissed == False)  # noqa: E712

    if kind_name:
        query = (query
                 .switch(Notification)
                 .join(NotificationKind)
                 .where(NotificationKind.name == kind_name))

    if id_filter:
        query = (query
                 .switch(Notification)
                 .where(Notification.uuid == id_filter))

    if page:
        query = query.paginate(page, limit)
    elif limit:
        query = query.limit(limit)

    return query
|
|
|
|
|
|
def delete_all_notifications_by_kind(kind_name):
    """Delete every notification of the named kind, whatever its target."""
    kind = NotificationKind.get(name=kind_name)
    removal = Notification.delete().where(Notification.kind == kind)
    removal.execute()
|
|
|
|
|
|
def delete_notifications_by_kind(target, kind_name):
    """Delete the notifications of the named kind addressed to ``target``."""
    kind = NotificationKind.get(name=kind_name)
    for_target = Notification.target == target
    of_kind = Notification.kind == kind
    Notification.delete().where(for_target, of_kind).execute()
|
|
|
|
|
|
def delete_matching_notifications(target, kind_name, **kwargs):
    """Delete ``target``'s notifications of a kind whose metadata matches.

    Each notification's ``metadata_json`` is decoded and compared against the
    keyword arguments; the notification is deleted only when every given key
    is present with an equal value. With no keyword arguments, every
    notification whose metadata decodes successfully is deleted.

    Args:
        target: Target whose notifications are examined.
        kind_name: Name of the notification kind to examine.
        **kwargs: Metadata key/value pairs that must all match.
    """
    kind_ref = NotificationKind.get(name=kind_name)

    # Load all notifications for the target with the given kind.
    notifications = Notification.select().where(
        Notification.target == target,
        Notification.kind == kind_ref)

    for notification in notifications:
        try:
            metadata = json.loads(notification.metadata_json)
        except (TypeError, ValueError):
            # Missing or malformed JSON cannot match anything; keep the row.
            continue

        if _metadata_matches(metadata, kwargs):
            notification.delete_instance()


def _metadata_matches(metadata, expected):
    """Return True when every ``expected`` key/value pair is in ``metadata``."""
    if not expected:
        return True

    if not isinstance(metadata, dict):
        # Valid JSON that is not an object (null, list, string, number) has
        # no keys to compare against.
        return False

    return not any(key not in metadata or metadata[key] != value
                   for key, value in expected.items())
|
|
|
|
|
|
def create_repo_notification(repo, event_name, method_name, config):
    """Create and return a repository notification.

    The event and method rows are looked up by name, and ``config`` is
    stored JSON-encoded in ``config_json``.
    """
    event_ref = ExternalNotificationEvent.get(ExternalNotificationEvent.name == event_name)
    method_ref = ExternalNotificationMethod.get(ExternalNotificationMethod.name == method_name)
    encoded_config = json.dumps(config)

    return RepositoryNotification.create(repository=repo, event=event_ref, method=method_ref,
                                         config_json=encoded_config)
|
|
|
|
|
|
def get_repo_notification(uuid):
    """Fetch the repository notification with the given uuid.

    The repository and its namespace are selected in the same query.

    Raises:
        InvalidNotificationException: If no notification has that uuid.
    """
    lookup = (RepositoryNotification
              .select(RepositoryNotification, Repository, Namespace)
              .join(Repository)
              .join(Namespace, on=(Repository.namespace_user == Namespace.id))
              .where(RepositoryNotification.uuid == uuid))

    try:
        return lookup.get()
    except RepositoryNotification.DoesNotExist:
        raise InvalidNotificationException('No repository notification found with id: %s' % uuid)
|
|
|
|
|
|
def delete_repo_notification(namespace_name, repository_name, uuid):
    """Delete and return the repository notification with the given uuid.

    The notification is deleted only when it belongs to the repository
    identified by ``namespace_name`` and ``repository_name``.

    Raises:
        InvalidNotificationException: If no notification has that uuid, or it
            belongs to a different repository.
    """
    found = get_repo_notification(uuid)
    repository = found.repository

    # Report a mismatch exactly like a missing notification.
    if (repository.namespace_user.username != namespace_name or
            repository.name != repository_name):
        raise InvalidNotificationException('No repository notification found with id: %s' % uuid)

    found.delete_instance()
    return found
|
|
|
|
|
|
def list_repo_notifications(namespace_name, repository_name, event_name=None):
    """Build a query for the notifications configured on a repository.

    When ``event_name`` is truthy, only notifications for that external
    event are selected. The returned query is not executed here.
    """
    notifications = (RepositoryNotification
                     .select(RepositoryNotification, Repository, Namespace)
                     .join(Repository)
                     .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                     .where(Namespace.username == namespace_name,
                            Repository.name == repository_name))

    if not event_name:
        return notifications

    return (notifications
            .switch(RepositoryNotification)
            .join(ExternalNotificationEvent)
            .where(ExternalNotificationEvent.name == event_name))
|