- Add the rotation_duration to the keys API - Have the key service UI use the new rotation_duration field - Fix notification deletion lookup path - Add proper support for the new notification in the UI - Only delete expired keys after 7 days (configurable) - Fix angular digest loop - Fix unit tests - Regenerate initdb
		
			
				
	
	
		
			171 lines
		
	
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			171 lines
		
	
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| 
 | |
| from data.model import InvalidNotificationException, db_transaction
 | |
| from data.database import (Notification, NotificationKind, User, Team, TeamMember, TeamRole,
 | |
|                            RepositoryNotification, ExternalNotificationEvent, Repository,
 | |
|                            ExternalNotificationMethod, Namespace)
 | |
| 
 | |
| 
 | |
| def create_notification(kind_name, target, metadata={}, lookup_path=None):
 | |
|   kind_ref = NotificationKind.get(name=kind_name)
 | |
|   notification = Notification.create(kind=kind_ref, target=target,
 | |
|                                      metadata_json=json.dumps(metadata),
 | |
|                                      lookup_path=lookup_path)
 | |
|   return notification
 | |
| 
 | |
| 
 | |
| def create_unique_notification(kind_name, target, metadata={}):
 | |
|   with db_transaction():
 | |
|     if list_notifications(target, kind_name).count() == 0:
 | |
|       create_notification(kind_name, target, metadata)
 | |
| 
 | |
| 
 | |
| def lookup_notification(user, uuid):
 | |
|   results = list(list_notifications(user, id_filter=uuid, include_dismissed=True, limit=1))
 | |
|   if not results:
 | |
|     return None
 | |
| 
 | |
|   return results[0]
 | |
| 
 | |
| 
 | |
| def lookup_notifications_by_path_prefix(prefix):
 | |
|   return list((Notification
 | |
|                .select()
 | |
|                .where(Notification.lookup_path % prefix)))
 | |
| 
 | |
| 
 | |
| def list_notifications(user, kind_name=None, id_filter=None, include_dismissed=False,
 | |
|                        page=None, limit=None):
 | |
| 
 | |
|   base_query = Notification.select().join(NotificationKind)
 | |
| 
 | |
|   if kind_name is not None:
 | |
|     base_query = base_query.where(NotificationKind.name == kind_name)
 | |
| 
 | |
|   if id_filter is not None:
 | |
|     base_query = base_query.where(Notification.uuid == id_filter)
 | |
| 
 | |
|   if not include_dismissed:
 | |
|     base_query = base_query.where(Notification.dismissed == False)
 | |
| 
 | |
|   # Lookup directly for the user.
 | |
|   user_direct = base_query.clone().where(Notification.target == user)
 | |
| 
 | |
|   # Lookup via organizations admined by the user.
 | |
|   Org = User.alias()
 | |
|   AdminTeam = Team.alias()
 | |
|   AdminTeamMember = TeamMember.alias()
 | |
|   AdminUser = User.alias()
 | |
| 
 | |
|   via_orgs = (base_query.clone()
 | |
|               .join(Org, on=(Org.id == Notification.target))
 | |
|               .join(AdminTeam, on=(Org.id == AdminTeam.organization))
 | |
|               .join(TeamRole, on=(AdminTeam.role == TeamRole.id))
 | |
|               .switch(AdminTeam)
 | |
|               .join(AdminTeamMember, on=(AdminTeam.id == AdminTeamMember.team))
 | |
|               .join(AdminUser, on=(AdminTeamMember.user == AdminUser.id))
 | |
|               .where((AdminUser.id == user) & (TeamRole.name == 'admin')))
 | |
| 
 | |
|   query = user_direct | via_orgs
 | |
| 
 | |
|   if page:
 | |
|     query = query.paginate(page, limit)
 | |
|   elif limit:
 | |
|     query = query.limit(limit)
 | |
| 
 | |
|   return query.order_by(base_query.c.created.desc())
 | |
| 
 | |
| 
 | |
| def delete_all_notifications_by_path_prefix(prefix):
 | |
|   (Notification
 | |
|    .delete()
 | |
|    .where(Notification.lookup_path ** (prefix + '%'))
 | |
|    .execute())
 | |
| 
 | |
| 
 | |
| def delete_all_notifications_by_kind(kind_name):
 | |
|   kind_ref = NotificationKind.get(name=kind_name)
 | |
|   (Notification
 | |
|    .delete()
 | |
|    .where(Notification.kind == kind_ref)
 | |
|    .execute())
 | |
| 
 | |
| 
 | |
| def delete_notifications_by_kind(target, kind_name):
 | |
|   kind_ref = NotificationKind.get(name=kind_name)
 | |
|   Notification.delete().where(Notification.target == target,
 | |
|                               Notification.kind == kind_ref).execute()
 | |
| 
 | |
| 
 | |
| def delete_matching_notifications(target, kind_name, **kwargs):
 | |
|   kind_ref = NotificationKind.get(name=kind_name)
 | |
| 
 | |
|   # Load all notifications for the user with the given kind.
 | |
|   notifications = (Notification
 | |
|                    .select()
 | |
|                    .where(Notification.target == target,
 | |
|                           Notification.kind == kind_ref))
 | |
| 
 | |
|   # For each, match the metadata to the specified values.
 | |
|   for notification in notifications:
 | |
|     matches = True
 | |
|     try:
 | |
|       metadata = json.loads(notification.metadata_json)
 | |
|     except:
 | |
|       continue
 | |
| 
 | |
|     for (key, value) in kwargs.iteritems():
 | |
|       if not key in metadata or metadata[key] != value:
 | |
|         matches = False
 | |
|         break
 | |
| 
 | |
|     if not matches:
 | |
|       continue
 | |
| 
 | |
|     notification.delete_instance()
 | |
| 
 | |
| 
 | |
| def create_repo_notification(repo, event_name, method_name, method_config, event_config, title=None):
 | |
|   event = ExternalNotificationEvent.get(ExternalNotificationEvent.name == event_name)
 | |
|   method = ExternalNotificationMethod.get(ExternalNotificationMethod.name == method_name)
 | |
| 
 | |
|   return RepositoryNotification.create(repository=repo, event=event, method=method,
 | |
|                                        config_json=json.dumps(method_config), title=title,
 | |
|                                        event_config_json=json.dumps(event_config))
 | |
| 
 | |
| 
 | |
| def get_repo_notification(uuid):
 | |
|   try:
 | |
|     return (RepositoryNotification
 | |
|             .select(RepositoryNotification, Repository, Namespace)
 | |
|             .join(Repository)
 | |
|             .join(Namespace, on=(Repository.namespace_user == Namespace.id))
 | |
|             .where(RepositoryNotification.uuid == uuid)
 | |
|             .get())
 | |
|   except RepositoryNotification.DoesNotExist:
 | |
|     raise InvalidNotificationException('No repository notification found with id: %s' % uuid)
 | |
| 
 | |
| 
 | |
| def delete_repo_notification(namespace_name, repository_name, uuid):
 | |
|   found = get_repo_notification(uuid)
 | |
|   if (found.repository.namespace_user.username != namespace_name or
 | |
|       found.repository.name != repository_name):
 | |
|     raise InvalidNotificationException('No repository notifiation found with id: %s' % uuid)
 | |
|   found.delete_instance()
 | |
|   return found
 | |
| 
 | |
| 
 | |
| def list_repo_notifications(namespace_name, repository_name, event_name=None):
 | |
|   query = (RepositoryNotification
 | |
|            .select(RepositoryNotification, Repository, Namespace)
 | |
|            .join(Repository)
 | |
|            .join(Namespace, on=(Repository.namespace_user == Namespace.id))
 | |
|            .where(Namespace.username == namespace_name, Repository.name == repository_name))
 | |
| 
 | |
|   if event_name:
 | |
|     query = (query
 | |
|              .switch(RepositoryNotification)
 | |
|              .join(ExternalNotificationEvent)
 | |
|              .where(ExternalNotificationEvent.name == event_name))
 | |
| 
 | |
|   return query
 |