Backfill the v1 checksums from imagestorage

Jake Moshenko 2015-11-05 16:28:12 -05:00
parent 0e4d749f89
commit 9036ca2f2f
4 changed files with 257 additions and 0 deletions

data/migrations/versions/22af01f81722_backfill_parent_id_and_v1_checksums.py

@@ -0,0 +1,19 @@
"""Backfill parent id and v1 checksums
Revision ID: 22af01f81722
Revises: 2827d36939e4
Create Date: 2015-11-05 16:24:43.679323
"""
# revision identifiers, used by Alembic.
revision = '22af01f81722'
down_revision = '2827d36939e4'
from util.migrate.backfill_v1_checksums import backfill_checksums
def upgrade(tables):
backfill_checksums()
def downgrade(tables):
pass

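Since `upgrade` simply delegates to the backfill and `downgrade` is a deliberate no-op (the copied checksums are harmless to leave in place), the same backfill can also be run by hand against a live database. A minimal sketch, mirroring the `__main__` blocks of the backfill modules below; it assumes the Quay app config is importable from the working directory:

# Hypothetical standalone invocation of the same backfill the migration runs.
import logging

from util.migrate.backfill_v1_checksums import backfill_checksums

if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)
  backfill_checksums()
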
util/migrate/__init__.py

@@ -1,5 +1,12 @@
import logging

from sqlalchemy.types import TypeDecorator, Text
from sqlalchemy.dialects.mysql import TEXT as MySQLText, LONGTEXT
from random import shuffle

logger = logging.getLogger(__name__)


class UTF8LongText(TypeDecorator):
  """ Platform-independent UTF-8 LONGTEXT type.
@@ -14,3 +21,56 @@ class UTF8LongText(TypeDecorator):
      return dialect.type_descriptor(LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci'))
    else:
      return dialect.type_descriptor(Text())
def _chance_duplication(pop_size, samples):
  """ The chance of randomly selecting a duplicate when you choose the specified number of samples
      from the specified population size.
  """
  pairs = (samples * (samples - 1)) / 2.0
  unique = (pop_size - 1.0) / pop_size
  all_unique = pow(unique, pairs)
  return 1 - all_unique


def _num_checks(pop_size, desired):
  """ Binary search for the proper number of entries to use to get the specified collision
      probability.
  """
  s_max = pop_size
  s_min = 0
  last_test = -1
  s_test = s_max

  while s_max > s_min and last_test != s_test:
    last_test = s_test
    s_test = (s_max + s_min) / 2
    chance = _chance_duplication(pop_size, s_test)
    if chance > desired:
      s_max = s_test - 1
    else:
      s_min = s_test

  return s_test


def yield_random_entries(batch_query, batch_size, collision_chance):
  """ This method will yield semi-random items from a query in a database friendly way until no
      more items match the base query modifier. It will pull batches of batch_size from the query
      and yield enough items from each batch so that concurrent workers have a reduced chance of
      selecting the same items. For example, if your batches return 10,000 entries, and you desire
      only a .03 collision_chance, we will only use 25 random entries before going back to the db
      for a new batch.
  """
  # Seed with some data which will pass the condition, but will be immediately discarded
  all_candidates = [1]
  while len(all_candidates) > 0:
    all_candidates = list(batch_query().limit(batch_size))
    shuffle(all_candidates)
    num_selections = max(1, _num_checks(len(all_candidates), collision_chance))
    logger.debug('Found %s/%s matching entries, processing %s', len(all_candidates), batch_size,
                 num_selections)
    candidates = all_candidates[0:num_selections]
    for candidate in candidates:
      yield candidate

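The two helpers above are the standard birthday-problem approximation: k samples drawn from a population of n form k(k-1)/2 pairs, each pair is distinct with probability (n-1)/n, so the chance of at least one duplicate is 1 - ((n-1)/n)^(k(k-1)/2). A quick sanity check of the numbers quoted in the docstring (batches of 10,000 entries with a 0.03 collision chance), assuming the helpers are importable as written:

# Reproduces the docstring's example: a population of 10,000 and a desired
# collision chance of 0.03 permit roughly 25 selections per batch.
from util.migrate import _chance_duplication, _num_checks

print(_chance_duplication(10000, 25))  # ~0.0296, just under the 0.03 target
print(_num_checks(10000, 0.03))        # 25, under Python 2 integer division
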
util/migrate/backfill_content_checksums.py

@@ -0,0 +1,108 @@
import logging

from peewee import JOIN_LEFT_OUTER
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
                    TextField)

from data.database import BaseModel, db, db_for_update, CloseForLongOperation
from app import app, storage
from digest import checksums
from util.migrate import yield_random_entries

logger = logging.getLogger(__name__)


class Repository(BaseModel):
  pass


# Vendor the information from tables we will be writing to at the time of this migration
class ImageStorage(BaseModel):
  uuid = CharField(index=True, unique=True)
  checksum = CharField(null=True)
  image_size = BigIntegerField(null=True)
  uncompressed_size = BigIntegerField(null=True)
  uploading = BooleanField(default=True, null=True)
  cas_path = BooleanField(default=True)
  content_checksum = CharField(null=True, index=True)


class Image(BaseModel):
  docker_image_id = CharField(index=True)
  repository = ForeignKeyField(Repository)
  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
  storage = ForeignKeyField(ImageStorage, index=True, null=True)
  created = DateTimeField(null=True)
  comment = TextField(null=True)
  command = TextField(null=True)
  aggregate_size = BigIntegerField(null=True)
  v1_json_metadata = TextField(null=True)
  v1_checksum = CharField(null=True)


class ImageStorageLocation(BaseModel):
  name = CharField(unique=True, index=True)


class ImageStoragePlacement(BaseModel):
  storage = ForeignKeyField(ImageStorage)
  location = ForeignKeyField(ImageStorageLocation)


def _get_image_storage_locations(storage_id):
  placements_query = (ImageStoragePlacement
                      .select(ImageStoragePlacement, ImageStorageLocation)
                      .join(ImageStorageLocation)
                      .switch(ImageStoragePlacement)
                      .join(ImageStorage, JOIN_LEFT_OUTER)
                      .where(ImageStorage.id == storage_id))

  locations = set()
  for placement in placements_query:
    locations.add(placement.location.name)

  return locations
def backfill_content_checksums():
  """ Computes and stores the content checksum for each image storage that is missing one. """
  logger.debug('Image content checksum backfill: Began execution')

  def batch_query():
    return (ImageStorage
            .select(ImageStorage.id, ImageStorage.uuid)
            .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False))

  for candidate_storage in yield_random_entries(batch_query, 10000, 0.1):
    logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid)

    locations = _get_image_storage_locations(candidate_storage.id)

    checksum = None
    with CloseForLongOperation(app.config):
      try:
        # Compute the checksum
        layer_path = storage.image_layer_path(candidate_storage.uuid)
        with storage.stream_read_file(locations, layer_path) as layer_data_handle:
          checksum = 'sha256:{0}'.format(checksums.sha256_file(layer_data_handle))
      except Exception as exc:
        logger.warning('Unable to compute checksum for storage: %s', candidate_storage.uuid)
        checksum = 'unknown:{0}'.format(exc.__class__.__name__)

    # Now update the ImageStorage with the checksum, unless another worker beat us to it.
    # Use a SELECT ... FOR UPDATE query (not .get(), which returns a bare instance) so
    # db_for_update can row-lock the record for the compare-and-set below.
    with app.config['DB_TRANSACTION_FACTORY'](db):
      to_update = db_for_update(ImageStorage
                                .select()
                                .where(ImageStorage.id == candidate_storage.id)).get()
      if to_update.content_checksum is not None:
        logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
      else:
        logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid)
        to_update.content_checksum = checksum
        to_update.save()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
# logging.getLogger('peewee').setLevel(logging.CRITICAL)
backfill_content_checksums()

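For reference, `checksums.sha256_file` digests the layer stream incrementally rather than buffering the whole layer. A minimal sketch of that chunked-hashing idea, assuming only a `read`-able file handle; the real `digest.checksums` implementation and its signature may differ:

import hashlib

def sha256_file(fp, chunk_size=4096):
  # Hash a file-like object in fixed-size chunks so that large layers are
  # never held in memory all at once. chunk_size here is an assumption.
  h = hashlib.sha256()
  while True:
    buf = fp.read(chunk_size)
    if not buf:
      break
    h.update(buf)
  return h.hexdigest()
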
util/migrate/backfill_v1_checksums.py

@@ -0,0 +1,70 @@
import logging

from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
                    TextField)

from data.database import BaseModel, db, db_for_update
from util.migrate import yield_random_entries
from app import app

logger = logging.getLogger(__name__)


class Repository(BaseModel):
  pass


# Vendor the information from tables we will be writing to at the time of this migration
class ImageStorage(BaseModel):
  uuid = CharField(index=True, unique=True)
  checksum = CharField(null=True)
  image_size = BigIntegerField(null=True)
  uncompressed_size = BigIntegerField(null=True)
  uploading = BooleanField(default=True, null=True)
  cas_path = BooleanField(default=True)
  content_checksum = CharField(null=True, index=True)


class Image(BaseModel):
  docker_image_id = CharField(index=True)
  repository = ForeignKeyField(Repository)
  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
  storage = ForeignKeyField(ImageStorage, index=True, null=True)
  created = DateTimeField(null=True)
  comment = TextField(null=True)
  command = TextField(null=True)
  aggregate_size = BigIntegerField(null=True)
  v1_json_metadata = TextField(null=True)
  v1_checksum = CharField(null=True)
def backfill_checksums():
  """ Copies checksums from image storages to their images. """
  logger.debug('Image v1 checksum backfill: Began execution')

  def batch_query():
    return (Image
            .select(Image.id)
            .join(ImageStorage)
            .where(Image.v1_checksum >> None, ImageStorage.uploading == False,
                   ~(ImageStorage.checksum >> None)))
  for candidate_image in yield_random_entries(batch_query, 10000, 0.1):
    logger.debug('Copying v1 checksum to image: %s', candidate_image.id)

    with app.config['DB_TRANSACTION_FACTORY'](db):
      try:
        image = db_for_update(Image
                              .select(Image, ImageStorage)
                              .join(ImageStorage)
                              .where(Image.id == candidate_image.id)).get()
        image.v1_checksum = image.storage.checksum
        image.save()
      except Image.DoesNotExist:
        pass
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('peewee').setLevel(logging.CRITICAL)
backfill_checksums()
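
Both backfills take a row lock via `db_for_update` inside a transaction before writing, which is what keeps the semi-random concurrent workers from clobbering each other: a second worker that selected the same row blocks until the first commits. A minimal sketch of what such a helper typically does in peewee 2.x; the actual `data.database` implementation may differ:

def db_for_update(query):
  # Turn a SELECT into SELECT ... FOR UPDATE on backends that support row
  # locks (MySQL, Postgres); SQLite, for example, does not.
  if query.database.for_update:
    return query.for_update()
  return query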