Optimistically update backfill items, reducing RTs
This commit is contained in:
parent
493d077f62
commit
a33077b978
3 changed files with 59 additions and 43 deletions
|
@ -5,12 +5,15 @@ from peewee import JOIN_LEFT_OUTER
|
|||
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
|
||||
TextField, fn)
|
||||
|
||||
from data.database import BaseModel, db, db_for_update, CloseForLongOperation
|
||||
from data.database import BaseModel, CloseForLongOperation
|
||||
from app import app, storage
|
||||
from digest import checksums
|
||||
from util.migrate.allocator import yield_random_entries
|
||||
|
||||
|
||||
BATCH_SIZE = 1000
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -69,7 +72,8 @@ def _get_image_storage_locations(storage_id):
|
|||
|
||||
def backfill_content_checksums():
|
||||
""" Copies metadata from image storages to their images. """
|
||||
logger.debug('Image content checksum backfill: Began execution')
|
||||
logger.debug('Began execution')
|
||||
logger.debug('This may be a long operation!')
|
||||
|
||||
def batch_query():
|
||||
return (ImageStorage
|
||||
|
@ -78,9 +82,9 @@ def backfill_content_checksums():
|
|||
|
||||
max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
|
||||
|
||||
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, 1000, max_id):
|
||||
logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid)
|
||||
|
||||
written = 0
|
||||
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE,
|
||||
max_id):
|
||||
locations = _get_image_storage_locations(candidate_storage.id)
|
||||
|
||||
checksum = None
|
||||
|
@ -95,15 +99,19 @@ def backfill_content_checksums():
|
|||
checksum = 'unknown:{0}'.format(exc.__class__.__name__)
|
||||
|
||||
# Now update the ImageStorage with the checksum
|
||||
with app.config['DB_TRANSACTION_FACTORY'](db):
|
||||
to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id))
|
||||
if to_update.content_checksum is not None:
|
||||
logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
|
||||
abort.set()
|
||||
else:
|
||||
logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid)
|
||||
to_update.content_checksum = checksum
|
||||
to_update.save()
|
||||
num_updated = (ImageStorage
|
||||
.update(content_checksum=checksum)
|
||||
.where(ImageStorage.id == candidate_storage.id,
|
||||
ImageStorage.content_checksum >> None)).execute()
|
||||
if num_updated == 0:
|
||||
logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
|
||||
abort.set()
|
||||
|
||||
written += num_updated
|
||||
if (written % BATCH_SIZE) == 0:
|
||||
logger.debug('%s entries written', written)
|
||||
|
||||
logger.debug('Completed, %s entries written', written)
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
|
|
@ -1,11 +1,15 @@
|
|||
import logging
|
||||
|
||||
from data.database import BaseModel, db, db_for_update
|
||||
from data.database import BaseModel
|
||||
from peewee import (fn, CharField, BigIntegerField, ForeignKeyField, BooleanField, DateTimeField,
|
||||
TextField, IntegerField)
|
||||
from app import app
|
||||
from util.migrate.allocator import yield_random_entries
|
||||
|
||||
|
||||
BATCH_SIZE = 1000
|
||||
|
||||
|
||||
class Repository(BaseModel):
|
||||
pass
|
||||
|
||||
|
@ -56,19 +60,20 @@ def backfill_parent_id():
|
|||
|
||||
max_id = Image.select(fn.Max(Image.id)).scalar()
|
||||
|
||||
for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, 1000, max_id):
|
||||
with app.config['DB_TRANSACTION_FACTORY'](db):
|
||||
try:
|
||||
image = db_for_update(Image
|
||||
.select()
|
||||
.where(Image.id == to_backfill.id, Image.parent_id >> None)).get()
|
||||
image.parent_id = int(to_backfill.ancestors.split('/')[-2])
|
||||
image.save()
|
||||
except Image.DoesNotExist:
|
||||
logger.info('Collision with another worker, aborting batch')
|
||||
abort.set()
|
||||
written = 0
|
||||
for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, BATCH_SIZE, max_id):
|
||||
computed_parent = int(to_backfill.ancestors.split('/')[-2])
|
||||
num_changed = (Image
|
||||
.update(parent_id=computed_parent)
|
||||
.where(Image.id == to_backfill.id, Image.parent_id >> None)).execute()
|
||||
if num_changed == 0:
|
||||
logger.info('Collision with another worker, aborting batch')
|
||||
abort.set()
|
||||
written += num_changed
|
||||
if (written % BATCH_SIZE) == 0:
|
||||
logger.debug('%s entries written', written)
|
||||
|
||||
logger.debug('backfill_parent_id: Completed')
|
||||
logger.debug('backfill_parent_id: Completed, updated %s entries', written)
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
|
|
@ -2,11 +2,14 @@ import logging
|
|||
|
||||
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
|
||||
TextField, fn)
|
||||
from data.database import BaseModel, db, db_for_update
|
||||
from data.database import BaseModel
|
||||
from util.migrate.allocator import yield_random_entries
|
||||
from app import app
|
||||
|
||||
|
||||
BATCH_SIZE = 1000
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -40,30 +43,30 @@ class Image(BaseModel):
|
|||
|
||||
def backfill_checksums():
|
||||
""" Copies checksums from image storages to their images. """
|
||||
logger.debug('Image v1 checksum backfill: Began execution')
|
||||
logger.debug('Began execution')
|
||||
logger.debug('This may be a long operation!')
|
||||
def batch_query():
|
||||
return (Image
|
||||
.select(Image.id)
|
||||
.select(Image, ImageStorage)
|
||||
.join(ImageStorage)
|
||||
.where(Image.v1_checksum >> None, ImageStorage.uploading == False,
|
||||
~(ImageStorage.checksum >> None)))
|
||||
|
||||
max_id = Image.select(fn.Max(Image.id)).scalar()
|
||||
|
||||
for candidate_image, abort in yield_random_entries(batch_query, Image.id, 1000, max_id):
|
||||
with app.config['DB_TRANSACTION_FACTORY'](db):
|
||||
try:
|
||||
image = db_for_update(Image
|
||||
.select(Image, ImageStorage)
|
||||
.join(ImageStorage)
|
||||
.where(Image.id == candidate_image.id,
|
||||
Image.v1_checksum >> None)).get()
|
||||
written = 0
|
||||
for candidate_image, abort in yield_random_entries(batch_query, Image.id, BATCH_SIZE, max_id):
|
||||
num_changed = (Image
|
||||
.update(v1_checksum=candidate_image.storage.checksum)
|
||||
.where(Image.id == candidate_image.id, Image.v1_checksum >> None)).execute()
|
||||
if num_changed == 0:
|
||||
logger.info('Collision with another worker, aborting batch')
|
||||
abort.set()
|
||||
written += num_changed
|
||||
if (written % BATCH_SIZE) == 0:
|
||||
logger.debug('%s entries written', written)
|
||||
|
||||
image.v1_checksum = image.storage.checksum
|
||||
image.save()
|
||||
except Image.DoesNotExist:
|
||||
logger.info('Collision with another worker, aborting batch')
|
||||
abort.set()
|
||||
logger.debug('Completed, updated %s entries', written)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Reference in a new issue