Add an end-to-end test for the notifications queue

This commit is contained in:
Joseph Schorr 2016-09-21 15:15:35 -04:00
parent c0ae8e301b
commit a5fef119c9

View file

@ -0,0 +1,45 @@
import unittest
from data import model
from workers.notificationworker import NotificationWorker
from initdb import setup_database_for_testing, finished_database_for_testing
class NotificationWorkerTests(unittest.TestCase):
def setUp(self):
setup_database_for_testing(self)
def tearDown(self):
finished_database_for_testing(self)
def test_basic_notification(self):
# Ensure the public user doesn't have any notifications.
target_user = model.user.get_user('public')
self.assertEquals(0, len(list(model.notification.list_notifications(target_user))))
# Add a basic build notification.
repo = model.repository.get_repository('devtable', 'simple')
method_data = {
'target': {
'kind': 'user',
'name': 'public',
}
}
notification = model.notification.create_repo_notification(repo, 'build_success',
'quay_notification', method_data, {})
notification_uuid = notification.uuid
event_data = {}
# Fire off the queue processing.
worker = NotificationWorker(None)
worker.process_queue_item({
'notification_uuid': notification_uuid,
'event_data': event_data,
})
# Ensure the notification was handled.
self.assertEquals(1, len(list(model.notification.list_notifications(target_user))))
if __name__ == '__main__':
unittest.main()