import unittest

from data import model
from workers.notificationworker import NotificationWorker
from initdb import setup_database_for_testing, finished_database_for_testing


class NotificationWorkerTests(unittest.TestCase):
    """Exercise NotificationWorker.process_queue_item against the test database."""

    def setUp(self):
        """Prepare the test database before each test."""
        setup_database_for_testing(self)

    def tearDown(self):
        """Release the test database after each test."""
        finished_database_for_testing(self)

    def test_basic_notification(self):
        """Processing one queued item adds exactly one notification for the target user.

        A 'build_success' / 'quay_notification' repo notification targeting the
        'public' user is created on the 'devtable/simple' repository, then a
        matching queue item is handed directly to the worker.
        """
        # Ensure the public user doesn't have any notifications.
        target_user = model.user.get_user('public')
        self.assertEqual(0, len(list(model.notification.list_notifications(target_user))))

        # Add a basic build notification.
        repo = model.repository.get_repository('devtable', 'simple')
        method_data = {
            'target': {
                'kind': 'user',
                'name': 'public',
            }
        }

        notification = model.notification.create_repo_notification(
            repo, 'build_success', 'quay_notification', method_data, {})
        notification_uuid = notification.uuid
        event_data = {}

        # Fire off the queue processing. The worker is built with None in place
        # of its constructor argument; the item is passed to it directly.
        worker = NotificationWorker(None)
        worker.process_queue_item({
            'notification_uuid': notification_uuid,
            'event_data': event_data,
        })

        # Ensure the notification was handled.
        # NOTE: assertEqual is used rather than the assertEquals alias, which
        # was deprecated and then removed from unittest in Python 3.12.
        self.assertEqual(1, len(list(model.notification.list_notifications(target_user))))


if __name__ == '__main__':
    unittest.main()