import logging from peewee import (JOIN_LEFT_OUTER, CharField, BigIntegerField, BooleanField, ForeignKeyField, IntegerField, IntegrityError, fn) from data.database import BaseModel, CloseForLongOperation from data.fields import Base64BinaryField from app import app, storage from digest import checksums from util.migrate.allocator import yield_random_entries from util.registry.torrent import PieceHasher from util.registry.filelike import wrap_with_handler BATCH_SIZE = 1000 logger = logging.getLogger(__name__) # Vendor the information from tables we will be writing to at the time of this migration class ImageStorage(BaseModel): uuid = CharField(index=True, unique=True) checksum = CharField(null=True) image_size = BigIntegerField(null=True) uncompressed_size = BigIntegerField(null=True) uploading = BooleanField(default=True, null=True) cas_path = BooleanField(default=True) content_checksum = CharField(null=True, index=True) class ImageStorageLocation(BaseModel): name = CharField(unique=True, index=True) class ImageStoragePlacement(BaseModel): storage = ForeignKeyField(ImageStorage) location = ForeignKeyField(ImageStorageLocation) class TorrentInfo(BaseModel): storage = ForeignKeyField(ImageStorage) piece_length = IntegerField() pieces = Base64BinaryField() def _get_image_storage_locations(storage_id): placements_query = (ImageStoragePlacement .select(ImageStoragePlacement, ImageStorageLocation) .join(ImageStorageLocation) .switch(ImageStoragePlacement) .join(ImageStorage, JOIN_LEFT_OUTER) .where(ImageStorage.id == storage_id)) locations = set() for placement in placements_query: locations.add(placement.location.name) return locations def _get_layer_path(storage_record): """ Returns the path in the storage engine to the layer data referenced by the storage row. """ if not storage_record.cas_path: logger.debug('Serving layer from legacy v1 path: %s', storage_record.uuid) return storage.v1_image_layer_path(storage_record.uuid) return storage.blob_path(storage_record.content_checksum) def backfill_content_checksums_and_torrent_pieces(piece_length): """ Hashes the entire file for the content associated with an imagestorage. """ logger.debug('Began execution') logger.debug('This may be a long operation!') def batch_query(): return (ImageStorage .select(ImageStorage.id, ImageStorage.uuid, ImageStorage.content_checksum, ImageStorage.cas_path) .join(TorrentInfo, JOIN_LEFT_OUTER, on=((TorrentInfo.storage == ImageStorage.id) & (TorrentInfo.piece_length == piece_length))) .where((TorrentInfo.id >> None) | (ImageStorage.content_checksum >> None))) max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar() checksums_written = 0 pieces_written = 0 for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE, max_id): locations = _get_image_storage_locations(candidate_storage.id) checksum = ImageStorage.content_checksum torrent_pieces = '' with CloseForLongOperation(app.config): try: # Compute the checksum layer_path = _get_layer_path(candidate_storage) with storage.stream_read_file(locations, layer_path) as layer_data_handle: hasher = PieceHasher(piece_length) wrapped = wrap_with_handler(layer_data_handle, hasher.update) checksum = 'sha256:{0}'.format(checksums.sha256_file(wrapped)) torrent_pieces = hasher.final_piece_hashes() except Exception as exc: logger.exception('Unable to compute hashes for storage: %s', candidate_storage.uuid) # Create a fallback value for the checksum if checksum is None: checksum = 'unknown:{0}'.format(exc.__class__.__name__) torrent_collision = False checksum_collision = False # Now update the ImageStorage with the checksum num_updated = (ImageStorage .update(content_checksum=checksum) .where(ImageStorage.id == candidate_storage.id, ImageStorage.content_checksum >> None)).execute() checksums_written += num_updated if num_updated == 0: checksum_collision = True try: TorrentInfo.create(storage=candidate_storage.id, piece_length=piece_length, pieces=torrent_pieces) pieces_written += 1 except IntegrityError: torrent_collision = True if torrent_collision and checksum_collision: logger.info('Another worker pre-empted us for storage: %s', candidate_storage.uuid) abort.set() if (pieces_written % BATCH_SIZE) == 0 or (checksums_written % BATCH_SIZE) == 0: logger.debug('%s checksums written, %s torrent pieces written', checksums_written, pieces_written) logger.debug('Completed, %s checksums written, %s torrent pieces written', checksums_written, pieces_written) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) #logging.getLogger('peewee').setLevel(logging.WARNING) logging.getLogger('boto').setLevel(logging.WARNING) logging.getLogger('data.database').setLevel(logging.WARNING) backfill_content_checksums_and_torrent_pieces(app.config['BITTORRENT_PIECE_SIZE'])