Switch QueueItem state_id to be unique after a backfill

This commit is contained in:
Joseph Schorr 2017-01-18 15:19:45 -05:00
parent e2748fccd9
commit 71ec23b550
3 changed files with 35 additions and 6 deletions

View file

@ -762,7 +762,7 @@ class QueueItem(BaseModel):
available = BooleanField(default=True)
processing_expires = DateTimeField(null=True)
retries_remaining = IntegerField(default=5)
state_id = CharField(default=uuid_generator, index=True, unique=False)
state_id = CharField(default=uuid_generator, index=True, unique=True)
class Meta:
database = db

View file

@ -0,0 +1,33 @@
"""Backfill state_id and make it unique
Revision ID: d42c175b439a
Revises: 3e8cc74a1e7b
Create Date: 2017-01-18 15:11:01.635632
"""
# revision identifiers, used by Alembic.
revision = 'd42c175b439a'
down_revision = '3e8cc74a1e7b'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade(tables):
# Backfill the queueitem table's state_id field with unique values for all entries which are
# empty.
conn = op.get_bind()
conn.execute("update queueitem set state_id = id where state_id = ''")
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('queueitem_state_id', table_name='queueitem')
op.create_index('queueitem_state_id', 'queueitem', ['state_id'], unique=True)
# ### end Alembic commands ###
def downgrade(tables):
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('queueitem_state_id', table_name='queueitem')
op.create_index('queueitem_state_id', 'queueitem', ['state_id'], unique=False)
# ### end Alembic commands ###

View file

@ -263,17 +263,13 @@ class WorkQueue(object):
# performing the update. Previously, we would check all these columns, resulting in a bunch
# of lock contention. This change mitigates the problem significantly by only checking two
# columns (id and state_id), both of which should be absolutely unique at all times.
#
# TODO(jschorr): Remove the extra `processing_expires` check once this has been pushed to
# production and every box is updating state_id.
set_unavailable_query = (QueueItem
.update(available=False,
processing_expires=now + timedelta(seconds=processing_time),
retries_remaining=QueueItem.retries_remaining - 1,
state_id=str(uuid.uuid4()))
.where(QueueItem.id == db_item.id,
QueueItem.state_id == db_item.state_id,
QueueItem.processing_expires == db_item.processing_expires))
QueueItem.state_id == db_item.state_id))
changed = set_unavailable_query.execute()
return changed == 1