"""Backfill ``ImageStorage.uncompressed_size`` for storages that are missing it.

Each candidate layer is streamed from backing storage and decompressed in
bounded chunks to count its uncompressed byte length, which is then written
back to the database.
"""

import logging
import zlib

from data import model
from data.database import ImageStorage
from app import app, storage as store
from data.database import db, db_random_func
from util.gzipstream import ZLIB_GZIP_WINDOW


logger = logging.getLogger(__name__)


# Bytes read from storage per read() call; also the cap on the decompressed
# output produced by a single decompress() call, which bounds memory use.
CHUNK_SIZE = 5 * 1024 * 1024

# Number of candidate rows fetched per pass over the table.
_BATCH_SIZE = 100


def _storages_missing_size():
    """Return a fresh query selecting the uuids of storages needing a backfill.

    A storage qualifies when its uncompressed size is NULL, its compressed
    size is positive and it is not currently uploading.
    """
    # ``>> None`` and ``== False`` are peewee operators that build SQL
    # conditions; they must not be rewritten as ``is None`` / ``not ...``.
    return (ImageStorage
            .select(ImageStorage.uuid)
            .where(ImageStorage.uncompressed_size >> None,
                   ImageStorage.image_size > 0,
                   ImageStorage.uploading == False))  # noqa: E712


def _measure_uncompressed_size(stream):
    """Return the number of bytes ``stream`` decompresses to.

    ``stream`` is any object with a ``read(size)`` method yielding compressed
    data. Output is never accumulated; only its length is counted.
    """
    decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
    total = 0

    while True:
        compressed = stream.read(CHUNK_SIZE)
        if not compressed:
            break

        # Each call emits at most CHUNK_SIZE bytes; input that was not
        # consumed because of that cap is handed back via unconsumed_tail.
        while compressed:
            total += len(decompressor.decompress(compressed, CHUNK_SIZE))
            compressed = decompressor.unconsumed_tail

    # The decompressor may have consumed all input while still holding output
    # that did not fit under the cap; count it so the size is not short.
    total += len(decompressor.flush())
    return total


def _backfill_storage(uuid):
    """Compute and persist the uncompressed size for the storage ``uuid``.

    Raises whatever the model lookup, the storage read or zlib raise; the
    caller decides which failures are skippable.
    """
    with_locs = model.storage.get_storage_by_uuid(uuid)
    if with_locs.uncompressed_size is not None:
        logger.debug('Somebody else already filled this in for us: %s', uuid)
        return

    # Read the layer from backing storage and calculate the uncompressed size.
    logger.debug('Loading data: %s (%s bytes)', uuid, with_locs.image_size)
    layer_path = store.image_layer_path(uuid)
    with store.stream_read_file(with_locs.locations, layer_path) as stream:
        uncompressed_size = _measure_uncompressed_size(stream)

    # Write the size to the image storage. We do so under a transaction AFTER
    # checking to make sure the image storage still exists and has not changed.
    logger.debug('Writing entry: %s. Size: %s', uuid, uncompressed_size)
    with app.config['DB_TRANSACTION_FACTORY'](db):
        current_record = model.storage.get_storage_by_uuid(uuid)
        if not current_record.uploading and current_record.uncompressed_size is None:
            current_record.uncompressed_size = uncompressed_size
            current_record.save()
        else:
            logger.debug('Somebody else already filled this in for us, after we did the work: %s',
                         uuid)


def backfill_sizes_from_data():
    """Fill in ``uncompressed_size`` for every storage that is missing it.

    Candidates are fetched in random batches until a batch contains no uuid
    that has not already been attempted. A failure on one storage is logged
    and skipped so that it cannot abort the rest of the run.
    """
    logger.setLevel(logging.DEBUG)

    logger.debug('Starting uncompressed image size backfill')
    logger.debug('NOTE: This can be a LONG RUNNING OPERATION. Please wait!')

    # Check for any uncompressed images.
    has_images = bool(list(_storages_missing_size().limit(1)))
    if not has_images:
        logger.debug('Uncompressed backfill: No migration needed')
        return

    logger.debug('Uncompressed backfill: Starting migration')

    # uuids already attempted in this run. Storages that fail keep a NULL size
    # and would otherwise be selected again on every pass.
    # NOTE(review): because batches are random samples, a batch made up only
    # of previously failed uuids ends the run even if others remain - confirm
    # whether that is acceptable.
    encountered = set()
    while True:
        # Load the next batch of candidate uuids from the DB.
        candidates = (_storages_missing_size()
                      .limit(_BATCH_SIZE)
                      .order_by(db_random_func()))
        batch_ids = {s.uuid for s in candidates} - encountered
        logger.debug('Found %s images to process', len(batch_ids))
        if not batch_ids:
            # We're done!
            return

        for counter, uuid in enumerate(batch_ids, 1):
            encountered.add(uuid)
            logger.debug('Processing image ID %s (%s/%s)', uuid, counter, len(batch_ids))

            try:
                _backfill_storage(uuid)
            except model.InvalidImageException:
                logger.warning('Storage with uuid no longer exists: %s', uuid)
            except IOError:
                logger.warning('IOError on %s', uuid)
            except MemoryError:
                logger.warning('MemoryError on %s', uuid)
            except zlib.error:
                # Corrupt or non-gzip layer data: skip it like the other
                # per-image failures instead of aborting the whole backfill.
                logger.warning('Invalid compressed data on %s', uuid)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('boto').setLevel(logging.CRITICAL)
    logging.getLogger('peewee').setLevel(logging.CRITICAL)

    backfill_sizes_from_data()