import json from peewee import JOIN_LEFT_OUTER from data.model import InvalidNotificationException, db_transaction from data.database import (Notification, NotificationKind, User, Team, TeamMember, TeamRole, RepositoryNotification, ExternalNotificationEvent, Repository, ExternalNotificationMethod, Namespace) def create_notification(kind_name, target, metadata={}): kind_ref = NotificationKind.get(name=kind_name) notification = Notification.create(kind=kind_ref, target=target, metadata_json=json.dumps(metadata)) return notification def create_unique_notification(kind_name, target, metadata={}): with db_transaction(): if list_notifications(target, kind_name, limit=1).count() == 0: create_notification(kind_name, target, metadata) def lookup_notification(user, uuid): results = list(list_notifications(user, id_filter=uuid, include_dismissed=True, limit=1)) if not results: return None return results[0] def list_notifications(user, kind_name=None, id_filter=None, include_dismissed=False, page=None, limit=None): Org = User.alias() AdminTeam = Team.alias() AdminTeamMember = TeamMember.alias() AdminUser = User.alias() query = (Notification.select() .join(User) .switch(Notification) .join(Org, JOIN_LEFT_OUTER, on=(Org.id == Notification.target)) .join(AdminTeam, JOIN_LEFT_OUTER, on=(Org.id == AdminTeam.organization)) .join(TeamRole, JOIN_LEFT_OUTER, on=(AdminTeam.role == TeamRole.id)) .switch(AdminTeam) .join(AdminTeamMember, JOIN_LEFT_OUTER, on=(AdminTeam.id == AdminTeamMember.team)) .join(AdminUser, JOIN_LEFT_OUTER, on=(AdminTeamMember.user == AdminUser.id)) .where((Notification.target == user) | ((AdminUser.id == user) & (TeamRole.name == 'admin'))) .order_by(Notification.created) .desc()) if not include_dismissed: query = query.switch(Notification).where(Notification.dismissed == False) if kind_name: query = (query .switch(Notification) .join(NotificationKind) .where(NotificationKind.name == kind_name)) if id_filter: query = (query .switch(Notification) .where(Notification.uuid == id_filter)) if page: query = query.paginate(page, limit) elif limit: query = query.limit(limit) return query def delete_all_notifications_by_kind(kind_name): kind_ref = NotificationKind.get(name=kind_name) (Notification .delete() .where(Notification.kind == kind_ref) .execute()) def delete_notifications_by_kind(target, kind_name): kind_ref = NotificationKind.get(name=kind_name) Notification.delete().where(Notification.target == target, Notification.kind == kind_ref).execute() def delete_matching_notifications(target, kind_name, **kwargs): kind_ref = NotificationKind.get(name=kind_name) # Load all notifications for the user with the given kind. notifications = Notification.select().where( Notification.target == target, Notification.kind == kind_ref) # For each, match the metadata to the specified values. for notification in notifications: matches = True try: metadata = json.loads(notification.metadata_json) except: continue for (key, value) in kwargs.iteritems(): if not key in metadata or metadata[key] != value: matches = False break if not matches: continue notification.delete_instance() def create_repo_notification(repo, event_name, method_name, method_config, event_config, title=None): event = ExternalNotificationEvent.get(ExternalNotificationEvent.name == event_name) method = ExternalNotificationMethod.get(ExternalNotificationMethod.name == method_name) return RepositoryNotification.create(repository=repo, event=event, method=method, config_json=json.dumps(method_config), title=title, event_config_json=json.dumps(event_config)) def get_repo_notification(uuid): try: return (RepositoryNotification .select(RepositoryNotification, Repository, Namespace) .join(Repository) .join(Namespace, on=(Repository.namespace_user == Namespace.id)) .where(RepositoryNotification.uuid == uuid) .get()) except RepositoryNotification.DoesNotExist: raise InvalidNotificationException('No repository notification found with id: %s' % uuid) def delete_repo_notification(namespace_name, repository_name, uuid): found = get_repo_notification(uuid) if (found.repository.namespace_user.username != namespace_name or found.repository.name != repository_name): raise InvalidNotificationException('No repository notifiation found with id: %s' % uuid) found.delete_instance() return found def list_repo_notifications(namespace_name, repository_name, event_name=None): query = (RepositoryNotification .select(RepositoryNotification, Repository, Namespace) .join(Repository) .join(Namespace, on=(Repository.namespace_user == Namespace.id)) .where(Namespace.username == namespace_name, Repository.name == repository_name)) if event_name: query = (query .switch(RepositoryNotification) .join(ExternalNotificationEvent) .where(ExternalNotificationEvent.name == event_name)) return query