From a33077b9785351d23844a0b9b6ced619d502b662 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Tue, 10 Nov 2015 11:10:09 -0500 Subject: [PATCH] Optimistically update backfill items, reducing RTs --- util/migrate/backfill_content_checksums.py | 36 +++++++++++++--------- util/migrate/backfill_parent_id.py | 31 +++++++++++-------- util/migrate/backfill_v1_checksums.py | 35 +++++++++++---------- 3 files changed, 59 insertions(+), 43 deletions(-) diff --git a/util/migrate/backfill_content_checksums.py b/util/migrate/backfill_content_checksums.py index 86770a35b..2b2c0cdca 100644 --- a/util/migrate/backfill_content_checksums.py +++ b/util/migrate/backfill_content_checksums.py @@ -5,12 +5,15 @@ from peewee import JOIN_LEFT_OUTER from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField, TextField, fn) -from data.database import BaseModel, db, db_for_update, CloseForLongOperation +from data.database import BaseModel, CloseForLongOperation from app import app, storage from digest import checksums from util.migrate.allocator import yield_random_entries +BATCH_SIZE = 1000 + + logger = logging.getLogger(__name__) @@ -69,7 +72,8 @@ def _get_image_storage_locations(storage_id): def backfill_content_checksums(): """ Copies metadata from image storages to their images. """ - logger.debug('Image content checksum backfill: Began execution') + logger.debug('Began execution') + logger.debug('This may be a long operation!') def batch_query(): return (ImageStorage @@ -78,9 +82,9 @@ def backfill_content_checksums(): max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar() - for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, 1000, max_id): - logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid) - + written = 0 + for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE, + max_id): locations = _get_image_storage_locations(candidate_storage.id) checksum = None @@ -95,15 +99,19 @@ def backfill_content_checksums(): checksum = 'unknown:{0}'.format(exc.__class__.__name__) # Now update the ImageStorage with the checksum - with app.config['DB_TRANSACTION_FACTORY'](db): - to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id)) - if to_update.content_checksum is not None: - logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid) - abort.set() - else: - logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid) - to_update.content_checksum = checksum - to_update.save() + num_updated = (ImageStorage + .update(content_checksum=checksum) + .where(ImageStorage.id == candidate_storage.id, + ImageStorage.content_checksum >> None)).execute() + if num_updated == 0: + logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid) + abort.set() + + written += num_updated + if (written % BATCH_SIZE) == 0: + logger.debug('%s entries written', written) + + logger.debug('Completed, %s entries written', written) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) diff --git a/util/migrate/backfill_parent_id.py b/util/migrate/backfill_parent_id.py index fc2db3bff..0c1621775 100644 --- a/util/migrate/backfill_parent_id.py +++ b/util/migrate/backfill_parent_id.py @@ -1,11 +1,15 @@ import logging -from data.database import BaseModel, db, db_for_update +from data.database import BaseModel from peewee import (fn, CharField, BigIntegerField, ForeignKeyField, BooleanField, DateTimeField, TextField, IntegerField) from app import app from util.migrate.allocator import yield_random_entries + +BATCH_SIZE = 1000 + + class Repository(BaseModel): pass @@ -56,19 +60,20 @@ def backfill_parent_id(): max_id = Image.select(fn.Max(Image.id)).scalar() - for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, 1000, max_id): - with app.config['DB_TRANSACTION_FACTORY'](db): - try: - image = db_for_update(Image - .select() - .where(Image.id == to_backfill.id, Image.parent_id >> None)).get() - image.parent_id = int(to_backfill.ancestors.split('/')[-2]) - image.save() - except Image.DoesNotExist: - logger.info('Collision with another worker, aborting batch') - abort.set() + written = 0 + for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, BATCH_SIZE, max_id): + computed_parent = int(to_backfill.ancestors.split('/')[-2]) + num_changed = (Image + .update(parent_id=computed_parent) + .where(Image.id == to_backfill.id, Image.parent_id >> None)).execute() + if num_changed == 0: + logger.info('Collision with another worker, aborting batch') + abort.set() + written += num_changed + if (written % BATCH_SIZE) == 0: + logger.debug('%s entries written', written) - logger.debug('backfill_parent_id: Completed') + logger.debug('backfill_parent_id: Completed, updated %s entries', written) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) diff --git a/util/migrate/backfill_v1_checksums.py b/util/migrate/backfill_v1_checksums.py index e9516cf91..0c4e190ae 100644 --- a/util/migrate/backfill_v1_checksums.py +++ b/util/migrate/backfill_v1_checksums.py @@ -2,11 +2,14 @@ import logging from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField, TextField, fn) -from data.database import BaseModel, db, db_for_update +from data.database import BaseModel from util.migrate.allocator import yield_random_entries from app import app +BATCH_SIZE = 1000 + + logger = logging.getLogger(__name__) @@ -40,30 +43,30 @@ class Image(BaseModel): def backfill_checksums(): """ Copies checksums from image storages to their images. """ - logger.debug('Image v1 checksum backfill: Began execution') + logger.debug('Began execution') + logger.debug('This may be a long operation!') def batch_query(): return (Image - .select(Image.id) + .select(Image, ImageStorage) .join(ImageStorage) .where(Image.v1_checksum >> None, ImageStorage.uploading == False, ~(ImageStorage.checksum >> None))) max_id = Image.select(fn.Max(Image.id)).scalar() - for candidate_image, abort in yield_random_entries(batch_query, Image.id, 1000, max_id): - with app.config['DB_TRANSACTION_FACTORY'](db): - try: - image = db_for_update(Image - .select(Image, ImageStorage) - .join(ImageStorage) - .where(Image.id == candidate_image.id, - Image.v1_checksum >> None)).get() + written = 0 + for candidate_image, abort in yield_random_entries(batch_query, Image.id, BATCH_SIZE, max_id): + num_changed = (Image + .update(v1_checksum=candidate_image.storage.checksum) + .where(Image.id == candidate_image.id, Image.v1_checksum >> None)).execute() + if num_changed == 0: + logger.info('Collision with another worker, aborting batch') + abort.set() + written += num_changed + if (written % BATCH_SIZE) == 0: + logger.debug('%s entries written', written) - image.v1_checksum = image.storage.checksum - image.save() - except Image.DoesNotExist: - logger.info('Collision with another worker, aborting batch') - abort.set() + logger.debug('Completed, updated %s entries', written) if __name__ == "__main__":