From 073b68cf0d2a0fba66dc3cc8022625626021a8a8 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Thu, 7 Jan 2016 16:56:36 -0500 Subject: [PATCH] Fix torrent migration and update backfill to compute torrent pieces --- ..._add_the_torrentinfo_table_and_torrent_.py | 6 +- util/migrate/backfill_content_checksums.py | 121 --------------- ...ll_content_checksums_and_torrent_pieces.py | 145 ++++++++++++++++++ 3 files changed, 149 insertions(+), 123 deletions(-) delete mode 100644 util/migrate/backfill_content_checksums.py create mode 100644 util/migrate/backfill_content_checksums_and_torrent_pieces.py diff --git a/data/migrations/versions/23ca04d0bc8e_add_the_torrentinfo_table_and_torrent_.py b/data/migrations/versions/23ca04d0bc8e_add_the_torrentinfo_table_and_torrent_.py index 1e16f9838..1cab180d5 100644 --- a/data/migrations/versions/23ca04d0bc8e_add_the_torrentinfo_table_and_torrent_.py +++ b/data/migrations/versions/23ca04d0bc8e_add_the_torrentinfo_table_and_torrent_.py @@ -13,6 +13,8 @@ down_revision = '471caec2cb66' from alembic import op import sqlalchemy as sa +from util.migrate import UTF8LongText + def upgrade(tables): ### commands auto generated by Alembic - please adjust! ### op.create_table('torrentinfo', @@ -25,8 +27,8 @@ def upgrade(tables): ) op.create_index('torrentinfo_storage_id', 'torrentinfo', ['storage_id'], unique=False) op.create_index('torrentinfo_storage_id_piece_length', 'torrentinfo', ['storage_id', 'piece_length'], unique=True) - op.add_column(u'blobupload', sa.Column('piece_hashes', sa.Text(), nullable=False)) - op.add_column(u'blobupload', sa.Column('piece_sha_state', sa.Text(), nullable=True)) + op.add_column(u'blobupload', sa.Column('piece_hashes', UTF8LongText(), nullable=False)) + op.add_column(u'blobupload', sa.Column('piece_sha_state', UTF8LongText(), nullable=True)) ### end Alembic commands ### diff --git a/util/migrate/backfill_content_checksums.py b/util/migrate/backfill_content_checksums.py deleted file mode 100644 index c03bb13b6..000000000 --- a/util/migrate/backfill_content_checksums.py +++ /dev/null @@ -1,121 +0,0 @@ -import logging - -from peewee import JOIN_LEFT_OUTER - -from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField, - TextField, fn) - -from data.database import BaseModel, CloseForLongOperation -from app import app, storage -from digest import checksums -from util.migrate.allocator import yield_random_entries - - -BATCH_SIZE = 1000 - - -logger = logging.getLogger(__name__) - - -class Repository(BaseModel): - pass - - -# Vendor the information from tables we will be writing to at the time of this migration -class ImageStorage(BaseModel): - uuid = CharField(index=True, unique=True) - checksum = CharField(null=True) - image_size = BigIntegerField(null=True) - uncompressed_size = BigIntegerField(null=True) - uploading = BooleanField(default=True, null=True) - cas_path = BooleanField(default=True) - content_checksum = CharField(null=True, index=True) - - -class Image(BaseModel): - docker_image_id = CharField(index=True) - repository = ForeignKeyField(Repository) - ancestors = CharField(index=True, default='/', max_length=64535, null=True) - storage = ForeignKeyField(ImageStorage, index=True, null=True) - created = DateTimeField(null=True) - comment = TextField(null=True) - command = TextField(null=True) - aggregate_size = BigIntegerField(null=True) - v1_json_metadata = TextField(null=True) - v1_checksum = CharField(null=True) - - -class ImageStorageLocation(BaseModel): - name = CharField(unique=True, index=True) - - -class ImageStoragePlacement(BaseModel): - storage = ForeignKeyField(ImageStorage) - location = ForeignKeyField(ImageStorageLocation) - - - -def _get_image_storage_locations(storage_id): - placements_query = (ImageStoragePlacement - .select(ImageStoragePlacement, ImageStorageLocation) - .join(ImageStorageLocation) - .switch(ImageStoragePlacement) - .join(ImageStorage, JOIN_LEFT_OUTER) - .where(ImageStorage.id == storage_id)) - - locations = set() - for placement in placements_query: - locations.add(placement.location.name) - - return locations - - -def backfill_content_checksums(): - """ Copies metadata from image storages to their images. """ - logger.debug('Began execution') - logger.debug('This may be a long operation!') - - def batch_query(): - return (ImageStorage - .select(ImageStorage.id, ImageStorage.uuid) - .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False)) - - max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar() - - written = 0 - for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE, - max_id): - locations = _get_image_storage_locations(candidate_storage.id) - - checksum = None - with CloseForLongOperation(app.config): - try: - # Compute the checksum - layer_path = storage.image_layer_path(candidate_storage.uuid) - with storage.stream_read_file(locations, layer_path) as layer_data_handle: - checksum = 'sha256:{0}'.format(checksums.sha256_file(layer_data_handle)) - except Exception as exc: - logger.warning('Unable to compute checksum for storage: %s', candidate_storage.uuid) - checksum = 'unknown:{0}'.format(exc.__class__.__name__) - - # Now update the ImageStorage with the checksum - num_updated = (ImageStorage - .update(content_checksum=checksum) - .where(ImageStorage.id == candidate_storage.id, - ImageStorage.content_checksum >> None)).execute() - if num_updated == 0: - logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid) - abort.set() - - written += num_updated - if (written % BATCH_SIZE) == 0: - logger.debug('%s entries written', written) - - logger.debug('Completed, %s entries written', written) - -if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) - logging.getLogger('peewee').setLevel(logging.WARNING) - logging.getLogger('boto').setLevel(logging.WARNING) - logging.getLogger('data.database').setLevel(logging.WARNING) - backfill_content_checksums() diff --git a/util/migrate/backfill_content_checksums_and_torrent_pieces.py b/util/migrate/backfill_content_checksums_and_torrent_pieces.py new file mode 100644 index 000000000..3a290466c --- /dev/null +++ b/util/migrate/backfill_content_checksums_and_torrent_pieces.py @@ -0,0 +1,145 @@ +import logging + +from peewee import (JOIN_LEFT_OUTER, CharField, BigIntegerField, BooleanField, ForeignKeyField, + IntegerField, IntegrityError, fn) + +from data.database import BaseModel, CloseForLongOperation +from data.fields import Base64BinaryField +from app import app, storage +from digest import checksums +from util.migrate.allocator import yield_random_entries +from util.registry.torrent import PieceHasher +from util.registry.filelike import wrap_with_handler + + +BATCH_SIZE = 1000 + + +logger = logging.getLogger(__name__) + + +# Vendor the information from tables we will be writing to at the time of this migration +class ImageStorage(BaseModel): + uuid = CharField(index=True, unique=True) + checksum = CharField(null=True) + image_size = BigIntegerField(null=True) + uncompressed_size = BigIntegerField(null=True) + uploading = BooleanField(default=True, null=True) + cas_path = BooleanField(default=True) + content_checksum = CharField(null=True, index=True) + + +class ImageStorageLocation(BaseModel): + name = CharField(unique=True, index=True) + + +class ImageStoragePlacement(BaseModel): + storage = ForeignKeyField(ImageStorage) + location = ForeignKeyField(ImageStorageLocation) + + +class TorrentInfo(BaseModel): + storage = ForeignKeyField(ImageStorage) + piece_length = IntegerField() + pieces = Base64BinaryField() + + +def _get_image_storage_locations(storage_id): + placements_query = (ImageStoragePlacement + .select(ImageStoragePlacement, ImageStorageLocation) + .join(ImageStorageLocation) + .switch(ImageStoragePlacement) + .join(ImageStorage, JOIN_LEFT_OUTER) + .where(ImageStorage.id == storage_id)) + + locations = set() + for placement in placements_query: + locations.add(placement.location.name) + + return locations + + +def _get_layer_path(storage_record): + """ Returns the path in the storage engine to the layer data referenced by the storage row. """ + if not storage_record.cas_path: + logger.debug('Serving layer from legacy v1 path: %s', storage_record.uuid) + return storage.v1_image_layer_path(storage_record.uuid) + return storage.blob_path(storage_record.content_checksum) + + +def backfill_content_checksums_and_torrent_pieces(piece_length): + """ Hashes the entire file for the content associated with an imagestorage. """ + logger.debug('Began execution') + logger.debug('This may be a long operation!') + + def batch_query(): + return (ImageStorage + .select(ImageStorage.id, ImageStorage.uuid, ImageStorage.content_checksum, + ImageStorage.cas_path) + .join(TorrentInfo, JOIN_LEFT_OUTER, on=((TorrentInfo.storage == ImageStorage.id) & + (TorrentInfo.piece_length == piece_length))) + .where((TorrentInfo.id >> None) | (ImageStorage.content_checksum >> None))) + + max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar() + + checksums_written = 0 + pieces_written = 0 + for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE, + max_id): + locations = _get_image_storage_locations(candidate_storage.id) + + checksum = ImageStorage.content_checksum + torrent_pieces = '' + with CloseForLongOperation(app.config): + try: + # Compute the checksum + layer_path = _get_layer_path(candidate_storage) + with storage.stream_read_file(locations, layer_path) as layer_data_handle: + hasher = PieceHasher(piece_length) + wrapped = wrap_with_handler(layer_data_handle, hasher.update) + checksum = 'sha256:{0}'.format(checksums.sha256_file(wrapped)) + torrent_pieces = hasher.final_piece_hashes() + except Exception as exc: + logger.exception('Unable to compute hashes for storage: %s', candidate_storage.uuid) + + # Create a fallback value for the checksum + if checksum is None: + checksum = 'unknown:{0}'.format(exc.__class__.__name__) + + torrent_collision = False + checksum_collision = False + + # Now update the ImageStorage with the checksum + num_updated = (ImageStorage + .update(content_checksum=checksum) + .where(ImageStorage.id == candidate_storage.id, + ImageStorage.content_checksum >> None)).execute() + checksums_written += num_updated + if num_updated == 0: + checksum_collision = True + + try: + TorrentInfo.create(storage=candidate_storage.id, piece_length=piece_length, + pieces=torrent_pieces) + pieces_written += 1 + except IntegrityError: + torrent_collision = True + + if torrent_collision and checksum_collision: + logger.info('Another worker pre-empted us for storage: %s', candidate_storage.uuid) + abort.set() + + if (pieces_written % BATCH_SIZE) == 0 or (checksums_written % BATCH_SIZE) == 0: + logger.debug('%s checksums written, %s torrent pieces written', checksums_written, + pieces_written) + + logger.debug('Completed, %s checksums written, %s torrent pieces written', checksums_written, + pieces_written) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + #logging.getLogger('peewee').setLevel(logging.WARNING) + logging.getLogger('boto').setLevel(logging.WARNING) + logging.getLogger('data.database').setLevel(logging.WARNING) + backfill_content_checksums_and_torrent_pieces(app.config['TORRENT_PIECE_SIZE'])