Merge pull request #1493 from jzelinskie/noorder
queue: explicitly declare ordering requirement
This commit is contained in:
commit
6178371cf5
5 changed files with 91 additions and 57 deletions
|
@ -163,7 +163,8 @@ class BuilderServer(object):
|
|||
logger.debug('Checking for more work for %d active workers',
|
||||
self._lifecycle_manager.num_workers())
|
||||
|
||||
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time())
|
||||
job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time(),
|
||||
order_required=True)
|
||||
if job_item is None:
|
||||
logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT)
|
||||
continue
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
#cloud-config
|
||||
|
||||
ssh_authorized_keys:
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCC0m+hVmyR3vn/xoxJe9+atRWBxSK+YXgyufNVDMcb7H00Jfnc341QH3kDVYZamUbhVh/nyc2RP7YbnZR5zORFtgOaNSdkMYrPozzBvxjnvSUokkCCWbLqXDHvIKiR12r+UTSijPJE/Yk702Mb2ejAFuae1C3Ec+qKAoOCagDjpQ3THyb5oaKE7VPHdwCWjWIQLRhC+plu77ObhoXIFJLD13gCi01L/rp4mYVCxIc2lX5A8rkK+bZHnIZwWUQ4t8SIjWxIaUo0FE7oZ83nKuNkYj5ngmLHQLY23Nx2WhE9H6NBthUpik9SmqQPtVYbhIG+bISPoH9Xs8CLrFb0VRjz Joey's Mac
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCo6FhAP7mFFOAzM91gtaKW7saahtaN4lur42FMMztz6aqUycIltCmvxo+3FmrXgCG30maMNU36Vm1+9QRtVQEd+eRuoIWP28t+8MT01Fh4zPuE2Wca3pOHSNo3X81FfWJLzmwEHiQKs9HPQqUhezR9PcVWVkbMyAzw85c0UycGmHGFNb0UiRd9HFY6XbgbxhZv/mvKLZ99xE3xkOzS1PNsdSNvjUKwZR7pSUPqNS5S/1NXyR4GhFTU24VPH/bTATOv2ATH+PSzsZ7Qyz9UHj38tKC+ALJHEDJ4HXGzobyOUP78cHGZOfCB5FYubq0zmOudAjKIAhwI8XTFvJ2DX1P3 jimmyzelinskie
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDNvw8qo9m8np7yQ/Smv/oklM8bo8VyNRZriGYBDuolWDL/mZpYCQnZJXphQo7RFdNABYistikjJlBuuwUohLf2uSq0iKoFa2TgwI43wViWzvuzU4nA02/ITD5BZdmWAFNyIoqeB50Ol4qUgDwLAZ+7Kv7uCi6chcgr9gTi99jY3GHyZjrMiXMHGVGi+FExFuzhVC2drKjbz5q6oRfQeLtNfG4psl5GU3MQU6FkX4fgoCx0r9R48/b7l4+TT7pWblJQiRfeldixu6308vyoTUEHasdkU3/X0OTaGz/h5XqTKnGQc6stvvoED3w+L3QFp0H5Z8sZ9stSsitmCBrmbcKZ jakemoshenko
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAgEAo/JkbGO6R7g1ZxARi0xWVM7FOfN02snRAcIO6vT9M7xMUkWVLgD+hM/o91lk+UFiYdql0CATobpFWncRL36KaUqsbw9/1BlI40wg296XHXSSnxhxZ4L7ytf6G1tyN319HXlI2kh9vAf/fy++yDvkH8dI3k1oLoW+mZPET6Pff04/6AXXrRlS5mhmGv9irGwiDHtVKpj6lU8DN/UtOrv1tiQ0pgwEJq05fLGoQfgPNaBCnW2z4Ubpn2gyMcMBMpSwo4hCqJePd349e4bLmFcT+gXYg7Mnup1DoTDlowFFN56wpxQbdp96IxWzU+jYPaIAuRo+BJzCyOS8qBv0Z4RZrgop0qp2JYiVwmViO6TZhIDz6loQJXUOIleQmNgTbiZx8Bwv5GY2jMYoVwlBp7yy5bRjxfbFsJ0vU7TVzNAG7oEJy/74HmHmWzRQlSlQjesr8gRbm9zgR8wqc/L107UOWFg7Cgh8ZNjKuADbXqYuda1Y9m2upcfS26UPz5l5PW5uFRMHZSi8pb1XV6/0Z8H8vwsh37Ur6aLi/5jruRmKhdlsNrB1IiDicBsPW3yg7HHSIdPU4oBNPC77yDCT3l4CKr4el81RrZt7FbJPfY+Ig9Q5O+05f6I8+ZOlJGyZ/Qfyl2aVm1HnlJKuBqPxeic8tMng/9B5N7uZL6Y3k5jFU8c= quentin
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDI7LtxLItapmUbt3Gs+4Oxa1i22fkx1+aJDkAjiRWPSX3+cxOzuPfHX9uFzr+qj5hy4J7ErrPp8q9alu+il9lE26GQuUxOZiaUrXu4dRCXXdCqTHARWBxGUXjkxdMp2HIzFpBxmVqcRubrgM36LBzKapdDOqQdz7XnNm5Jmf0tH/N0+TgV60P0WVY1CxmTya+JHNFVgazhd+oIGEhTyW/eszMGcFUgZet7DQFytYIQXYSwwGpGdJ+0InKAJ2SzCt/yuUlSrhrVM8vSGeami1XYmgQiyth1zjteMd8uTrc9NREH7bZTNcMFBqVYE3BYQWGRrv8pMMgP9gxgLbxtVsUl barakmich-titania
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDiNawWSZL2MF99zwG9cFjGmML6agsKwaacQEoTsjcjHGixyUnqHXaLdrGma5i/uphZPkI5XRBKiuIROACY/aRoIxJUpV7AQ1Zx87cILx6fDVePvU5lW2DdhlCDUdwjuzDb/WO/c/qMWjOPqRG4q8XvB7nhuORMMgdpDXWVH4LXPmFez1iIBCKNk04l6Se7wiEOQjaBnTDiBDYlWD78r6RdiAU5eIxpq+lKBDTcET0vegwcA/WE4YOlYBbOrgtHrgwWqG/pXxUu77aapDOmfjtDrgim6XP5kEnytg5gCaN9iLvIpT8b1wD/1Z+LoNSZg6m9gkcC2yTRI0apOBa2G8lz silas@pro.local
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCC0m+hVmyR3vn/xoxJe9+atRWBxSK+YXgyufNVDMcb7H00Jfnc341QH3kDVYZamUbhVh/nyc2RP7YbnZR5zORFtgOaNSdkMYrPozzBvxjnvSUokkCCWbLqXDHvIKiR12r+UTSijPJE/Yk702Mb2ejAFuae1C3Ec+qKAoOCagDjpQ3THyb5oaKE7VPHdwCWjWIQLRhC+plu77ObhoXIFJLD13gCi01L/rp4mYVCxIc2lX5A8rkK+bZHnIZwWUQ4t8SIjWxIaUo0FE7oZ83nKuNkYj5ngmLHQLY23Nx2WhE9H6NBthUpik9SmqQPtVYbhIG+bISPoH9Xs8CLrFb0VRjz Joey Schorr
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCo6FhAP7mFFOAzM91gtaKW7saahtaN4lur42FMMztz6aqUycIltCmvxo+3FmrXgCG30maMNU36Vm1+9QRtVQEd+eRuoIWP28t+8MT01Fh4zPuE2Wca3pOHSNo3X81FfWJLzmwEHiQKs9HPQqUhezR9PcVWVkbMyAzw85c0UycGmHGFNb0UiRd9HFY6XbgbxhZv/mvKLZ99xE3xkOzS1PNsdSNvjUKwZR7pSUPqNS5S/1NXyR4GhFTU24VPH/bTATOv2ATH+PSzsZ7Qyz9UHj38tKC+ALJHEDJ4HXGzobyOUP78cHGZOfCB5FYubq0zmOudAjKIAhwI8XTFvJ2DX1P3 Jimmy Zelinskie
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDNvw8qo9m8np7yQ/Smv/oklM8bo8VyNRZriGYBDuolWDL/mZpYCQnZJXphQo7RFdNABYistikjJlBuuwUohLf2uSq0iKoFa2TgwI43wViWzvuzU4nA02/ITD5BZdmWAFNyIoqeB50Ol4qUgDwLAZ+7Kv7uCi6chcgr9gTi99jY3GHyZjrMiXMHGVGi+FExFuzhVC2drKjbz5q6oRfQeLtNfG4psl5GU3MQU6FkX4fgoCx0r9R48/b7l4+TT7pWblJQiRfeldixu6308vyoTUEHasdkU3/X0OTaGz/h5XqTKnGQc6stvvoED3w+L3QFp0H5Z8sZ9stSsitmCBrmbcKZ Jake Moshenko
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAgEAo/JkbGO6R7g1ZxARi0xWVM7FOfN02snRAcIO6vT9M7xMUkWVLgD+hM/o91lk+UFiYdql0CATobpFWncRL36KaUqsbw9/1BlI40wg296XHXSSnxhxZ4L7ytf6G1tyN319HXlI2kh9vAf/fy++yDvkH8dI3k1oLoW+mZPET6Pff04/6AXXrRlS5mhmGv9irGwiDHtVKpj6lU8DN/UtOrv1tiQ0pgwEJq05fLGoQfgPNaBCnW2z4Ubpn2gyMcMBMpSwo4hCqJePd349e4bLmFcT+gXYg7Mnup1DoTDlowFFN56wpxQbdp96IxWzU+jYPaIAuRo+BJzCyOS8qBv0Z4RZrgop0qp2JYiVwmViO6TZhIDz6loQJXUOIleQmNgTbiZx8Bwv5GY2jMYoVwlBp7yy5bRjxfbFsJ0vU7TVzNAG7oEJy/74HmHmWzRQlSlQjesr8gRbm9zgR8wqc/L107UOWFg7Cgh8ZNjKuADbXqYuda1Y9m2upcfS26UPz5l5PW5uFRMHZSi8pb1XV6/0Z8H8vwsh37Ur6aLi/5jruRmKhdlsNrB1IiDicBsPW3yg7HHSIdPU4oBNPC77yDCT3l4CKr4el81RrZt7FbJPfY+Ig9Q5O+05f6I8+ZOlJGyZ/Qfyl2aVm1HnlJKuBqPxeic8tMng/9B5N7uZL6Y3k5jFU8c= Quentin Machu
|
||||
- ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQC964SY8ojXZVfWknF+Pz+pTHpyb66VBH7OLYnGP+Tm452YKJVFb/rXCpZYHFlzSQtzz9hko8qBoEFXuD2humojx0P7nEtTy8wUClnKcifIqD5b/V1r7ZDa/5hL9Xog11gOXZ17TW1qjN+00qgXwoSh+jM8mAxD7V2ZLnanIDqmpYamT3ZlICz1k4bwYj35gnpSFpijAXeF9LXOEUfDtzNBjeaCvyniYlQyKzpKr8x+oIHumPlxwkFOzGhBMRGrCQ1Kzija8vVZQ6/Tjvxl19jwfgcNT0Zd9vLbHNowJPWQZhLYXdGIb3NxEfAqkGPvGCsaLfsfETYhcFwxr2g+zvf4xvyKgK35PHA/5t7TQryDSKDrQ1qTDUp3dAjzwsBFwEoQ0x68shGC661n/+APMNtj8qR5M9ueIH5WEqdRW10kKzlEm/ESvjyjEVRhXiwWyKkPch/OIUPKexKaEeOBdKocSnNx1+5ntk8OXWRQgjfwtQvm1NE/qD7fViBVUlTRk0c1SVpZaybIZkiMWmA1hzsdUbDP2mzPek1ydsVffw0I8z/dRo5gXQSPq06WfNIKpsiQF8LqP+KU+462A2tbHxFzq9VozI9PeFV+xO59wlJogv6q2yA0Jfv9BFgVgNzItIsUMvStrfkUBTYgaG9djp/vAm+SwMdnLSXILJtMO/3eRQ== Evan Cordell
|
||||
|
||||
write_files:
|
||||
- path: /root/overrides.list
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
from data.database import QueueItem, db, db_for_update
|
||||
from data.database import QueueItem, db, db_for_update, db_random_func
|
||||
from util.morecollections import AttrDict
|
||||
|
||||
|
||||
|
@ -42,30 +42,37 @@ class WorkQueue(object):
|
|||
def _canonical_name(name_list):
|
||||
return '/'.join(name_list) + '/'
|
||||
|
||||
def _running_jobs(self, now, name_match_query):
|
||||
@classmethod
|
||||
def _running_jobs(cls, now, name_match_query):
|
||||
return (QueueItem
|
||||
.select(QueueItem.queue_name)
|
||||
.where(QueueItem.available == False,
|
||||
QueueItem.processing_expires > now,
|
||||
QueueItem.queue_name ** name_match_query))
|
||||
|
||||
def _available_jobs(self, now, name_match_query):
|
||||
return self._available_jobs_where(QueueItem.select(), now, name_match_query)
|
||||
@classmethod
|
||||
def _available_jobs(cls, now, name_match_query):
|
||||
return (cls
|
||||
._available_jobs_where(QueueItem.select(), now)
|
||||
.where(QueueItem.queue_name ** name_match_query))
|
||||
|
||||
def _available_jobs_where(self, query, now, name_match_query):
|
||||
return query.where(QueueItem.queue_name ** name_match_query, QueueItem.available_after <= now,
|
||||
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
|
||||
QueueItem.retries_remaining > 0)
|
||||
@staticmethod
|
||||
def _available_jobs_where(query, now):
|
||||
return query.where(QueueItem.available_after <= now,
|
||||
((QueueItem.available == True) | (QueueItem.processing_expires <= now)),
|
||||
QueueItem.retries_remaining > 0)
|
||||
|
||||
def _available_jobs_not_running(self, now, name_match_query, running_query):
|
||||
return (self
|
||||
@classmethod
|
||||
def _available_jobs_not_running(cls, now, name_match_query, running_query):
|
||||
return (cls
|
||||
._available_jobs(now, name_match_query)
|
||||
.where(~(QueueItem.queue_name << running_query)))
|
||||
|
||||
def _name_match_query(self):
|
||||
return '%s%%' % self._canonical_name([self._queue_name] + self._canonical_name_match_list)
|
||||
|
||||
def _item_by_id_for_update(self, queue_id):
|
||||
@staticmethod
|
||||
def _item_by_id_for_update(queue_id):
|
||||
return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()
|
||||
|
||||
def get_metrics(self):
|
||||
|
@ -80,8 +87,10 @@ class WorkQueue(object):
|
|||
|
||||
available_not_running_query = self._available_jobs_not_running(now, name_match_query,
|
||||
running_query)
|
||||
available_not_running_count = (available_not_running_query.select(QueueItem.queue_name)
|
||||
.distinct().count())
|
||||
available_not_running_count = (available_not_running_query
|
||||
.select(QueueItem.queue_name)
|
||||
.distinct()
|
||||
.count())
|
||||
|
||||
return (running_count, available_not_running_count, available_count)
|
||||
|
||||
|
@ -131,38 +140,44 @@ class WorkQueue(object):
|
|||
self._metric_queue.put('Added', 1, dimensions={'queue': self._queue_name})
|
||||
return r
|
||||
|
||||
def get(self, processing_time=300):
|
||||
def get(self, processing_time=300, ordering_required=False):
|
||||
"""
|
||||
Get an available item and mark it as unavailable for the default of five
|
||||
minutes. The result of this method must always be composed of simple
|
||||
python objects which are JSON serializable for network portability reasons.
|
||||
"""
|
||||
now = datetime.utcnow()
|
||||
|
||||
name_match_query = self._name_match_query()
|
||||
|
||||
running = self._running_jobs(now, name_match_query)
|
||||
avail = self._available_jobs_not_running(now, name_match_query, running)
|
||||
|
||||
item = None
|
||||
try:
|
||||
# The previous solution to this used a select for update in a
|
||||
# transaction to prevent multiple instances from processing the
|
||||
# same queue item. This suffered performance problems. This solution
|
||||
# instead has instances attempt to update the potential queue item to be
|
||||
# unavailable. However, since their update clause is restricted to items
|
||||
# that are available=False, only one instance's update will succeed, and
|
||||
# it will have a changed row count of 1. Instances that have 0 changed
|
||||
# rows know that another instance is already handling that item.
|
||||
db_item = avail.order_by(QueueItem.id).get()
|
||||
changed_query = (QueueItem.update(
|
||||
available=False,
|
||||
processing_expires=now + timedelta(seconds=processing_time),
|
||||
retries_remaining=QueueItem.retries_remaining-1,
|
||||
)
|
||||
.where(QueueItem.id == db_item.id))
|
||||
|
||||
changed_query = self._available_jobs_where(changed_query, now, name_match_query)
|
||||
try:
|
||||
if ordering_required:
|
||||
# The previous solution to this used a select for update in a
|
||||
# transaction to prevent multiple instances from processing the
|
||||
# same queue item. This suffered performance problems. This solution
|
||||
# instead has instances attempt to update the potential queue item to be
|
||||
# unavailable. However, since their update clause is restricted to items
|
||||
# that are available=False, only one instance's update will succeed, and
|
||||
# it will have a changed row count of 1. Instances that have 0 changed
|
||||
# rows know that another instance is already handling that item.
|
||||
running = self._running_jobs(now, name_match_query)
|
||||
avail = self._available_jobs_not_running(now, name_match_query, running)
|
||||
db_item = avail.order_by(QueueItem.id).get()
|
||||
else:
|
||||
# If we don't require ordering, we grab a random item from any of the first 50 available.
|
||||
subquery = self._available_jobs(now, name_match_query).limit(50).alias('j1')
|
||||
db_item = (QueueItem
|
||||
.select()
|
||||
.join(subquery, on=QueueItem.id == subquery.c.id)
|
||||
.order_by(db_random_func())
|
||||
.get())
|
||||
|
||||
set_unavailable_query = (QueueItem
|
||||
.update(available=False,
|
||||
processing_expires=now + timedelta(seconds=processing_time),
|
||||
retries_remaining=QueueItem.retries_remaining-1)
|
||||
.where(QueueItem.id == db_item.id))
|
||||
changed_query = self._available_jobs_where(set_unavailable_query, now)
|
||||
changed = changed_query.execute()
|
||||
if changed == 1:
|
||||
item = AttrDict({
|
||||
|
@ -187,7 +202,7 @@ class WorkQueue(object):
|
|||
# Load the build queue item for update.
|
||||
try:
|
||||
queue_item = db_for_update(QueueItem.select()
|
||||
.where(QueueItem.id == item_id)).get()
|
||||
.where(QueueItem.id == item_id)).get()
|
||||
except QueueItem.DoesNotExist:
|
||||
return False
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
ipython
|
||||
pylint
|
||||
ipdb
|
||||
tqdm
|
||||
|
|
|
@ -47,6 +47,7 @@ class AutoUpdatingQueue(object):
|
|||
class QueueTestCase(unittest.TestCase):
|
||||
TEST_MESSAGE_1 = json.dumps({'data': 1})
|
||||
TEST_MESSAGE_2 = json.dumps({'data': 2})
|
||||
TEST_MESSAGES = [json.dumps({'data': str(i)}) for i in range(1, 101)]
|
||||
|
||||
def setUp(self):
|
||||
self.reporter = SaveLastCountReporter()
|
||||
|
@ -71,14 +72,14 @@ class TestQueue(QueueTestCase):
|
|||
self.assertEqual(self.reporter.running_count, 0)
|
||||
self.assertEqual(self.reporter.total, 1)
|
||||
|
||||
one = self.queue.get()
|
||||
one = self.queue.get(ordering_required=True)
|
||||
self.assertNotEqual(None, one)
|
||||
self.assertEqual(self.TEST_MESSAGE_1, one.body)
|
||||
self.assertEqual(self.reporter.currently_processing, True)
|
||||
self.assertEqual(self.reporter.running_count, 1)
|
||||
self.assertEqual(self.reporter.total, 1)
|
||||
|
||||
two_fail = self.queue.get()
|
||||
two_fail = self.queue.get(ordering_required=True)
|
||||
self.assertEqual(None, two_fail)
|
||||
self.assertEqual(self.reporter.running_count, 1)
|
||||
self.assertEqual(self.reporter.total, 1)
|
||||
|
@ -88,7 +89,7 @@ class TestQueue(QueueTestCase):
|
|||
self.assertEqual(self.reporter.running_count, 0)
|
||||
self.assertEqual(self.reporter.total, 1)
|
||||
|
||||
two = self.queue.get()
|
||||
two = self.queue.get(ordering_required=True)
|
||||
self.assertNotEqual(None, two)
|
||||
self.assertEqual(self.reporter.currently_processing, True)
|
||||
self.assertEqual(self.TEST_MESSAGE_2, two.body)
|
||||
|
@ -101,13 +102,13 @@ class TestQueue(QueueTestCase):
|
|||
self.assertEqual(self.reporter.running_count, 0)
|
||||
self.assertEqual(self.reporter.total, 2)
|
||||
|
||||
one = self.queue.get()
|
||||
one = self.queue.get(ordering_required=True)
|
||||
self.assertNotEqual(None, one)
|
||||
self.assertEqual(self.TEST_MESSAGE_1, one.body)
|
||||
self.assertEqual(self.reporter.running_count, 1)
|
||||
self.assertEqual(self.reporter.total, 2)
|
||||
|
||||
two = self.queue.get()
|
||||
two = self.queue.get(ordering_required=True)
|
||||
self.assertNotEqual(None, two)
|
||||
self.assertEqual(self.TEST_MESSAGE_2, two.body)
|
||||
self.assertEqual(self.reporter.running_count, 2)
|
||||
|
@ -117,10 +118,10 @@ class TestQueue(QueueTestCase):
|
|||
self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1)
|
||||
self.queue.put(['abc', 'def', 'ghi'], self.TEST_MESSAGE_1)
|
||||
|
||||
one = self.queue.get()
|
||||
one = self.queue.get(ordering_required=True)
|
||||
self.assertNotEqual(QUEUE_NAME + '/abc/def/', one)
|
||||
|
||||
two = self.queue.get()
|
||||
two = self.queue.get(ordering_required=True)
|
||||
self.assertNotEqual(QUEUE_NAME + '/abc/def/ghi/', two)
|
||||
|
||||
def test_expiration(self):
|
||||
|
@ -128,12 +129,12 @@ class TestQueue(QueueTestCase):
|
|||
self.assertEqual(self.reporter.running_count, 0)
|
||||
self.assertEqual(self.reporter.total, 1)
|
||||
|
||||
one = self.queue.get(processing_time=0.5)
|
||||
one = self.queue.get(processing_time=0.5, ordering_required=True)
|
||||
self.assertNotEqual(None, one)
|
||||
self.assertEqual(self.reporter.running_count, 1)
|
||||
self.assertEqual(self.reporter.total, 1)
|
||||
|
||||
one_fail = self.queue.get()
|
||||
one_fail = self.queue.get(ordering_required=True)
|
||||
self.assertEqual(None, one_fail)
|
||||
|
||||
time.sleep(1)
|
||||
|
@ -141,7 +142,7 @@ class TestQueue(QueueTestCase):
|
|||
self.assertEqual(self.reporter.running_count, 0)
|
||||
self.assertEqual(self.reporter.total, 1)
|
||||
|
||||
one_again = self.queue.get()
|
||||
one_again = self.queue.get(ordering_required=True)
|
||||
self.assertNotEqual(None, one_again)
|
||||
self.assertEqual(self.reporter.running_count, 1)
|
||||
self.assertEqual(self.reporter.total, 1)
|
||||
|
@ -152,17 +153,34 @@ class TestQueue(QueueTestCase):
|
|||
|
||||
my_queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory, ['def']))
|
||||
|
||||
two = my_queue.get()
|
||||
two = my_queue.get(ordering_required=True)
|
||||
self.assertNotEqual(None, two)
|
||||
self.assertEqual(self.TEST_MESSAGE_2, two.body)
|
||||
|
||||
one_fail = my_queue.get()
|
||||
one_fail = my_queue.get(ordering_required=True)
|
||||
self.assertEqual(None, one_fail)
|
||||
|
||||
one = self.queue.get()
|
||||
one = self.queue.get(ordering_required=True)
|
||||
self.assertNotEqual(None, one)
|
||||
self.assertEqual(self.TEST_MESSAGE_1, one.body)
|
||||
|
||||
def test_random_queue_no_duplicates(self):
|
||||
for msg in self.TEST_MESSAGES:
|
||||
self.queue.put(['abc', 'def'], msg)
|
||||
seen = set()
|
||||
|
||||
for _ in range(1, 101):
|
||||
item = self.queue.get()
|
||||
json_body = json.loads(item.body)
|
||||
msg = str(json_body['data'])
|
||||
self.assertTrue(msg not in seen)
|
||||
seen.add(msg)
|
||||
|
||||
for body in self.TEST_MESSAGES:
|
||||
json_body = json.loads(body)
|
||||
msg = str(json_body['data'])
|
||||
self.assertIn(msg, seen)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Reference in a new issue