Merge pull request #2276 from coreos-inc/queue-improvements

Various improvements to the queue code: QueueItem gains a state_id column that changes on every write, so queue items can be claimed with a single optimistic UPDATE on (id, state_id) instead of a wide, lock-heavy WHERE clause; cancel and complete are reduced to a single delete; and new tests cover claiming, extending processing, completing, and canceling items.
Authored by josephschorr on 2017-01-17 14:04:14 -05:00; committed by GitHub.
commit 0f203b01d3
4 changed files with 251 additions and 64 deletions


@@ -762,6 +762,7 @@ class QueueItem(BaseModel):
   available = BooleanField(default=True)
   processing_expires = DateTimeField(null=True)
   retries_remaining = IntegerField(default=5)
+  state_id = CharField(default=uuid_generator, index=True, unique=True)

   class Meta:
     database = db
@@ -774,6 +775,11 @@ class QueueItem(BaseModel):
       (('processing_expires', 'available_after', 'queue_name', 'retries_remaining', 'available'), False),
     )

+  def save(self, *args, **kwargs):
+    # Always change the queue item's state ID when we update it.
+    self.state_id = str(uuid.uuid4())
+    super(QueueItem, self).save(*args, **kwargs)
+

 class RepositoryBuild(BaseModel):
   uuid = CharField(default=uuid_generator, index=True)
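The save() override above means that any write to a queue item invalidates whatever state_id other readers are still holding. A minimal standalone sketch of the same idea, using a plain Python object rather than the Quay peewee model (the Record class here is purely illustrative):

import uuid

class Record(object):
  """ Toy stand-in for a row whose state token rotates on every write. """
  def __init__(self, body):
    self.body = body
    self.state_id = str(uuid.uuid4())

  def save(self, **updates):
    # Mirrors the QueueItem.save override: every write gets a fresh state ID, so any
    # caller still holding the old state_id can tell the row changed underneath it.
    for key, value in updates.items():
      setattr(self, key, value)
    self.state_id = str(uuid.uuid4())

record = Record('payload')
seen_state = record.state_id
record.save(body='new payload')
assert record.state_id != seen_state  # a stale reader can detect the change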


@@ -0,0 +1,28 @@
+"""Add state_id field to QueueItem
+
+Revision ID: fc47c1ec019f
+Revises: f5167870dd66
+Create Date: 2017-01-12 15:44:23.643016
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'fc47c1ec019f'
+down_revision = 'f5167870dd66'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+def upgrade(tables):
+  # ### commands auto generated by Alembic - please adjust! ###
+  op.add_column('queueitem', sa.Column('state_id', sa.String(length=255), nullable=False, server_default=''))
+  op.create_index('queueitem_state_id', 'queueitem', ['state_id'], unique=True)
+  # ### end Alembic commands ###
+
+
+def downgrade(tables):
+  # ### commands auto generated by Alembic - please adjust! ###
+  op.drop_index('queueitem_state_id', table_name='queueitem')
+  op.drop_column('queueitem', 'state_id')
+  # ### end Alembic commands ###
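For a rough preview of the DDL this migration corresponds to, the same column and index definitions can be compiled against the MySQL dialect with SQLAlchemy directly. This is only an approximation; the ALTER TABLE statement Alembic actually emits at upgrade time may differ in detail, and the metadata object below is illustrative:

import sqlalchemy as sa
from sqlalchemy.dialects import mysql

# Compile the column and index the migration adds, to see their rough SQL shape.
metadata = sa.MetaData()
queueitem = sa.Table('queueitem', metadata,
                     sa.Column('state_id', sa.String(length=255), nullable=False, server_default=''))
state_index = sa.Index('queueitem_state_id', queueitem.c.state_id, unique=True)

print(sa.schema.CreateTable(queueitem).compile(dialect=mysql.dialect()))
print(sa.schema.CreateIndex(state_index).compile(dialect=mysql.dialect()))
# Roughly: state_id VARCHAR(255) NOT NULL DEFAULT '' plus
#          CREATE UNIQUE INDEX queueitem_state_id ON queueitem (state_id)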


@@ -1,22 +1,17 @@
+import uuid
+
 from datetime import datetime, timedelta
 from contextlib import contextmanager

 from data.database import QueueItem, db, db_for_update, db_random_func
 from util.morecollections import AttrDict
+from hashlib import sha256


 MINIMUM_EXTENSION = timedelta(seconds=20)
 DEFAULT_BATCH_SIZE = 1000


-class NoopWith:
-  def __enter__(self):
-    pass
-
-  def __exit__(self, type, value, traceback):
-    pass
-
-
 class BuildMetricQueueReporter(object):
   """ Metric queue reporter for the build system. """

   def __init__(self, metric_queue):
@@ -175,7 +170,7 @@ class WorkQueue(object):
     except QueueItem.DoesNotExist:
       return False

-  def _queue_body(self, canonical_name_list, message, available_after, retries_remaining):
+  def _queue_dict(self, canonical_name_list, message, available_after, retries_remaining):
     return dict(
       queue_name=self._canonical_name([self._queue_name] + canonical_name_list),
       body=message,
@@ -191,7 +186,7 @@ class WorkQueue(object):
       Put an item, if it shouldn't be processed for some number of seconds,
       specify that amount as available_after. Returns the ID of the queue item added.
       """
-      items_to_insert.append(self._queue_body(canonical_name_list, message, available_after,
+      items_to_insert.append(self._queue_dict(canonical_name_list, message, available_after,
                                               retries_remaining))

     yield batch_put
@@ -211,7 +206,7 @@ class WorkQueue(object):
     Put an item, if it shouldn't be processed for some number of seconds,
     specify that amount as available_after. Returns the ID of the queue item added.
     """
-    item = QueueItem.create(**self._queue_body(canonical_name_list, message, available_after,
+    item = QueueItem.create(**self._queue_dict(canonical_name_list, message, available_after,
                                                retries_remaining))

     if self._metric_queue:
@@ -219,15 +214,11 @@ class WorkQueue(object):
     return str(item.id)

-  def get(self, processing_time=300, ordering_required=False):
-    """
-    Get an available item and mark it as unavailable for the default of five
-    minutes. The result of this method must always be composed of simple
-    python objects which are JSON serializable for network portability reasons.
-    """
-    now = datetime.utcnow()
-
+  def _select_available_item(self, ordering_required, now):
+    """ Selects an available queue item from the queue table and returns it, if any. If none,
+        return None.
+    """
     name_match_query = self._name_match_query()

-    item = None
     try:
       if ordering_required:
@@ -241,64 +232,91 @@ class WorkQueue(object):
         # rows know that another instance is already handling that item.
         running = self._running_jobs(now, name_match_query)
         avail = self._available_jobs_not_running(now, name_match_query, running)
-        db_item = avail.order_by(QueueItem.id).get()
+        return avail.order_by(QueueItem.id).get()
       else:
         # If we don't require ordering, we grab a random item from any of the first 50 available.
         subquery = self._available_jobs(now, name_match_query).limit(50).alias('j1')
-        db_item = (QueueItem
-                   .select()
-                   .join(subquery, on=QueueItem.id == subquery.c.id)
-                   .order_by(db_random_func())
-                   .get())
+        return (QueueItem
+                .select()
+                .join(subquery, on=QueueItem.id == subquery.c.id)
+                .order_by(db_random_func())
+                .get())
+    except QueueItem.DoesNotExist:
+      # No available queue item was found.
+      return None
+
+  def _attempt_to_claim_item(self, db_item, now, processing_time):
+    """ Attempts to claim the specified queue item for this instance. Returns True on success and
+        False on failure.
+
+        Note that the underlying QueueItem row in the database will be changed on success, but
+        the db_item object given as a parameter will *not* have its fields updated.
+    """
+    # Try to claim the item. We do so by updating the item's information only if its current
+    # state ID matches that returned in the previous query. Since all updates to the QueueItem
+    # must change the state ID, this is guaranteed to only succeed if the item has not yet been
+    # claimed by another caller.
+    #
+    # Note that we use this method because InnoDB takes locks on *every* clause in the WHERE when
+    # performing the update. Previously, we would check all these columns, resulting in a bunch
+    # of lock contention. This change mitigates the problem significantly by only checking two
+    # columns (id and state_id), both of which should be absolutely unique at all times.
+    #
+    # TODO(jschorr): Remove the extra `processing_expires` check once this has been pushed to
+    # production and every box is updating state_id.
     set_unavailable_query = (QueueItem
                              .update(available=False,
                                      processing_expires=now + timedelta(seconds=processing_time),
-                                     retries_remaining=QueueItem.retries_remaining-1)
-                             .where(QueueItem.id == db_item.id))
-
-      changed_query = (self._available_jobs_where(set_unavailable_query, now)
-                       .where(QueueItem.processing_expires == db_item.processing_expires))
-
-      changed = changed_query.execute()
-      if changed == 1:
-        item = AttrDict({
+                                     retries_remaining=QueueItem.retries_remaining - 1,
+                                     state_id=str(uuid.uuid4()))
+                             .where(QueueItem.id == db_item.id,
+                                    QueueItem.state_id == db_item.state_id,
+                                    QueueItem.processing_expires == db_item.processing_expires))
+
+    changed = set_unavailable_query.execute()
+    return changed == 1
+
+  def get(self, processing_time=300, ordering_required=False):
+    """
+    Get an available item and mark it as unavailable for the default of five
+    minutes. The result of this method must always be composed of simple
+    python objects which are JSON serializable for network portability reasons.
+    """
+    now = datetime.utcnow()
+
+    # Select an available queue item.
+    db_item = self._select_available_item(ordering_required, now)
+    if db_item is None:
+      self._currently_processing = False
+      return None
+
+    # Attempt to claim the item for this instance.
+    was_claimed = self._attempt_to_claim_item(db_item, now, processing_time)
+    if not was_claimed:
+      self._currently_processing = False
+      return None
+
+    self._currently_processing = True
+
+    # Return a view of the queue item rather than an active db object
+    return AttrDict({
       'id': db_item.id,
       'body': db_item.body,
       'retries_remaining': db_item.retries_remaining - 1,
     })
-        self._currently_processing = True
-    except QueueItem.DoesNotExist:
-      self._currently_processing = False
-
-    # Return a view of the queue item rather than an active db object
-    return item
   def cancel(self, item_id):
     """ Attempts to cancel the queue item with the given ID from the queue. Returns true on success
         and false if the queue item could not be canceled.
     """
-    with self._transaction_factory(db):
-      # Load the build queue item for update.
-      try:
-        queue_item = db_for_update(QueueItem.select()
-                                   .where(QueueItem.id == item_id)).get()
-      except QueueItem.DoesNotExist:
-        return False
-
-      # Delete the queue item.
-      queue_item.delete_instance(recursive=True)
-      return True
+    count_removed = QueueItem.delete().where(QueueItem.id == item_id).execute()
+    return count_removed > 0

   def complete(self, completed_item):
-    with self._transaction_factory(db):
-      try:
-        completed_item_obj = self._item_by_id_for_update(completed_item.id)
-      except QueueItem.DoesNotExist:
-        self._currently_processing = False
-        return
-
-      completed_item_obj.delete_instance(recursive=True)
-      self._currently_processing = False
+    self._currently_processing = not self.cancel(completed_item.id)

   def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
     with self._transaction_factory(db):
@@ -338,8 +356,9 @@ class WorkQueue(object):
         if has_change:
           queue_item.save()

+        return has_change
       except QueueItem.DoesNotExist:
-        return
+        return False


 def delete_expired(expiration_threshold, deletion_threshold, batch_size):
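The comment inside _attempt_to_claim_item above describes the core trick: claiming is a single UPDATE guarded by the (id, state_id) pair, so a worker that loses the race simply matches zero rows instead of contending for row locks. A standalone sketch of that pattern, using sqlite3 in place of the real database (the table layout and helper name here are illustrative, not Quay's actual schema):

import sqlite3
import uuid

# Sketch of the optimistic claim: the UPDATE only matches while the caller still holds
# the row's current state_id, so of two workers racing for the same item exactly one
# sees a row count of 1. sqlite3 stands in for the real database.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE queueitem (id INTEGER PRIMARY KEY, state_id TEXT, available INTEGER)')
conn.execute('INSERT INTO queueitem (id, state_id, available) VALUES (1, ?, 1)',
             (str(uuid.uuid4()),))

def try_claim(item_id, seen_state_id):
  # Succeeds only if nobody has updated the row since we read it; a successful claim
  # also rotates state_id, immediately invalidating every other reader's copy.
  cursor = conn.execute(
      'UPDATE queueitem SET available = 0, state_id = ? WHERE id = ? AND state_id = ?',
      (str(uuid.uuid4()), item_id, seen_state_id))
  return cursor.rowcount == 1

item_id, state_id = conn.execute('SELECT id, state_id FROM queueitem').fetchone()
print(try_claim(item_id, state_id))  # True: the first claim wins
print(try_claim(item_id, state_id))  # False: the state_id we read is now stale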


@@ -7,8 +7,8 @@ from functools import wraps

 from app import app
 from initdb import setup_database_for_testing, finished_database_for_testing
-from data.queue import WorkQueue
-from datetime import timedelta
+from data.database import QueueItem
+from data.queue import WorkQueue, MINIMUM_EXTENSION


 QUEUE_NAME = 'testqueuename'
@@ -63,6 +63,72 @@ class QueueTestCase(unittest.TestCase):

 class TestQueue(QueueTestCase):

+  def test_get_single_item(self):
+    # Add a single item to the queue.
+    self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
+
+    # Have two "instances" retrieve an item to claim. Since there is only one, both calls should
+    # return the same item.
+    now = datetime.utcnow()
+    first_item = self.queue._select_available_item(False, now)
+    second_item = self.queue._select_available_item(False, now)
+
+    self.assertEquals(first_item.id, second_item.id)
+    self.assertEquals(first_item.state_id, second_item.state_id)
+
+    # Have both "instances" now try to claim the item. Only one should succeed.
+    first_claimed = self.queue._attempt_to_claim_item(first_item, now, 300)
+    second_claimed = self.queue._attempt_to_claim_item(first_item, now, 300)
+
+    self.assertTrue(first_claimed)
+    self.assertFalse(second_claimed)
+
+    # Ensure the item is no longer available.
+    self.assertIsNone(self.queue.get())
+
+    # Ensure the item's state ID has changed.
+    self.assertNotEqual(first_item.state_id, QueueItem.get().state_id)
+
+  def test_extend_processing(self):
+    # Add and retrieve a queue item.
+    self.queue.put(['abc', 'def'], self.TEST_MESSAGE_1, available_after=-1)
+    queue_item = self.queue.get(processing_time=10)
+    self.assertIsNotNone(queue_item)
+
+    existing_db_item = QueueItem.get(id=queue_item.id)
+
+    # Call extend processing with a timedelta less than the minimum and ensure its
+    # processing_expires and state_id do not change.
+    changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1)
+    self.assertFalse(changed)
+
+    updated_db_item = QueueItem.get(id=queue_item.id)
+    self.assertEquals(existing_db_item.processing_expires, updated_db_item.processing_expires)
+    self.assertEquals(existing_db_item.state_id, updated_db_item.state_id)
+
+    # Call extend processing with a timedelta greater than the minimum and ensure its
+    # processing_expires and state_id are changed.
+    changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() + 1)
+    self.assertTrue(changed)
+
+    updated_db_item = QueueItem.get(id=queue_item.id)
+    self.assertNotEqual(existing_db_item.processing_expires, updated_db_item.processing_expires)
+    self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id)
+
+    # Call extend processing with a timedelta less than the minimum but also with new data and
+    # ensure its processing_expires and state_id are changed.
+    changed = self.queue.extend_processing(queue_item, 10 + MINIMUM_EXTENSION.total_seconds() - 1,
+                                           updated_data='newbody')
+    self.assertTrue(changed)
+
+    updated_db_item = QueueItem.get(id=queue_item.id)
+    self.assertNotEqual(existing_db_item.processing_expires, updated_db_item.processing_expires)
+    self.assertNotEqual(existing_db_item.state_id, updated_db_item.state_id)
+    self.assertEquals('newbody', updated_db_item.body)
+
   def test_same_canonical_names(self):
     self.assertEqual(self.reporter.currently_processing, None)
     self.assertEqual(self.reporter.running_count, None)
@@ -244,6 +310,74 @@ class TestQueue(QueueTestCase):
     count = self.queue.num_available_jobs_between(now, now, 'abc')
     self.assertEqual(0, count)

+  def test_incomplete(self):
+    # Add an item.
+    self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
+
+    now = datetime.utcnow()
+    count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
+                                                  ['/somenamespace'])
+    self.assertEqual(1, count)
+
+    # Retrieve it.
+    item = self.queue.get()
+    self.assertIsNotNone(item)
+    self.assertTrue(self.reporter.currently_processing)
+
+    # Mark it as incomplete.
+    self.queue.incomplete(item, retry_after=-1)
+    self.assertFalse(self.reporter.currently_processing)
+
+    # Retrieve again to ensure it is once again available.
+    same_item = self.queue.get()
+    self.assertIsNotNone(same_item)
+    self.assertTrue(self.reporter.currently_processing)
+
+    self.assertEquals(item.id, same_item.id)
+
+  def test_complete(self):
+    # Add an item.
+    self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
+
+    now = datetime.utcnow()
+    count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
+                                                  ['/somenamespace'])
+    self.assertEqual(1, count)
+
+    # Retrieve it.
+    item = self.queue.get()
+    self.assertIsNotNone(item)
+    self.assertTrue(self.reporter.currently_processing)
+
+    # Mark it as complete.
+    self.queue.complete(item)
+    self.assertFalse(self.reporter.currently_processing)
+
+  def test_cancel(self):
+    # Add an item.
+    self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_1, available_after=-10)
+    self.queue.put(['somenamespace', 'abc', 'def'], self.TEST_MESSAGE_2, available_after=-5)
+
+    now = datetime.utcnow()
+    count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
+                                                  ['/somenamespace'])
+    self.assertEqual(2, count)
+
+    # Retrieve it.
+    item = self.queue.get()
+    self.assertIsNotNone(item)
+
+    # Make sure we can cancel it.
+    self.assertTrue(self.queue.cancel(item.id))
+
+    now = datetime.utcnow()
+    count = self.queue.num_available_jobs_between(now - timedelta(seconds=60), now,
+                                                  ['/somenamespace'])
+    self.assertEqual(1, count)
+
+    # Make sure it is gone.
+    self.assertFalse(self.queue.cancel(item.id))
+
   def test_deleted_namespaced_items(self):
     self.queue = AutoUpdatingQueue(WorkQueue(QUEUE_NAME, self.transaction_factory,
                                              reporter=self.reporter,
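The new tests exercise the queue the way a worker process would. A rough consumer-loop sketch of that usage follows; the queue object is assumed to be constructed elsewhere and process_item stands in for real work (both are assumptions for illustration; only get, incomplete, and complete come from the code above):

import time

def run_worker(queue, process_item, poll_period=5):
  """ Minimal polling worker built on the WorkQueue methods shown in this change. """
  while True:
    item = queue.get(processing_time=300)    # claim the item for up to five minutes
    if item is None:
      time.sleep(poll_period)                # nothing available; poll again shortly
      continue

    try:
      process_item(item.body)
    except Exception:
      # Hand the item back so it becomes available again after a delay.
      queue.incomplete(item, retry_after=60)
    else:
      queue.complete(item)                   # success: the item is deleted from the queue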