Select random records in the backfill script for uncompressed sizes so that it can be parallelized.

Jake Moshenko 2014-10-06 17:15:45 -04:00
parent c4266140e2
commit 153dbc3f92


@@ -4,7 +4,7 @@ import zlib
 from data import model
 from data.database import ImageStorage
 from app import app, storage as store
-from data.database import db
+from data.database import db, db_random_func
 from util.gzipstream import ZLIB_GZIP_WINDOW
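
The newly imported db_random_func helper is not shown in this diff. A minimal sketch of what it might look like, assuming the app builds its queries with peewee and the driver name is available in config (the DB_DRIVER key below is hypothetical; MySQL spells the SQL function RAND(), while SQLite and Postgres spell it RANDOM()):

from peewee import fn

def db_random_func():
  # Sketch under the assumptions above: return the SQL random() function
  # that matches the configured backend, so .order_by(db_random_func())
  # works on any of them.
  if app.config.get('DB_DRIVER') == 'mysql':  # hypothetical config key
    return fn.Rand()
  return fn.Random()

Ordering by a per-row random value is what lets independent workers pick mostly disjoint batches in the loop below.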
@@ -17,45 +17,50 @@ CHUNK_SIZE = 5 * 1024 * 1024
 
 def backfill_sizes_from_data():
   while True:
     # Load the record from the DB.
-    try:
-      record = (ImageStorage
-                .select(ImageStorage.uuid)
-                .where(ImageStorage.uncompressed_size >> None, ImageStorage.uploading == False)
-                .get())
-    except ImageStorage.DoesNotExist:
+    batch_ids = list(ImageStorage
+                     .select(ImageStorage.uuid)
+                     .where(ImageStorage.uncompressed_size >> None,
+                            ImageStorage.uploading == False)
+                     .limit(100)
+                     .order_by(db_random_func()))
+
+    if len(batch_ids) == 0:
       # We're done!
       return
 
-    uuid = record.uuid
-    with_locations = model.get_storage_by_uuid(uuid)
-
-    # Read the layer from backing storage and calculate the uncompressed size.
-    logger.debug('Loading data: %s (%s bytes)', uuid, with_locations.image_size)
-    decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
-
-    uncompressed_size = 0
-    with store.stream_read_file(with_locations.locations, store.image_layer_path(uuid)) as stream:
-      while True:
-        current_data = stream.read(CHUNK_SIZE)
-        if len(current_data) == 0:
-          break
-
-        uncompressed_size += len(decompressor.decompress(current_data))
-
-    # Write the size to the image storage. We do so under a transaction AFTER checking to
-    # make sure the image storage still exists and has not changed.
-    logger.debug('Writing entry: %s. Size: %s', uuid, uncompressed_size)
-    with app.config['DB_TRANSACTION_FACTORY'](db):
-      try:
-        current_record = model.get_storage_by_uuid(uuid)
-      except model.InvalidImageException:
-        logger.warning('Storage with uuid no longer exists: %s', uuid)
-        continue
-
-      if not current_record.uploading and current_record.uncompressed_size == None:
-        current_record.uncompressed_size = uncompressed_size
-        current_record.save()
+    for record in batch_ids:
+      uuid = record.uuid
+      with_locations = model.get_storage_by_uuid(uuid)
+      if with_locations.uncompressed_size is not None:
+        logger.debug('Somebody else already filled this in for us: %s', uuid)
+        continue
+
+      # Read the layer from backing storage and calculate the uncompressed size.
+      logger.debug('Loading data: %s (%s bytes)', uuid, with_locations.image_size)
+      decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
+
+      uncompressed_size = 0
+      with store.stream_read_file(with_locations.locations, store.image_layer_path(uuid)) as stream:
+        while True:
+          current_data = stream.read(CHUNK_SIZE)
+          if len(current_data) == 0:
+            break
+
+          uncompressed_size += len(decompressor.decompress(current_data))
+
+      # Write the size to the image storage. We do so under a transaction AFTER checking to
+      # make sure the image storage still exists and has not changed.
+      logger.debug('Writing entry: %s. Size: %s', uuid, uncompressed_size)
+      with app.config['DB_TRANSACTION_FACTORY'](db):
+        try:
+          current_record = model.get_storage_by_uuid(uuid)
+        except model.InvalidImageException:
+          logger.warning('Storage with uuid no longer exists: %s', uuid)
+          continue
+
+        if not current_record.uploading and current_record.uncompressed_size == None:
+          current_record.uncompressed_size = uncompressed_size
+          current_record.save()
 
 
 if __name__ == "__main__":
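
Because each pass now claims a random batch of up to 100 rows instead of always the first unfilled record, several copies of the script can run side by side: workers usually land on different batches, and the "somebody else already filled this in" check plus the transactional re-read make the occasional collision harmless. A minimal sketch of driving the backfill in parallel; this wrapper is illustrative and not part of the commit:

from multiprocessing import Process

def run_backfill_workers(worker_count=4):
  # Illustrative only: run the same backfill loop in several processes.
  # Each child process should open its own database connection; the random
  # batch ordering keeps the workers mostly disjoint, and the transactional
  # re-check in backfill_sizes_from_data() resolves any overlap.
  workers = [Process(target=backfill_sizes_from_data) for _ in range(worker_count)]
  for worker in workers:
    worker.start()
  for worker in workers:
    worker.join()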