120 lines
		
	
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			120 lines
		
	
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| 
 | |
| from peewee import JOIN_LEFT_OUTER
 | |
| 
 | |
| from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField,
 | |
|                     TextField, fn)
 | |
| 
 | |
| from data.database import BaseModel, CloseForLongOperation
 | |
| from app import app, storage
 | |
| from digest import checksums
 | |
| from util.migrate.allocator import yield_random_entries
 | |
| 
 | |
| 
 | |
# Module-level logger; progress and per-storage failures of the backfill are reported here.
logger = logging.getLogger(__name__)

# Batch size handed to yield_random_entries; progress is also logged every BATCH_SIZE writes.
BATCH_SIZE = 1000
 | |
| 
 | |
| 
 | |
class Repository(BaseModel):
  """ Minimal vendored stand-in for the repository table.

  It declares no fields of its own; it exists in this migration so that Image.repository has a
  ForeignKeyField target.
  """
  pass
 | |
| 
 | |
| 
 | |
# Vendor the information from tables we will be writing to at the time of this migration
class ImageStorage(BaseModel):
  """ Snapshot of the ImageStorage model as it exists for this migration.

  backfill_content_checksums() selects rows by content_checksum/uploading, reads id and uuid, and
  writes content_checksum.
  """
  uuid = CharField(index=True, unique=True)  # used to build the v1 layer path for the storage
  checksum = CharField(null=True)
  image_size = BigIntegerField(null=True)
  uncompressed_size = BigIntegerField(null=True)
  uploading = BooleanField(default=True, null=True)  # only rows with uploading == False are backfilled
  cas_path = BooleanField(default=True)
  content_checksum = CharField(null=True, index=True)  # the column this migration fills in
 | |
| 
 | |
| 
 | |
class Image(BaseModel):
  """ Snapshot of the Image model as it exists for this migration.

  NOTE(review): no code in this file references Image; it appears to be vendored only for
  completeness of the schema snapshot — confirm before removing.
  """
  docker_image_id = CharField(index=True)
  repository = ForeignKeyField(Repository)
  # NOTE(review): 64535 looks unusual (65535?) — kept verbatim from the vendored schema; confirm.
  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
  storage = ForeignKeyField(ImageStorage, index=True, null=True)
  created = DateTimeField(null=True)
  comment = TextField(null=True)
  command = TextField(null=True)
  aggregate_size = BigIntegerField(null=True)
  v1_json_metadata = TextField(null=True)
  v1_checksum = CharField(null=True)
 | |
| 
 | |
| 
 | |
class ImageStorageLocation(BaseModel):
  """ Snapshot of the ImageStorageLocation model; only `name` is read by this migration. """
  name = CharField(unique=True, index=True)  # passed (as a set of names) to storage.stream_read_file
 | |
| 
 | |
| 
 | |
class ImageStoragePlacement(BaseModel):
  """ Snapshot of the placement table linking an ImageStorage to an ImageStorageLocation.

  Queried by _get_image_storage_locations() to find where a storage's data can be read from.
  """
  storage = ForeignKeyField(ImageStorage)
  location = ForeignKeyField(ImageStorageLocation)
 | |
| 
 | |
| 
 | |
def _get_image_storage_locations(storage_id):
  """ Returns the names of all locations at which the given ImageStorage has a placement.

  Args:
    storage_id: primary key of the ImageStorage row.

  Returns:
    A set of ImageStorageLocation.name strings; empty if the storage has no placements.
  """
  # Filter on the placement's own storage foreign key. Joining ImageStorage is unnecessary: only
  # its id would be compared, and a WHERE on an outer-joined table would cancel the outer join.
  placements_query = (ImageStoragePlacement
                      .select(ImageStoragePlacement, ImageStorageLocation)
                      .join(ImageStorageLocation)
                      .where(ImageStoragePlacement.storage == storage_id))

  return {placement.location.name for placement in placements_query}
 | |
| 
 | |
| 
 | |
def _compute_content_checksum(storage_uuid, locations):
  """ Computes the content checksum of a storage's v1 layer data.

  Args:
    storage_uuid: uuid of the ImageStorage whose layer should be hashed.
    locations: set of location names to read the layer from.

  Returns:
    'sha256:<hexdigest>' on success, or 'unknown:<ExceptionClassName>' if the layer could not be
    read or hashed.
  """
  # The read may be slow; CloseForLongOperation presumably releases the database connection while
  # it runs (TODO confirm against data.database).
  with CloseForLongOperation(app.config):
    try:
      layer_path = storage.v1_image_layer_path(storage_uuid)
      with storage.stream_read_file(locations, layer_path) as layer_data_handle:
        return 'sha256:{0}'.format(checksums.sha256_file(layer_data_handle))
    except Exception as exc:
      # Deliberately best-effort: record a sentinel rather than abort the whole backfill.
      logger.warning('Unable to compute checksum for storage: %s', storage_uuid)
      return 'unknown:{0}'.format(exc.__class__.__name__)


def backfill_content_checksums():
  """ Fills in ImageStorage.content_checksum for every storage that is missing one.

  Candidates are rows whose content_checksum is NULL and whose uploading flag is False. For each
  candidate the checksum is computed by _compute_content_checksum() and written only if the column
  is still NULL, so a value already written by another worker is never overwritten. Because even
  the 'unknown:...' failure sentinel is non-NULL, every processed row drops out of the candidate
  query.
  """
  logger.debug('Began execution')
  logger.debug('This may be a long operation!')

  def batch_query():
    # peewee turns these operators into SQL: `>> None` is IS NULL, and `== False` must remain an
    # equality comparison (not `is False`) to produce a WHERE clause.
    return (ImageStorage
            .select(ImageStorage.id, ImageStorage.uuid)
            .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False))  # noqa: E712

  max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar()
  if max_id is None:
    # No ImageStorage rows at all: nothing to backfill. Returning here also avoids handing None
    # to yield_random_entries as max_id.
    logger.debug('Completed, %s entries written', 0)
    return

  written = 0
  for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE,
                                                       max_id):
    locations = _get_image_storage_locations(candidate_storage.id)
    checksum = _compute_content_checksum(candidate_storage.uuid, locations)

    # Conditional update: only succeeds if no other worker has filled the column in meanwhile.
    num_updated = (ImageStorage
                   .update(content_checksum=checksum)
                   .where(ImageStorage.id == candidate_storage.id,
                          ImageStorage.content_checksum >> None)).execute()
    if num_updated == 0:
      logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid)
      # Tell yield_random_entries this work was already done (presumably so it moves to another
      # range). Nothing was written, so skip the progress bookkeeping below.
      abort.set()
      continue

    written += num_updated
    # Reported only after a real write, so a run of collisions cannot re-log the same count.
    if written % BATCH_SIZE == 0:
      logger.debug('%s entries written', written)

  logger.debug('Completed, %s entries written', written)
 | |
| 
 | |
if __name__ == '__main__':
  # Verbose output for this script, but keep chatty third-party/internal loggers at WARNING.
  logging.basicConfig(level=logging.DEBUG)
  for noisy_logger_name in ('peewee', 'boto', 'data.database'):
    logging.getLogger(noisy_logger_name).setLevel(logging.WARNING)
  backfill_content_checksums()
 |