This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/migrate/backfill_content_checksums_and_torrent_pieces.py

145 lines
5.4 KiB
Python

import logging
from peewee import (JOIN_LEFT_OUTER, CharField, BigIntegerField, BooleanField, ForeignKeyField,
IntegerField, IntegrityError, fn)
from data.database import BaseModel, CloseForLongOperation
from data.fields import Base64BinaryField
from app import app, storage
from digest import checksums
from util.migrate.allocator import yield_random_entries
from util.registry.torrent import PieceHasher
from util.registry.filelike import wrap_with_handler
BATCH_SIZE = 1000
logger = logging.getLogger(__name__)
# Vendor the information from tables we will be writing to at the time of this migration
class ImageStorage(BaseModel):
uuid = CharField(index=True, unique=True)
checksum = CharField(null=True)
image_size = BigIntegerField(null=True)
uncompressed_size = BigIntegerField(null=True)
uploading = BooleanField(default=True, null=True)
cas_path = BooleanField(default=True)
content_checksum = CharField(null=True, index=True)
class ImageStorageLocation(BaseModel):
name = CharField(unique=True, index=True)
class ImageStoragePlacement(BaseModel):
storage = ForeignKeyField(ImageStorage)
location = ForeignKeyField(ImageStorageLocation)
class TorrentInfo(BaseModel):
storage = ForeignKeyField(ImageStorage)
piece_length = IntegerField()
pieces = Base64BinaryField()
def _get_image_storage_locations(storage_id):
placements_query = (ImageStoragePlacement
.select(ImageStoragePlacement, ImageStorageLocation)
.join(ImageStorageLocation)
.switch(ImageStoragePlacement)
.join(ImageStorage, JOIN_LEFT_OUTER)
.where(ImageStorage.id == storage_id))
locations = set()
for placement in placements_query:
locations.add(placement.location.name)
return locations
def _get_layer_path(storage_record):
""" Returns the path in the storage engine to the layer data referenced by the storage row. """
if not storage_record.cas_path:
logger.debug('Serving layer from legacy v1 path: %s', storage_record.uuid)
return storage.v1_image_layer_path(storage_record.uuid)
return storage.blob_path(storage_record.content_checksum)
def backfill_content_checksums_and_torrent_pieces(piece_length):
""" Hashes the entire file for the content associated with an imagestorage. """
logger.debug('Began execution')
logger.debug('This may be a long operation!')
def batch_query():
return (ImageStorage
.select(ImageStorage.id, ImageStorage.uuid, ImageStorage.content_checksum,
ImageStorage.cas_path)
.join(TorrentInfo, JOIN_LEFT_OUTER, on=((TorrentInfo.storage == ImageStorage.id) &
(TorrentInfo.piece_length == piece_length)))
.where((TorrentInfo.id >> None) | (ImageStorage.content_checksum >> None)))
max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
checksums_written = 0
pieces_written = 0
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE,
max_id):
locations = _get_image_storage_locations(candidate_storage.id)
checksum = ImageStorage.content_checksum
torrent_pieces = ''
with CloseForLongOperation(app.config):
try:
# Compute the checksum
layer_path = _get_layer_path(candidate_storage)
with storage.stream_read_file(locations, layer_path) as layer_data_handle:
hasher = PieceHasher(piece_length)
wrapped = wrap_with_handler(layer_data_handle, hasher.update)
checksum = 'sha256:{0}'.format(checksums.sha256_file(wrapped))
torrent_pieces = hasher.final_piece_hashes()
except Exception as exc:
logger.exception('Unable to compute hashes for storage: %s', candidate_storage.uuid)
# Create a fallback value for the checksum
if checksum is None:
checksum = 'unknown:{0}'.format(exc.__class__.__name__)
torrent_collision = False
checksum_collision = False
# Now update the ImageStorage with the checksum
num_updated = (ImageStorage
.update(content_checksum=checksum)
.where(ImageStorage.id == candidate_storage.id,
ImageStorage.content_checksum >> None)).execute()
checksums_written += num_updated
if num_updated == 0:
checksum_collision = True
try:
TorrentInfo.create(storage=candidate_storage.id, piece_length=piece_length,
pieces=torrent_pieces)
pieces_written += 1
except IntegrityError:
torrent_collision = True
if torrent_collision and checksum_collision:
logger.info('Another worker pre-empted us for storage: %s', candidate_storage.uuid)
abort.set()
if (pieces_written % BATCH_SIZE) == 0 or (checksums_written % BATCH_SIZE) == 0:
logger.debug('%s checksums written, %s torrent pieces written', checksums_written,
pieces_written)
logger.debug('Completed, %s checksums written, %s torrent pieces written', checksums_written,
pieces_written)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
#logging.getLogger('peewee').setLevel(logging.WARNING)
logging.getLogger('boto').setLevel(logging.WARNING)
logging.getLogger('data.database').setLevel(logging.WARNING)
backfill_content_checksums_and_torrent_pieces(app.config['TORRENT_PIECE_SIZE'])