diff --git a/data/migrations/versions/22af01f81722_backfill_parent_id_and_v1_checksums.py b/data/migrations/versions/22af01f81722_backfill_parent_id_and_v1_checksums.py
new file mode 100644
index 000000000..e6d732dcc
--- /dev/null
+++ b/data/migrations/versions/22af01f81722_backfill_parent_id_and_v1_checksums.py
@@ -0,0 +1,19 @@
+"""Backfill parent id and v1 checksums
+
+Revision ID: 22af01f81722
+Revises: 2827d36939e4
+Create Date: 2015-11-05 16:24:43.679323
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '22af01f81722'
+down_revision = '2827d36939e4'
+
+from util.migrate.backfill_v1_checksums import backfill_checksums
+
+def upgrade(tables):
+  backfill_checksums()
+
+def downgrade(tables):
+  pass
+ """ + + # Seed with some data which will pass the condition, but will be immediately discarded + all_candidates = [1] + while len(all_candidates) > 0: + all_candidates = list(batch_query().limit(batch_size)) + shuffle(all_candidates) + num_selections = max(1, _num_checks(len(all_candidates), collision_chance)) + logger.debug('Found %s/%s matching entries, processing %s', len(all_candidates), batch_size, + num_selections) + candidates = all_candidates[0:num_selections] + for candidate in candidates: + yield candidate diff --git a/util/migrate/backfill_content_checksums.py b/util/migrate/backfill_content_checksums.py new file mode 100644 index 000000000..645b5539e --- /dev/null +++ b/util/migrate/backfill_content_checksums.py @@ -0,0 +1,108 @@ +import logging + +from peewee import JOIN_LEFT_OUTER + +from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField, + TextField) + +from data.database import BaseModel, db, db_for_update, CloseForLongOperation +from app import app, storage +from digest import checksums +from util.migrate import yield_random_entries + + +logger = logging.getLogger(__name__) + + +class Repository(BaseModel): + pass + + +# Vendor the information from tables we will be writing to at the time of this migration +class ImageStorage(BaseModel): + uuid = CharField(index=True, unique=True) + checksum = CharField(null=True) + image_size = BigIntegerField(null=True) + uncompressed_size = BigIntegerField(null=True) + uploading = BooleanField(default=True, null=True) + cas_path = BooleanField(default=True) + content_checksum = CharField(null=True, index=True) + + +class Image(BaseModel): + docker_image_id = CharField(index=True) + repository = ForeignKeyField(Repository) + ancestors = CharField(index=True, default='/', max_length=64535, null=True) + storage = ForeignKeyField(ImageStorage, index=True, null=True) + created = DateTimeField(null=True) + comment = TextField(null=True) + command = TextField(null=True) + aggregate_size = BigIntegerField(null=True) + v1_json_metadata = TextField(null=True) + v1_checksum = CharField(null=True) + + +class ImageStorageLocation(BaseModel): + name = CharField(unique=True, index=True) + + +class ImageStoragePlacement(BaseModel): + storage = ForeignKeyField(ImageStorage) + location = ForeignKeyField(ImageStorageLocation) + + + +def _get_image_storage_locations(storage_id): + placements_query = (ImageStoragePlacement + .select(ImageStoragePlacement, ImageStorageLocation) + .join(ImageStorageLocation) + .switch(ImageStoragePlacement) + .join(ImageStorage, JOIN_LEFT_OUTER) + .where(ImageStorage.id == storage_id)) + + locations = set() + for placement in placements_query: + locations.add(placement.location.name) + + return locations + + +def backfill_content_checksums(): + """ Copies metadata from image storages to their images. 
""" + logger.debug('Image content checksum backfill: Began execution') + + def batch_query(): + return (ImageStorage + .select(ImageStorage.id, ImageStorage.uuid) + .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False)) + + for candidate_storage in yield_random_entries(batch_query, 10000, 0.1): + logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid) + + locations = _get_image_storage_locations(candidate_storage.id) + + checksum = None + with CloseForLongOperation(app.config): + try: + # Compute the checksum + layer_path = storage.image_layer_path(candidate_storage.uuid) + with storage.stream_read_file(locations, layer_path) as layer_data_handle: + checksum = 'sha256:{0}'.format(checksums.sha256_file(layer_data_handle)) + except Exception as exc: + logger.warning('Unable to compute checksum for storage: %s', candidate_storage.uuid) + checksum = 'unknown:{0}'.format(exc.__class__.__name__) + + # Now update the ImageStorage with the checksum + with app.config['DB_TRANSACTION_FACTORY'](db): + to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id)) + if to_update.content_checksum is not None: + logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid) + else: + logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid) + to_update.content_checksum = checksum + to_update.save() + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + # logging.getLogger('peewee').setLevel(logging.CRITICAL) + backfill_content_checksums() diff --git a/util/migrate/backfill_v1_checksums.py b/util/migrate/backfill_v1_checksums.py new file mode 100644 index 000000000..aef7af72e --- /dev/null +++ b/util/migrate/backfill_v1_checksums.py @@ -0,0 +1,70 @@ +import logging + +from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField, + TextField) +from data.database import BaseModel, db, db_for_update +from util.migrate import yield_random_entries +from app import app + + +logger = logging.getLogger(__name__) + + +class Repository(BaseModel): + pass + + +# Vendor the information from tables we will be writing to at the time of this migration +class ImageStorage(BaseModel): + uuid = CharField(index=True, unique=True) + checksum = CharField(null=True) + image_size = BigIntegerField(null=True) + uncompressed_size = BigIntegerField(null=True) + uploading = BooleanField(default=True, null=True) + cas_path = BooleanField(default=True) + content_checksum = CharField(null=True, index=True) + + +class Image(BaseModel): + docker_image_id = CharField(index=True) + repository = ForeignKeyField(Repository) + ancestors = CharField(index=True, default='/', max_length=64535, null=True) + storage = ForeignKeyField(ImageStorage, index=True, null=True) + created = DateTimeField(null=True) + comment = TextField(null=True) + command = TextField(null=True) + aggregate_size = BigIntegerField(null=True) + v1_json_metadata = TextField(null=True) + v1_checksum = CharField(null=True) + + +def backfill_checksums(): + """ Copies checksums from image storages to their images. 
""" + logger.debug('Image v1 checksum backfill: Began execution') + def batch_query(): + return (Image + .select(Image.id) + .join(ImageStorage) + .where(Image.v1_checksum >> None, ImageStorage.uploading == False, + ~(ImageStorage.checksum >> None))) + + for candidate_image in yield_random_entries(batch_query, 10000, 0.1): + logger.debug('Computing content checksum for storage: %s', candidate_image.id) + + with app.config['DB_TRANSACTION_FACTORY'](db): + try: + image = db_for_update(Image + .select(Image, ImageStorage) + .join(ImageStorage) + .where(Image.id == candidate_image.id)).get() + + image.v1_checksum = image.storage.checksum + image.save() + except Image.DoesNotExist: + pass + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + logging.getLogger('peewee').setLevel(logging.CRITICAL) + backfill_checksums()