Fix torrent migration and update backfill to compute torrent pieces
This commit is contained in:
parent
087c6828ad
commit
073b68cf0d
3 changed files with 149 additions and 123 deletions
|
@ -13,6 +13,8 @@ down_revision = '471caec2cb66'
|
|||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
from util.migrate import UTF8LongText
|
||||
|
||||
def upgrade(tables):
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('torrentinfo',
|
||||
|
@ -25,8 +27,8 @@ def upgrade(tables):
|
|||
)
|
||||
op.create_index('torrentinfo_storage_id', 'torrentinfo', ['storage_id'], unique=False)
|
||||
op.create_index('torrentinfo_storage_id_piece_length', 'torrentinfo', ['storage_id', 'piece_length'], unique=True)
|
||||
op.add_column(u'blobupload', sa.Column('piece_hashes', sa.Text(), nullable=False))
|
||||
op.add_column(u'blobupload', sa.Column('piece_sha_state', sa.Text(), nullable=True))
|
||||
op.add_column(u'blobupload', sa.Column('piece_hashes', UTF8LongText(), nullable=False))
|
||||
op.add_column(u'blobupload', sa.Column('piece_sha_state', UTF8LongText(), nullable=True))
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
|
|
|
@ -1,121 +0,0 @@
|
|||
import logging
|
||||
|
||||
from peewee import JOIN_LEFT_OUTER
|
||||
|
||||
from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
|
||||
TextField, fn)
|
||||
|
||||
from data.database import BaseModel, CloseForLongOperation
|
||||
from app import app, storage
|
||||
from digest import checksums
|
||||
from util.migrate.allocator import yield_random_entries
|
||||
|
||||
|
||||
BATCH_SIZE = 1000
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Repository(BaseModel):
|
||||
pass
|
||||
|
||||
|
||||
# Vendor the information from tables we will be writing to at the time of this migration
|
||||
class ImageStorage(BaseModel):
|
||||
uuid = CharField(index=True, unique=True)
|
||||
checksum = CharField(null=True)
|
||||
image_size = BigIntegerField(null=True)
|
||||
uncompressed_size = BigIntegerField(null=True)
|
||||
uploading = BooleanField(default=True, null=True)
|
||||
cas_path = BooleanField(default=True)
|
||||
content_checksum = CharField(null=True, index=True)
|
||||
|
||||
|
||||
class Image(BaseModel):
|
||||
docker_image_id = CharField(index=True)
|
||||
repository = ForeignKeyField(Repository)
|
||||
ancestors = CharField(index=True, default='/', max_length=64535, null=True)
|
||||
storage = ForeignKeyField(ImageStorage, index=True, null=True)
|
||||
created = DateTimeField(null=True)
|
||||
comment = TextField(null=True)
|
||||
command = TextField(null=True)
|
||||
aggregate_size = BigIntegerField(null=True)
|
||||
v1_json_metadata = TextField(null=True)
|
||||
v1_checksum = CharField(null=True)
|
||||
|
||||
|
||||
class ImageStorageLocation(BaseModel):
|
||||
name = CharField(unique=True, index=True)
|
||||
|
||||
|
||||
class ImageStoragePlacement(BaseModel):
|
||||
storage = ForeignKeyField(ImageStorage)
|
||||
location = ForeignKeyField(ImageStorageLocation)
|
||||
|
||||
|
||||
|
||||
def _get_image_storage_locations(storage_id):
|
||||
placements_query = (ImageStoragePlacement
|
||||
.select(ImageStoragePlacement, ImageStorageLocation)
|
||||
.join(ImageStorageLocation)
|
||||
.switch(ImageStoragePlacement)
|
||||
.join(ImageStorage, JOIN_LEFT_OUTER)
|
||||
.where(ImageStorage.id == storage_id))
|
||||
|
||||
locations = set()
|
||||
for placement in placements_query:
|
||||
locations.add(placement.location.name)
|
||||
|
||||
return locations
|
||||
|
||||
|
||||
def backfill_content_checksums():
|
||||
""" Copies metadata from image storages to their images. """
|
||||
logger.debug('Began execution')
|
||||
logger.debug('This may be a long operation!')
|
||||
|
||||
def batch_query():
|
||||
return (ImageStorage
|
||||
.select(ImageStorage.id, ImageStorage.uuid)
|
||||
.where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False))
|
||||
|
||||
max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
|
||||
|
||||
written = 0
|
||||
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE,
|
||||
max_id):
|
||||
locations = _get_image_storage_locations(candidate_storage.id)
|
||||
|
||||
checksum = None
|
||||
with CloseForLongOperation(app.config):
|
||||
try:
|
||||
# Compute the checksum
|
||||
layer_path = storage.image_layer_path(candidate_storage.uuid)
|
||||
with storage.stream_read_file(locations, layer_path) as layer_data_handle:
|
||||
checksum = 'sha256:{0}'.format(checksums.sha256_file(layer_data_handle))
|
||||
except Exception as exc:
|
||||
logger.warning('Unable to compute checksum for storage: %s', candidate_storage.uuid)
|
||||
checksum = 'unknown:{0}'.format(exc.__class__.__name__)
|
||||
|
||||
# Now update the ImageStorage with the checksum
|
||||
num_updated = (ImageStorage
|
||||
.update(content_checksum=checksum)
|
||||
.where(ImageStorage.id == candidate_storage.id,
|
||||
ImageStorage.content_checksum >> None)).execute()
|
||||
if num_updated == 0:
|
||||
logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
|
||||
abort.set()
|
||||
|
||||
written += num_updated
|
||||
if (written % BATCH_SIZE) == 0:
|
||||
logger.debug('%s entries written', written)
|
||||
|
||||
logger.debug('Completed, %s entries written', written)
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.getLogger('peewee').setLevel(logging.WARNING)
|
||||
logging.getLogger('boto').setLevel(logging.WARNING)
|
||||
logging.getLogger('data.database').setLevel(logging.WARNING)
|
||||
backfill_content_checksums()
|
145
util/migrate/backfill_content_checksums_and_torrent_pieces.py
Normal file
145
util/migrate/backfill_content_checksums_and_torrent_pieces.py
Normal file
|
@ -0,0 +1,145 @@
|
|||
import logging
|
||||
|
||||
from peewee import (JOIN_LEFT_OUTER, CharField, BigIntegerField, BooleanField, ForeignKeyField,
|
||||
IntegerField, IntegrityError, fn)
|
||||
|
||||
from data.database import BaseModel, CloseForLongOperation
|
||||
from data.fields import Base64BinaryField
|
||||
from app import app, storage
|
||||
from digest import checksums
|
||||
from util.migrate.allocator import yield_random_entries
|
||||
from util.registry.torrent import PieceHasher
|
||||
from util.registry.filelike import wrap_with_handler
|
||||
|
||||
|
||||
BATCH_SIZE = 1000
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Vendor the information from tables we will be writing to at the time of this migration
|
||||
class ImageStorage(BaseModel):
|
||||
uuid = CharField(index=True, unique=True)
|
||||
checksum = CharField(null=True)
|
||||
image_size = BigIntegerField(null=True)
|
||||
uncompressed_size = BigIntegerField(null=True)
|
||||
uploading = BooleanField(default=True, null=True)
|
||||
cas_path = BooleanField(default=True)
|
||||
content_checksum = CharField(null=True, index=True)
|
||||
|
||||
|
||||
class ImageStorageLocation(BaseModel):
|
||||
name = CharField(unique=True, index=True)
|
||||
|
||||
|
||||
class ImageStoragePlacement(BaseModel):
|
||||
storage = ForeignKeyField(ImageStorage)
|
||||
location = ForeignKeyField(ImageStorageLocation)
|
||||
|
||||
|
||||
class TorrentInfo(BaseModel):
|
||||
storage = ForeignKeyField(ImageStorage)
|
||||
piece_length = IntegerField()
|
||||
pieces = Base64BinaryField()
|
||||
|
||||
|
||||
def _get_image_storage_locations(storage_id):
|
||||
placements_query = (ImageStoragePlacement
|
||||
.select(ImageStoragePlacement, ImageStorageLocation)
|
||||
.join(ImageStorageLocation)
|
||||
.switch(ImageStoragePlacement)
|
||||
.join(ImageStorage, JOIN_LEFT_OUTER)
|
||||
.where(ImageStorage.id == storage_id))
|
||||
|
||||
locations = set()
|
||||
for placement in placements_query:
|
||||
locations.add(placement.location.name)
|
||||
|
||||
return locations
|
||||
|
||||
|
||||
def _get_layer_path(storage_record):
|
||||
""" Returns the path in the storage engine to the layer data referenced by the storage row. """
|
||||
if not storage_record.cas_path:
|
||||
logger.debug('Serving layer from legacy v1 path: %s', storage_record.uuid)
|
||||
return storage.v1_image_layer_path(storage_record.uuid)
|
||||
return storage.blob_path(storage_record.content_checksum)
|
||||
|
||||
|
||||
def backfill_content_checksums_and_torrent_pieces(piece_length):
|
||||
""" Hashes the entire file for the content associated with an imagestorage. """
|
||||
logger.debug('Began execution')
|
||||
logger.debug('This may be a long operation!')
|
||||
|
||||
def batch_query():
|
||||
return (ImageStorage
|
||||
.select(ImageStorage.id, ImageStorage.uuid, ImageStorage.content_checksum,
|
||||
ImageStorage.cas_path)
|
||||
.join(TorrentInfo, JOIN_LEFT_OUTER, on=((TorrentInfo.storage == ImageStorage.id) &
|
||||
(TorrentInfo.piece_length == piece_length)))
|
||||
.where((TorrentInfo.id >> None) | (ImageStorage.content_checksum >> None)))
|
||||
|
||||
max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
|
||||
|
||||
checksums_written = 0
|
||||
pieces_written = 0
|
||||
for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE,
|
||||
max_id):
|
||||
locations = _get_image_storage_locations(candidate_storage.id)
|
||||
|
||||
checksum = ImageStorage.content_checksum
|
||||
torrent_pieces = ''
|
||||
with CloseForLongOperation(app.config):
|
||||
try:
|
||||
# Compute the checksum
|
||||
layer_path = _get_layer_path(candidate_storage)
|
||||
with storage.stream_read_file(locations, layer_path) as layer_data_handle:
|
||||
hasher = PieceHasher(piece_length)
|
||||
wrapped = wrap_with_handler(layer_data_handle, hasher.update)
|
||||
checksum = 'sha256:{0}'.format(checksums.sha256_file(wrapped))
|
||||
torrent_pieces = hasher.final_piece_hashes()
|
||||
except Exception as exc:
|
||||
logger.exception('Unable to compute hashes for storage: %s', candidate_storage.uuid)
|
||||
|
||||
# Create a fallback value for the checksum
|
||||
if checksum is None:
|
||||
checksum = 'unknown:{0}'.format(exc.__class__.__name__)
|
||||
|
||||
torrent_collision = False
|
||||
checksum_collision = False
|
||||
|
||||
# Now update the ImageStorage with the checksum
|
||||
num_updated = (ImageStorage
|
||||
.update(content_checksum=checksum)
|
||||
.where(ImageStorage.id == candidate_storage.id,
|
||||
ImageStorage.content_checksum >> None)).execute()
|
||||
checksums_written += num_updated
|
||||
if num_updated == 0:
|
||||
checksum_collision = True
|
||||
|
||||
try:
|
||||
TorrentInfo.create(storage=candidate_storage.id, piece_length=piece_length,
|
||||
pieces=torrent_pieces)
|
||||
pieces_written += 1
|
||||
except IntegrityError:
|
||||
torrent_collision = True
|
||||
|
||||
if torrent_collision and checksum_collision:
|
||||
logger.info('Another worker pre-empted us for storage: %s', candidate_storage.uuid)
|
||||
abort.set()
|
||||
|
||||
if (pieces_written % BATCH_SIZE) == 0 or (checksums_written % BATCH_SIZE) == 0:
|
||||
logger.debug('%s checksums written, %s torrent pieces written', checksums_written,
|
||||
pieces_written)
|
||||
|
||||
logger.debug('Completed, %s checksums written, %s torrent pieces written', checksums_written,
|
||||
pieces_written)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
#logging.getLogger('peewee').setLevel(logging.WARNING)
|
||||
logging.getLogger('boto').setLevel(logging.WARNING)
|
||||
logging.getLogger('data.database').setLevel(logging.WARNING)
|
||||
backfill_content_checksums_and_torrent_pieces(app.config['TORRENT_PIECE_SIZE'])
|
Reference in a new issue