Merge pull request #2287 from coreos-inc/backfill-state-id
Switch QueueItem state_id to be unique after a backfill
This commit is contained in:
		
						commit
						6b0691577e
					
				
					 3 changed files with 35 additions and 6 deletions
				
			
		|  | @ -762,7 +762,7 @@ class QueueItem(BaseModel): | ||||||
|   available = BooleanField(default=True) |   available = BooleanField(default=True) | ||||||
|   processing_expires = DateTimeField(null=True) |   processing_expires = DateTimeField(null=True) | ||||||
|   retries_remaining = IntegerField(default=5) |   retries_remaining = IntegerField(default=5) | ||||||
|   state_id = CharField(default=uuid_generator, index=True, unique=False) |   state_id = CharField(default=uuid_generator, index=True, unique=True) | ||||||
| 
 | 
 | ||||||
|   class Meta: |   class Meta: | ||||||
|     database = db |     database = db | ||||||
|  |  | ||||||
|  | @ -0,0 +1,33 @@ | ||||||
|  | """Backfill state_id and make it unique | ||||||
|  | 
 | ||||||
|  | Revision ID: d42c175b439a | ||||||
|  | Revises: 3e8cc74a1e7b | ||||||
|  | Create Date: 2017-01-18 15:11:01.635632 | ||||||
|  | 
 | ||||||
|  | """ | ||||||
|  | 
 | ||||||
|  | # revision identifiers, used by Alembic. | ||||||
|  | revision = 'd42c175b439a' | ||||||
|  | down_revision = '3e8cc74a1e7b' | ||||||
|  | 
 | ||||||
|  | from alembic import op | ||||||
|  | import sqlalchemy as sa | ||||||
|  | from sqlalchemy.dialects import mysql | ||||||
|  | 
 | ||||||
|  | def upgrade(tables): | ||||||
|  |     # Backfill the queueitem table's state_id field with unique values for all entries which are | ||||||
|  |     # empty. | ||||||
|  |     conn = op.get_bind() | ||||||
|  |     conn.execute("update queueitem set state_id = id where state_id = ''") | ||||||
|  | 
 | ||||||
|  |     # ### commands auto generated by Alembic - please adjust! ### | ||||||
|  |     op.drop_index('queueitem_state_id', table_name='queueitem') | ||||||
|  |     op.create_index('queueitem_state_id', 'queueitem', ['state_id'], unique=True) | ||||||
|  |     # ### end Alembic commands ### | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def downgrade(tables): | ||||||
|  |     # ### commands auto generated by Alembic - please adjust! ### | ||||||
|  |     op.drop_index('queueitem_state_id', table_name='queueitem') | ||||||
|  |     op.create_index('queueitem_state_id', 'queueitem', ['state_id'], unique=False) | ||||||
|  |     # ### end Alembic commands ### | ||||||
|  | @ -263,17 +263,13 @@ class WorkQueue(object): | ||||||
|     # performing the update. Previously, we would check all these columns, resulting in a bunch |     # performing the update. Previously, we would check all these columns, resulting in a bunch | ||||||
|     # of lock contention. This change mitigates the problem significantly by only checking two |     # of lock contention. This change mitigates the problem significantly by only checking two | ||||||
|     # columns (id and state_id), both of which should be absolutely unique at all times. |     # columns (id and state_id), both of which should be absolutely unique at all times. | ||||||
|     # |  | ||||||
|     # TODO(jschorr): Remove the extra `processing_expires` check once this has been pushed to |  | ||||||
|     # production and every box is updating state_id. |  | ||||||
|     set_unavailable_query = (QueueItem |     set_unavailable_query = (QueueItem | ||||||
|                              .update(available=False, |                              .update(available=False, | ||||||
|                                      processing_expires=now + timedelta(seconds=processing_time), |                                      processing_expires=now + timedelta(seconds=processing_time), | ||||||
|                                      retries_remaining=QueueItem.retries_remaining - 1, |                                      retries_remaining=QueueItem.retries_remaining - 1, | ||||||
|                                      state_id=str(uuid.uuid4())) |                                      state_id=str(uuid.uuid4())) | ||||||
|                              .where(QueueItem.id == db_item.id, |                              .where(QueueItem.id == db_item.id, | ||||||
|                                     QueueItem.state_id == db_item.state_id, |                                     QueueItem.state_id == db_item.state_id)) | ||||||
|                                     QueueItem.processing_expires == db_item.processing_expires)) |  | ||||||
| 
 | 
 | ||||||
|     changed = set_unavailable_query.execute() |     changed = set_unavailable_query.execute() | ||||||
|     return changed == 1 |     return changed == 1 | ||||||
|  |  | ||||||
		Reference in a new issue