Change queue to use state-field for claiming items

Before this change, the queue code would check that none of the fields on the item to be claimed had changed between the time when the item was selected and the item is claimed. While this is a safe approach, it also causes quite a bit of lock contention in MySQL, because InnoDB will take a lock on *any* rows examined by the `where` clause of the `update`, even if they will ultimately thrown out due to other clauses (See: http://dev.mysql.com/doc/refman/5.7/en/innodb-locks-set.html: "A ..., an UPDATE, ... generally set record locks on every index record that is scanned in the processing of the SQL statement. It does not matter whether there are WHERE conditions in the statement that would exclude the row. InnoDB does not remember the exact WHERE condition, but only knows which index ranges were scanned").

As a result, we want to minimize the number of fields accessed in the `where` clause on an update to the QueueItem row. To do so, we introduce a new `state_id` column, which is updated on *every change* to the QueueItem rows with a unique, random value. We can then have the queue item claiming code simply check that the `state_id` column has not changed between the retrieval and claiming steps. This minimizes the number of columns being checked to two (`id` and `state_id`), and thus, should significantly reduce lock contention. Note that we can not (yet) reduce to just a single `state_id` column (which should work in theory), because we need to maintain backwards compatibility with existing items in the QueueItem table, which will be given empty `state_id` values when the migration in this change runs.

Also adds a number of tests for other queue operations that we want to make sure operate correctly following this change.

[Delivers #133632501]
This commit is contained in:
Joseph Schorr 2017-01-12 16:13:27 -05:00
parent 19cb64df5d
commit 8c4e86f48b
4 changed files with 206 additions and 36 deletions

View file

@ -1,8 +1,11 @@
import uuid
from datetime import datetime, timedelta
from contextlib import contextmanager
from data.database import QueueItem, db, db_for_update, db_random_func
from util.morecollections import AttrDict
from hashlib import sha256
MINIMUM_EXTENSION = timedelta(seconds=20)
@ -167,7 +170,7 @@ class WorkQueue(object):
except QueueItem.DoesNotExist:
return False
def _queue_body(self, canonical_name_list, message, available_after, retries_remaining):
def _queue_dict(self, canonical_name_list, message, available_after, retries_remaining):
return dict(
queue_name=self._canonical_name([self._queue_name] + canonical_name_list),
body=message,
@ -183,7 +186,7 @@ class WorkQueue(object):
Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after. Returns the ID of the queue item added.
"""
items_to_insert.append(self._queue_body(canonical_name_list, message, available_after,
items_to_insert.append(self._queue_dict(canonical_name_list, message, available_after,
retries_remaining))
yield batch_put
@ -203,7 +206,7 @@ class WorkQueue(object):
Put an item, if it shouldn't be processed for some number of seconds,
specify that amount as available_after. Returns the ID of the queue item added.
"""
item = QueueItem.create(**self._queue_body(canonical_name_list, message, available_after,
item = QueueItem.create(**self._queue_dict(canonical_name_list, message, available_after,
retries_remaining))
if self._metric_queue:
@ -211,15 +214,11 @@ class WorkQueue(object):
return str(item.id)
def get(self, processing_time=300, ordering_required=False):
def _select_available_item(self, ordering_required, now):
""" Selects an available queue item from the queue table and returns it, if any. If none,
return None.
"""
Get an available item and mark it as unavailable for the default of five
minutes. The result of this method must always be composed of simple
python objects which are JSON serializable for network portability reasons.
"""
now = datetime.utcnow()
name_match_query = self._name_match_query()
item = None
try:
if ordering_required:
@ -233,36 +232,81 @@ class WorkQueue(object):
# rows know that another instance is already handling that item.
running = self._running_jobs(now, name_match_query)
avail = self._available_jobs_not_running(now, name_match_query, running)
db_item = avail.order_by(QueueItem.id).get()
return avail.order_by(QueueItem.id).get()
else:
# If we don't require ordering, we grab a random item from any of the first 50 available.
subquery = self._available_jobs(now, name_match_query).limit(50).alias('j1')
db_item = (QueueItem
.select()
.join(subquery, on=QueueItem.id == subquery.c.id)
.order_by(db_random_func())
.get())
return (QueueItem
.select()
.join(subquery, on=QueueItem.id == subquery.c.id)
.order_by(db_random_func())
.get())
set_unavailable_query = (QueueItem
.update(available=False,
processing_expires=now + timedelta(seconds=processing_time),
retries_remaining=QueueItem.retries_remaining-1)
.where(QueueItem.id == db_item.id))
changed_query = (self._available_jobs_where(set_unavailable_query, now)
.where(QueueItem.processing_expires == db_item.processing_expires))
changed = changed_query.execute()
if changed == 1:
item = AttrDict({
'id': db_item.id,
'body': db_item.body,
'retries_remaining': db_item.retries_remaining - 1,
})
self._currently_processing = True
except QueueItem.DoesNotExist:
# No available queue item was found.
return None
def _attempt_to_claim_item(self, db_item, now, processing_time):
""" Attempts to claim the specified queue item for this instance. Returns True on success and
False on failure.
Note that the underlying QueueItem row in the database will be changed on success, but
the db_item object given as a parameter will *not* have its fields updated.
"""
# Try to claim the item. We do so by updating the item's information only if its current
# state ID matches that returned in the previous query. Since all updates to the QueueItem
# must change the state ID, this is guarenteed to only succeed if the item has not yet been
# claimed by another caller.
#
# Note that we use this method because InnoDB takes locks on *every* clause in the WHERE when
# performing the update. Previously, we would check all these columns, resulting in a bunch
# of lock contention. This change mitigates the problem significantly by only checking two
# columns (id and state_id), both of which should be absolutely unique at all times.
#
# TODO(jschorr): Remove the extra `processing_expires` check once this has been pushed to
# production and every box is updating state_id.
set_unavailable_query = (QueueItem
.update(available=False,
processing_expires=now + timedelta(seconds=processing_time),
retries_remaining=QueueItem.retries_remaining - 1,
state_id=str(uuid.uuid4()))
.where(QueueItem.id == db_item.id,
QueueItem.state_id == db_item.state_id,
QueueItem.processing_expires == db_item.processing_expires))
changed = set_unavailable_query.execute()
return changed == 1
def get(self, processing_time=300, ordering_required=False):
"""
Get an available item and mark it as unavailable for the default of five
minutes. The result of this method must always be composed of simple
python objects which are JSON serializable for network portability reasons.
"""
now = datetime.utcnow()
# Select an available queue item.
db_item = self._select_available_item(ordering_required, now)
if db_item is None:
self._currently_processing = False
return None
# Attempt to claim the item for this instance.
was_claimed = self._attempt_to_claim_item(db_item, now, processing_time)
if not was_claimed:
self._currently_processing = False
return None
self._currently_processing = True
# Return a view of the queue item rather than an active db object
return item
return AttrDict({
'id': db_item.id,
'body': db_item.body,
'retries_remaining': db_item.retries_remaining - 1,
})
def cancel(self, item_id):
""" Attempts to cancel the queue item with the given ID from the queue. Returns true on success
@ -312,8 +356,9 @@ class WorkQueue(object):
if has_change:
queue_item.save()
return has_change
except QueueItem.DoesNotExist:
return
return False
def delete_expired(expiration_threshold, deletion_threshold, batch_size):