Update the backfill script to always read the size from the layer data
commit 45208983bf
parent e0993b26af
1 changed file with 30 additions and 76 deletions
@@ -1,92 +1,47 @@
 import json
 import logging
+import zlib

 from data import model
 from data.database import ImageStorage
 from app import app, storage as store
 from data.database import db
-from gzip import GzipFile
-from tempfile import SpooledTemporaryFile
+from util.gzipstream import ZLIB_GZIP_WINDOW


 logger = logging.getLogger(__name__)


-def backfill_sizes_from_json():
-  query = (ImageStorage
-           .select()
-           .where(ImageStorage.uncompressed_size == None, ImageStorage.uploading == False)
-           .limit(100))
-
-  total = 0
-  missing = 0
-  batch_processed = 1
-
-  while batch_processed > 0:
-    batch_processed = 0
-    with app.config['DB_TRANSACTION_FACTORY'](db):
-      for image_storage in query.clone():
-        total += 1
-        batch_processed += 1
-
-        if (total - 1) % 100 == 0:
-          logger.debug('Storing entry: %s', total)
-
-        # Lookup the JSON for the image.
-        uuid = image_storage.uuid
-        with_locations = model.get_storage_by_uuid(uuid)
-
-        try:
-          json_string = store.get_content(with_locations.locations, store.image_json_path(uuid))
-          json_data = json.loads(json_string)
-          size = json_data.get('Size', json_data.get('size', -1))
-        except IOError:
-          logger.debug('Image storage with no json %s', uuid)
-          size = -1
-
-        if size == -1:
-          missing += 1
-          logger.debug('Missing entry %s (%s/%s)', uuid, missing, total)
-
-        image_storage.uncompressed_size = size
-        image_storage.save()
-
-
 def backfill_sizes_from_data():
-  storage_ids = list(ImageStorage
-                     .select(ImageStorage.uuid)
-                     .where(ImageStorage.uncompressed_size == -1, ImageStorage.uploading == False))
-
-  counter = 0
-  for uuid in [s.uuid for s in storage_ids]:
-    counter += 1
-
-    # Load the storage with locations.
-    logger.debug('Loading entry: %s (%s/%s)', uuid, counter, len(storage_ids))
-    with_locations = model.get_storage_by_uuid(uuid)
-    layer_size = -2
-
-    # Read the layer from backing storage and calculate the uncompressed size.
+  while True:
+    # Load the record from the DB.
     try:
-      logger.debug('Loading data: %s (%s bytes)', uuid, with_locations.image_size)
-      CHUNK_SIZE = 512 * 1024
-      with SpooledTemporaryFile(CHUNK_SIZE) as tarball:
-        layer_data = store.get_content(with_locations.locations, store.image_layer_path(uuid))
-        tarball.write(layer_data)
-        tarball.seek(0)
-
-        with GzipFile(fileobj=tarball, mode='rb') as gzip_file:
-          gzip_file.read()
-          layer_size = gzip_file.size
-
-    except Exception as ex:
-      logger.debug('Could not gunzip entry: %s. Reason: %s', uuid, ex)
-      continue
+      record = (ImageStorage
+                .select(ImageStorage.uuid)
+                .where(ImageStorage.uncompressed_size == None, ImageStorage.uploading == False)
+                .get())
+    except ImageStorage.DoesNotExist:
+      # We're done!
+      return
+
+    uuid = record.uuid
+
+    # Read the layer from backing storage and calculate the uncompressed size.
+    logger.debug('Loading data: %s (%s bytes)', uuid, with_locations.image_size)
+    decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
+    stream = store.read_stream(with_locations.locations, store.image_layer_path(uuid))
+
+    uncompressed_size = 0
+    CHUNK_SIZE = 512 * 1024 * 1024
+    while True:
+      current_data = stream.read(CHUNK_SIZE)
+      if len(current_data) == 0:
+        break
+
+      uncompressed_size += len(decompressor.decompress(current_data))

     # Write the size to the image storage. We do so under a transaction AFTER checking to
     # make sure the image storage still exists and has not changed.
-    logger.debug('Writing entry: %s. Size: %s', uuid, layer_size)
+    logger.debug('Writing entry: %s. Size: %s', uuid, uncompressed_size)
     with app.config['DB_TRANSACTION_FACTORY'](db):
       try:
         current_record = model.get_storage_by_uuid(uuid)
@@ -94,14 +49,13 @@ def backfill_sizes_from_data():
         # Record no longer exists.
         continue

-      if not current_record.uploading and current_record.uncompressed_size == -1:
-        current_record.uncompressed_size = layer_size
-        current_record.save()
+      if not current_record.uploading and current_record.uncompressed_size == None:
+        current_record.uncompressed_size = uncompressed_size
+        #current_record.save()


 if __name__ == "__main__":
   logging.basicConfig(level=logging.DEBUG)
   logging.getLogger('boto').setLevel(logging.CRITICAL)

-  backfill_sizes_from_json()
   backfill_sizes_from_data()
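The heart of the change is the new size calculation: rather than spooling the layer to a temporary file and re-reading it with GzipFile, the loop now streams the stored layer through a zlib decompressor and sums the lengths of the decompressed chunks. Below is a minimal, standalone sketch of that calculation; it assumes ZLIB_GZIP_WINDOW resolves to zlib.MAX_WBITS | 32 (auto-detect gzip/zlib framing), and the function name, chunk size, and trailing flush() are illustrative rather than part of the commit.

import zlib

# Assumed stand-in for util.gzipstream.ZLIB_GZIP_WINDOW: window bits with 32
# OR-ed in so zlib auto-detects a gzip or zlib header.
ZLIB_GZIP_WINDOW = zlib.MAX_WBITS | 32

CHUNK_SIZE = 512 * 1024  # illustrative read size


def uncompressed_size_of(stream):
  # Feed the compressed stream through the decompressor chunk by chunk and
  # count the bytes it emits, instead of materializing the whole tarball.
  decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
  total = 0
  while True:
    chunk = stream.read(CHUNK_SIZE)
    if not chunk:
      break
    total += len(decompressor.decompress(chunk))

  # Flush any data still buffered inside the decompressor.
  total += len(decompressor.flush())
  return total

Memory use is then bounded by the chunk size and zlib's window rather than by the size of the layer, which is what lets the backfill always read the size directly from the layer data.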