Merge pull request #841 from jakedt/fasterbackfill

Optimistically update backfill items, reducing database round trips (RTs)
This commit is contained in:
Jake Moshenko 2015-11-10 11:39:40 -05:00
commit ab4dbb74b0
3 changed files with 59 additions and 43 deletions

View file

@ -5,12 +5,15 @@ from peewee import JOIN_LEFT_OUTER
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
TextField, fn)
from data.database import BaseModel, db, db_for_update, CloseForLongOperation
from data.database import BaseModel, CloseForLongOperation
from app import app, storage
from digest import checksums
from util.migrate.allocator import yield_random_entries
BATCH_SIZE = 1000
logger = logging.getLogger(__name__)
@ -69,7 +72,8 @@ def _get_image_storage_locations(storage_id):
def backfill_content_checksums():
""" Copies metadata from image storages to their images. """
logger.debug('Image content checksum backfill: Began execution')
logger.debug('Began execution')
logger.debug('This may be a long operation!')
def batch_query():
return (ImageStorage
@ -78,9 +82,9 @@ def backfill_content_checksums():
max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, 1000, max_id):
logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid)
written = 0
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE,
max_id):
locations = _get_image_storage_locations(candidate_storage.id)
checksum = None
@ -95,15 +99,19 @@ def backfill_content_checksums():
checksum = 'unknown:{0}'.format(exc.__class__.__name__)
# Now update the ImageStorage with the checksum
with app.config['DB_TRANSACTION_FACTORY'](db):
to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id))
if to_update.content_checksum is not None:
num_updated = (ImageStorage
.update(content_checksum=checksum)
.where(ImageStorage.id == candidate_storage.id,
ImageStorage.content_checksum >> None)).execute()
if num_updated == 0:
logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
abort.set()
else:
logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid)
to_update.content_checksum = checksum
to_update.save()
written += num_updated
if (written % BATCH_SIZE) == 0:
logger.debug('%s entries written', written)
logger.debug('Completed, %s entries written', written)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)

View file

@ -1,11 +1,15 @@
import logging
from data.database import BaseModel, db, db_for_update
from data.database import BaseModel
from peewee import (fn, CharField, BigIntegerField, ForeignKeyField, BooleanField, DateTimeField,
TextField, IntegerField)
from app import app
from util.migrate.allocator import yield_random_entries
BATCH_SIZE = 1000
class Repository(BaseModel):
pass
@ -56,19 +60,20 @@ def backfill_parent_id():
max_id = Image.select(fn.Max(Image.id)).scalar()
for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, 1000, max_id):
with app.config['DB_TRANSACTION_FACTORY'](db):
try:
image = db_for_update(Image
.select()
.where(Image.id == to_backfill.id, Image.parent_id >> None)).get()
image.parent_id = int(to_backfill.ancestors.split('/')[-2])
image.save()
except Image.DoesNotExist:
written = 0
for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, BATCH_SIZE, max_id):
computed_parent = int(to_backfill.ancestors.split('/')[-2])
num_changed = (Image
.update(parent_id=computed_parent)
.where(Image.id == to_backfill.id, Image.parent_id >> None)).execute()
if num_changed == 0:
logger.info('Collision with another worker, aborting batch')
abort.set()
written += num_changed
if (written % BATCH_SIZE) == 0:
logger.debug('%s entries written', written)
logger.debug('backfill_parent_id: Completed')
logger.debug('backfill_parent_id: Completed, updated %s entries', written)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)

View file

@ -2,11 +2,14 @@ import logging
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
TextField, fn)
from data.database import BaseModel, db, db_for_update
from data.database import BaseModel
from util.migrate.allocator import yield_random_entries
from app import app
BATCH_SIZE = 1000
logger = logging.getLogger(__name__)
@ -40,30 +43,30 @@ class Image(BaseModel):
def backfill_checksums():
""" Copies checksums from image storages to their images. """
logger.debug('Image v1 checksum backfill: Began execution')
logger.debug('Began execution')
logger.debug('This may be a long operation!')
def batch_query():
return (Image
.select(Image.id)
.select(Image, ImageStorage)
.join(ImageStorage)
.where(Image.v1_checksum >> None, ImageStorage.uploading == False,
~(ImageStorage.checksum >> None)))
max_id = Image.select(fn.Max(Image.id)).scalar()
for candidate_image, abort in yield_random_entries(batch_query, Image.id, 1000, max_id):
with app.config['DB_TRANSACTION_FACTORY'](db):
try:
image = db_for_update(Image
.select(Image, ImageStorage)
.join(ImageStorage)
.where(Image.id == candidate_image.id,
Image.v1_checksum >> None)).get()
image.v1_checksum = image.storage.checksum
image.save()
except Image.DoesNotExist:
written = 0
for candidate_image, abort in yield_random_entries(batch_query, Image.id, BATCH_SIZE, max_id):
num_changed = (Image
.update(v1_checksum=candidate_image.storage.checksum)
.where(Image.id == candidate_image.id, Image.v1_checksum >> None)).execute()
if num_changed == 0:
logger.info('Collision with another worker, aborting batch')
abort.set()
written += num_changed
if (written % BATCH_SIZE) == 0:
logger.debug('%s entries written', written)
logger.debug('Completed, updated %s entries', written)
if __name__ == "__main__":