This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_queue.py

94 lines
2.5 KiB
Python
Raw Normal View History

import unittest
import json
import time
from initdb import setup_database_for_testing, finished_database_for_testing
from data.queue import WorkQueue
# Queue name handed to every WorkQueue built in this module; also used as the
# leading component of the expected canonical-name strings in the tests.
QUEUE_NAME = 'testqueuename'
class QueueTestCase(unittest.TestCase):
    """Shared fixture for work-queue tests.

    Provides two JSON-encoded message bodies and, for every test, a
    ``WorkQueue`` named ``QUEUE_NAME`` plus a database prepared by
    ``setup_database_for_testing``.
    """

    # Message bodies are stored as JSON text so tests can compare them
    # directly against the ``body`` of a retrieved item.
    TEST_MESSAGE_1 = json.dumps({'data': 1})
    TEST_MESSAGE_2 = json.dumps({'data': 2})

    def setUp(self):
        """Create the queue under test, then prepare the test database."""
        self.queue = WorkQueue(QUEUE_NAME)
        setup_database_for_testing(self)

    def tearDown(self):
        """Release the test database prepared in ``setUp``."""
        finished_database_for_testing(self)
class TestQueue(QueueTestCase):
    """Behavioural tests for ``WorkQueue`` put/get/complete."""

    def test_same_canonical_names(self):
        """Two items put under the same name are handed out one at a time."""
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_2)

        one = self.queue.get()
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)

        # While the first item has not been completed, no second item
        # is returned.
        two_fail = self.queue.get()
        self.assertIsNone(two_fail)

        # Completing the first item makes the second one retrievable.
        self.queue.complete(one)

        two = self.queue.get()
        self.assertIsNotNone(two)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)

    def test_different_canonical_names(self):
        """Items put under different names can both be outstanding."""
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
        self.queue.put(['abc', 'ghi'], self.TEST_MESSAGE_2)

        one = self.queue.get()
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)

        # No complete() call in between: the second item is returned anyway.
        two = self.queue.get()
        self.assertIsNotNone(two)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)

    def test_canonical_name(self):
        """Retrieve items put under a two-part and a three-part name."""
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
        self.queue.put(['abc', 'def', 'ghi'], self.TEST_MESSAGE_1)

        # NOTE(review): both assertions below compare a string with the
        # object returned by get(), so assertNotEqual cannot fail here.
        # Presumably the intent was assertEqual against whichever attribute
        # of the item holds its canonical name -- confirm against the queue
        # item model before tightening these checks.
        one = self.queue.get()
        self.assertNotEqual(QUEUE_NAME + '/abc/def/', one)

        two = self.queue.get()
        self.assertNotEqual(QUEUE_NAME + '/abc/def/ghi/', two)

    def test_expiration(self):
        """An item that is never completed can be retrieved again later."""
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)

        # processing_time is presumably in seconds -- TODO confirm.
        one = self.queue.get(processing_time=0.5)
        self.assertIsNotNone(one)

        # An immediate second get() returns nothing.
        one_fail = self.queue.get()
        self.assertIsNone(one_fail)

        # Wait longer than the 0.5 passed as processing_time above.
        time.sleep(1)

        one_again = self.queue.get()
        self.assertIsNotNone(one_again)

    def test_specialized_queue(self):
        """A queue built with ``['def']`` only receives the matching item."""
        self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
        self.queue.put(['def', 'def'], self.TEST_MESSAGE_2)

        my_queue = WorkQueue(QUEUE_NAME, ['def'])

        # Only the item put under ['def', 'def'] is returned by my_queue.
        two = my_queue.get()
        self.assertIsNotNone(two)
        self.assertEqual(self.TEST_MESSAGE_2, two.body)

        one_fail = my_queue.get()
        self.assertIsNone(one_fail)

        # The queue built without the extra argument still returns the
        # ['abc', 'def'] item.
        one = self.queue.get()
        self.assertIsNotNone(one)
        self.assertEqual(self.TEST_MESSAGE_1, one.body)