Merge pull request #822 from coreos-inc/phase2-11-07-2015
Phase2 11 07 2015
Commit b526e2a3cd
6 changed files with 283 additions and 32 deletions
@@ -0,0 +1,21 @@
"""Backfill parent id and v1 checksums

Revision ID: 22af01f81722
Revises: 2827d36939e4
Create Date: 2015-11-05 16:24:43.679323

"""

# revision identifiers, used by Alembic.
revision = '22af01f81722'
down_revision = '2827d36939e4'

from util.migrate.backfill_v1_checksums import backfill_checksums
from util.migrate.backfill_parent_id import backfill_parent_id

def upgrade(tables):
  backfill_parent_id()
  backfill_checksums()

def downgrade(tables):
  pass
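The upgrade hook simply delegates to the two backfill helpers introduced below. For local testing one could presumably run the same backfills directly rather than through Alembic; a minimal sketch, assuming the app configuration the helpers rely on is importable (in normal operation Alembic calls upgrade(tables) during the schema upgrade):

import logging

from util.migrate.backfill_parent_id import backfill_parent_id
from util.migrate.backfill_v1_checksums import backfill_checksums

if __name__ == '__main__':
  # Sketch only: run the same backfills the migration performs, outside Alembic.
  logging.basicConfig(level=logging.DEBUG)
  backfill_parent_id()
  backfill_checksums()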
@@ -278,14 +278,14 @@ def put_image_layer(namespace, repository, image_id):
   except (IOError, checksums.TarError) as exc:
     logger.debug('put_image_layer: Error when computing tarsum %s', exc)
 
-  if repo_image.storage.checksum is None:
+  if repo_image.v1_checksum is None:
     # We don't have a checksum stored yet, that's fine skipping the check.
     # Not removing the mark though, image is not downloadable yet.
     session['checksum'] = csums
     session['content_checksum'] = 'sha256:{0}'.format(ch.hexdigest())
     return make_response('true', 200)
 
-  checksum = repo_image.storage.checksum
+  checksum = repo_image.v1_checksum
 
   # We check if the checksums provided matches one the one we computed
   if checksum not in csums:
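The only functional change in this hunk is where the stored v1 checksum is read from: Image.v1_checksum (populated by the backfill below) instead of ImageStorage.checksum. A minimal sketch of the check the handler performs, with csums standing in for the list of checksums computed while streaming the layer; the helper name is mine, not Quay's:

# Hypothetical standalone restatement of the verification step shown above.
def v1_checksum_matches(repo_image, csums):
  # After this PR the stored checksum lives on the Image row itself.
  stored = repo_image.v1_checksum
  if stored is None:
    # Nothing stored yet; the handler skips the check but keeps the image
    # marked as not yet downloadable.
    return True
  # Otherwise the stored checksum must match one of the computed candidates.
  return stored in csums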
@@ -1,5 +1,12 @@
+import logging
+
 from sqlalchemy.types import TypeDecorator, Text
 from sqlalchemy.dialects.mysql import TEXT as MySQLText, LONGTEXT
+from random import shuffle
+
+
+logger = logging.getLogger(__name__)
 
 
 class UTF8LongText(TypeDecorator):
   """ Platform-independent UTF-8 LONGTEXT type.
@@ -14,3 +21,56 @@ class UTF8LongText(TypeDecorator):
       return dialect.type_descriptor(LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci'))
     else:
       return dialect.type_descriptor(Text())
+
+
+def _chance_duplication(pop_size, samples):
+  """ The chance of randomly selecting a duplicate when you choose the specified number of samples
+      from the specified population size.
+  """
+  pairs = (samples * (samples - 1)) / 2.0
+  unique = (pop_size - 1.0)/pop_size
+  all_unique = pow(unique, pairs)
+  return 1 - all_unique
+
+
+def _num_checks(pop_size, desired):
+  """ Binary search for the proper number of entries to use to get the specified collision
+      probability.
+  """
+  s_max = pop_size
+  s_min = 0
+  last_test = -1
+  s_test = s_max
+
+  while s_max > s_min and last_test != s_test:
+    last_test = s_test
+    s_test = (s_max + s_min)/2
+    chance = _chance_duplication(pop_size, s_test)
+    if chance > desired:
+      s_max = s_test - 1
+    else:
+      s_min = s_test
+
+  return s_test
+
+
+def yield_random_entries(batch_query, batch_size, collision_chance):
+  """ This method will yield semi-random items from a query in a database friendly way until no
+      more items match the base query modifier. It will pull batches of batch_size from the query
+      and yield enough items from each batch so that concurrent workers have a reduced chance of
+      selecting the same items. For example, if your batches return 10,000 entries, and you desire
+      only a .03 collision_chance, we will only use 25 random entries before going back to the db
+      for a new batch.
+  """
+
+  # Seed with some data which will pass the condition, but will be immediately discarded
+  all_candidates = [1]
+  while len(all_candidates) > 0:
+    all_candidates = list(batch_query().limit(batch_size))
+    shuffle(all_candidates)
+
+    num_selections = max(1, _num_checks(len(all_candidates), collision_chance))
+    logger.debug('Found %s/%s matching entries, processing %s', len(all_candidates), batch_size,
+                 num_selections)
+
+    candidates = all_candidates[0:num_selections]
+    for candidate in candidates:
+      yield candidate
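As a quick sanity check on the numbers quoted in the yield_random_entries docstring (my own calculation, not part of the diff): with batches of 10,000 entries and a desired collision chance of 0.03, the birthday-problem estimate in _chance_duplication does land at roughly 25 selections.

# Standalone restatement of the estimate above, to verify the docstring's example.
def chance_duplication(pop_size, samples):
  pairs = (samples * (samples - 1)) / 2.0   # number of distinct sample pairs
  unique = (pop_size - 1.0) / pop_size      # probability a single pair differs
  return 1 - pow(unique, pairs)             # probability at least one pair collides

print(round(chance_duplication(10000, 25), 3))  # -> 0.03, matching the docstring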
util/migrate/backfill_content_checksums.py (new file, 108 lines)
@@ -0,0 +1,108 @@
import logging

from peewee import JOIN_LEFT_OUTER

from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
                    TextField)

from data.database import BaseModel, db, db_for_update, CloseForLongOperation
from app import app, storage
from digest import checksums
from util.migrate import yield_random_entries


logger = logging.getLogger(__name__)


class Repository(BaseModel):
  pass


# Vendor the information from tables we will be writing to at the time of this migration
class ImageStorage(BaseModel):
  uuid = CharField(index=True, unique=True)
  checksum = CharField(null=True)
  image_size = BigIntegerField(null=True)
  uncompressed_size = BigIntegerField(null=True)
  uploading = BooleanField(default=True, null=True)
  cas_path = BooleanField(default=True)
  content_checksum = CharField(null=True, index=True)


class Image(BaseModel):
  docker_image_id = CharField(index=True)
  repository = ForeignKeyField(Repository)
  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
  storage = ForeignKeyField(ImageStorage, index=True, null=True)
  created = DateTimeField(null=True)
  comment = TextField(null=True)
  command = TextField(null=True)
  aggregate_size = BigIntegerField(null=True)
  v1_json_metadata = TextField(null=True)
  v1_checksum = CharField(null=True)


class ImageStorageLocation(BaseModel):
  name = CharField(unique=True, index=True)


class ImageStoragePlacement(BaseModel):
  storage = ForeignKeyField(ImageStorage)
  location = ForeignKeyField(ImageStorageLocation)


def _get_image_storage_locations(storage_id):
  placements_query = (ImageStoragePlacement
                      .select(ImageStoragePlacement, ImageStorageLocation)
                      .join(ImageStorageLocation)
                      .switch(ImageStoragePlacement)
                      .join(ImageStorage, JOIN_LEFT_OUTER)
                      .where(ImageStorage.id == storage_id))

  locations = set()
  for placement in placements_query:
    locations.add(placement.location.name)

  return locations


def backfill_content_checksums():
  """ Copies metadata from image storages to their images. """
  logger.debug('Image content checksum backfill: Began execution')

  def batch_query():
    return (ImageStorage
            .select(ImageStorage.id, ImageStorage.uuid)
            .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False))

  for candidate_storage in yield_random_entries(batch_query, 10000, 0.1):
    logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid)

    locations = _get_image_storage_locations(candidate_storage.id)

    checksum = None
    with CloseForLongOperation(app.config):
      try:
        # Compute the checksum
        layer_path = storage.image_layer_path(candidate_storage.uuid)
        with storage.stream_read_file(locations, layer_path) as layer_data_handle:
          checksum = 'sha256:{0}'.format(checksums.sha256_file(layer_data_handle))
      except Exception as exc:
        logger.warning('Unable to compute checksum for storage: %s', candidate_storage.uuid)
        checksum = 'unknown:{0}'.format(exc.__class__.__name__)

    # Now update the ImageStorage with the checksum
    with app.config['DB_TRANSACTION_FACTORY'](db):
      to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id))
      if to_update.content_checksum is not None:
        logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
      else:
        logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid)
        to_update.content_checksum = checksum
        to_update.save()


if __name__ == "__main__":
  logging.basicConfig(level=logging.DEBUG)
  # logging.getLogger('peewee').setLevel(logging.CRITICAL)
  backfill_content_checksums()
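Worth noting about the update step above: candidates are picked outside any transaction, so before writing, each worker re-reads the row with db_for_update (a locking read where the database supports it) inside a transaction and re-checks that content_checksum is still unset; checksum failures are recorded as an unknown:<ExceptionName> sentinel rather than retried. A stripped-down sketch of that claim-then-fill pattern, assuming the module context above (the helper name is mine):

# Sketch of the "re-check under row lock" pattern used by the backfill above.
def claim_and_fill(candidate_id, compute_checksum):
  with app.config['DB_TRANSACTION_FACTORY'](db):
    row = db_for_update(ImageStorage.get(ImageStorage.id == candidate_id))
    if row.content_checksum is not None:
      return False            # another worker got here first; do nothing
    row.content_checksum = compute_checksum(row)
    row.save()
    return True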
@@ -1,46 +1,38 @@
 import logging
 
-from data.database import Image, ImageStorage, db
+from data.database import Image, ImageStorage, db, db_for_update
 from app import app
+from util.migrate import yield_random_entries
 
 
 logger = logging.getLogger(__name__)
 
 
 def backfill_parent_id():
   logger.setLevel(logging.DEBUG)
 
   logger.debug('backfill_parent_id: Starting')
   logger.debug('backfill_parent_id: This can be a LONG RUNNING OPERATION. Please wait!')
 
-  # Check for any images without parent
-  has_images = bool(list(Image
-                         .select(Image.id)
-                         .join(ImageStorage)
-                         .where(Image.parent >> None, Image.ancestors != '/', ImageStorage.uploading == False)
-                         .limit(1)))
+  def fetch_batch():
+    return (Image
+            .select(Image.id, Image.ancestors)
+            .join(ImageStorage)
+            .where(Image.parent >> None, Image.ancestors != '/',
+                   ImageStorage.uploading == False))
 
-  if not has_images:
-    logger.debug('backfill_parent_id: No migration needed')
-    return
+  for to_backfill in yield_random_entries(fetch_batch, 10000, 0.3):
+    with app.config['DB_TRANSACTION_FACTORY'](db):
+      try:
+        image = db_for_update(Image
+                              .select()
+                              .where(Image.id == to_backfill.id)).get()
+        image.parent = to_backfill.ancestors.split('/')[-2]
+        image.save()
+      except Image.DoesNotExist:
+        pass
 
-  while True:
-    # Load the record from the DB.
-    batch_images_ids = list(Image
-                            .select(Image.id)
-                            .join(ImageStorage)
-                            .where(Image.parent >> None, Image.ancestors != '/', ImageStorage.uploading == False)
-                            .limit(100))
-
-    if len(batch_images_ids) == 0:
-      logger.debug('backfill_parent_id: Completed')
-      return
-
-    for image_id in batch_images_ids:
-      with app.config['DB_TRANSACTION_FACTORY'](db):
-        try:
-          image = Image.select(Image.id, Image.ancestors).where(Image.id == image_id).get()
-          image.parent = image.ancestors.split('/')[-2]
-          image.save()
-        except Image.DoesNotExist:
-          pass
+  logger.debug('backfill_parent_id: Completed')
 
 
 if __name__ == "__main__":
   logging.basicConfig(level=logging.DEBUG)
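The actual backfill logic is unchanged by this rewrite: an image's parent id is the second-to-last component of its '/'-delimited ancestors path. A small worked example of that derivation, with illustrative values only:

# ancestors stores the chain of ancestor ids, root first, e.g. '/3/17/42/'.
ancestors = '/3/17/42/'
parts = ancestors.split('/')   # ['', '3', '17', '42', '']
parent_id = parts[-2]          # '42' -> id of the immediate parent
print(parent_id)               # 42; 17 and 3 are higher-level ancestors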
util/migrate/backfill_v1_checksums.py (new file, 70 lines)
@@ -0,0 +1,70 @@
import logging

from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
                    TextField)
from data.database import BaseModel, db, db_for_update
from util.migrate import yield_random_entries
from app import app


logger = logging.getLogger(__name__)


class Repository(BaseModel):
  pass


# Vendor the information from tables we will be writing to at the time of this migration
class ImageStorage(BaseModel):
  uuid = CharField(index=True, unique=True)
  checksum = CharField(null=True)
  image_size = BigIntegerField(null=True)
  uncompressed_size = BigIntegerField(null=True)
  uploading = BooleanField(default=True, null=True)
  cas_path = BooleanField(default=True)
  content_checksum = CharField(null=True, index=True)


class Image(BaseModel):
  docker_image_id = CharField(index=True)
  repository = ForeignKeyField(Repository)
  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
  storage = ForeignKeyField(ImageStorage, index=True, null=True)
  created = DateTimeField(null=True)
  comment = TextField(null=True)
  command = TextField(null=True)
  aggregate_size = BigIntegerField(null=True)
  v1_json_metadata = TextField(null=True)
  v1_checksum = CharField(null=True)


def backfill_checksums():
  """ Copies checksums from image storages to their images. """
  logger.debug('Image v1 checksum backfill: Began execution')

  def batch_query():
    return (Image
            .select(Image.id)
            .join(ImageStorage)
            .where(Image.v1_checksum >> None, ImageStorage.uploading == False,
                   ~(ImageStorage.checksum >> None)))

  for candidate_image in yield_random_entries(batch_query, 10000, 0.1):
    logger.debug('Computing content checksum for storage: %s', candidate_image.id)

    with app.config['DB_TRANSACTION_FACTORY'](db):
      try:
        image = db_for_update(Image
                              .select(Image, ImageStorage)
                              .join(ImageStorage)
                              .where(Image.id == candidate_image.id)).get()

        image.v1_checksum = image.storage.checksum
        image.save()
      except Image.DoesNotExist:
        pass


if __name__ == "__main__":
  logging.basicConfig(level=logging.DEBUG)
  logging.getLogger('peewee').setLevel(logging.CRITICAL)
  backfill_checksums()
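One bit of peewee shorthand in batch_query may be worth spelling out (my reading, not stated in the diff): field >> None is peewee's IS NULL comparison and ~ negates an expression, so the batch contains only images that still lack a v1 checksum and whose non-uploading storage has a legacy checksum to copy. Decomposed for clarity, using the vendored models from the file above:

# Illustration of how the batch_query() filter above decomposes.
needs_v1_checksum = Image.v1_checksum >> None          # v1_checksum IS NULL
storage_has_value = ~(ImageStorage.checksum >> None)   # checksum IS NOT NULL
storage_is_ready  = ImageStorage.uploading == False    # uploading = false

pending = (Image
           .select(Image.id)
           .join(ImageStorage)
           .where(needs_v1_checksum, storage_is_ready, storage_has_value))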