- Update the migrations tool to verify migrations work up and down for both MySQL and PostgreSQL.
- Add migrations for the squashed image tables and for backfilling the uncompressed sizes.
- Make sure the gzip stream uses a max length when determining the uncompressed size.
This commit is contained in:
		
							parent
							
								
									f38ce51943
								
							
						
					
					
						commit
						f4daa5e97b
					
				
					 10 changed files with 152 additions and 43 deletions
				
			
		|  | @ -9,6 +9,8 @@ import zlib | |||
| # http://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check/22310760#22310760 | ||||
| ZLIB_GZIP_WINDOW = zlib.MAX_WBITS | 32 | ||||
| 
 | ||||
| CHUNK_SIZE = 5 * 1024 * 1024 | ||||
| 
 | ||||
| class SizeInfo(object): | ||||
|   def __init__(self): | ||||
|     self.size = 0 | ||||
|  | @ -23,6 +25,11 @@ def calculate_size_handler(): | |||
|   decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW) | ||||
| 
 | ||||
|   def fn(buf): | ||||
|     size_info.size += len(decompressor.decompress(buf)) | ||||
|     # Note: We set a maximum CHUNK_SIZE to prevent the decompress from taking too much | ||||
|     # memory. As a result, we have to loop until the unconsumed tail is empty. | ||||
|     current_data = buf | ||||
|     while len(current_data) > 0: | ||||
|       size_info.size += len(decompressor.decompress(current_data, CHUNK_SIZE)) | ||||
|       current_data = decompressor.unconsumed_tail | ||||
| 
 | ||||
|   return size_info, fn | ||||
|  |  | |||
							
								
								
									
										87
									
								
								util/uncompressedsize.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										87
									
								
								util/uncompressedsize.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,87 @@ | |||
| import logging | ||||
| import zlib | ||||
| import sys | ||||
| 
 | ||||
| from data import model | ||||
| from data.database import ImageStorage | ||||
| from app import app, storage as store | ||||
| from data.database import db, db_random_func | ||||
| from util.gzipstream import ZLIB_GZIP_WINDOW | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| CHUNK_SIZE = 5 * 1024 * 1024 | ||||
| 
 | ||||
| 
 | ||||
def _ensure_stdout_handler():
  """ Attaches a formatted stdout handler to the module logger, unless one is already attached. """
  for existing in logger.handlers:
    is_stream_handler = isinstance(existing, logging.StreamHandler)
    if is_stream_handler and getattr(existing, 'stream', None) is sys.stdout:
      # A previous call already installed it; adding another would duplicate every message.
      return

  formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

  ch = logging.StreamHandler(sys.stdout)
  ch.setFormatter(formatter)
  logger.addHandler(ch)


def _stream_uncompressed_size(stream):
  """ Returns the number of bytes produced by inflating everything read from `stream`.

      The stream is read CHUNK_SIZE bytes at a time and each decompress call is capped at
      CHUNK_SIZE bytes of output, so the decompressed data is never held all at once.
  """
  decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
  uncompressed_size = 0

  while True:
    current_data = stream.read(CHUNK_SIZE)
    if not current_data:
      break

    # Input that did not fit under the output cap is handed back in unconsumed_tail, so keep
    # feeding it back in until the decompressor has taken all of this chunk.
    while current_data:
      uncompressed_size += len(decompressor.decompress(current_data, CHUNK_SIZE))
      current_data = decompressor.unconsumed_tail

  # The decompressor can consume all of its input while still holding output that did not fit
  # under the cap of the last call; flush() returns that remainder so it is counted too.
  uncompressed_size += len(decompressor.flush())
  return uncompressed_size


def backfill_sizes_from_data():
  """ Fills in `uncompressed_size` on ImageStorage rows where it is missing.

      Repeatedly selects up to 100 rows (in random order) that are not uploading and have no
      uncompressed size, streams each layer from backing storage to measure its decompressed
      length, and saves the result. Returns once the query finds no more such rows.

      Side effects: sets the module logger to DEBUG and attaches a stdout handler to it.
  """
  logger.setLevel(logging.DEBUG)
  logger.debug('Starting uncompressed image size backfill')

  _ensure_stdout_handler()

  while True:
    # Load the next batch of candidate records from the DB. The `>> None` and `== False`
    # spellings are peewee query expressions and must not be rewritten as `is` checks.
    batch_ids = list(ImageStorage
                     .select(ImageStorage.uuid)
                     .where(ImageStorage.uncompressed_size >> None,
                            ImageStorage.uploading == False)
                     .limit(100)
                     .order_by(db_random_func()))
    if not batch_ids:
      # We're done!
      return

    # NOTE(review): records that end up in one of the except handlers below keep a NULL
    # uncompressed_size and so stay eligible for the query above; if any such record exists
    # this loop presumably never reaches the empty-batch exit. Confirm whether that is intended.
    for record in batch_ids:
      uuid = record.uuid

      try:
        with_locs = model.get_storage_by_uuid(uuid)
        if with_locs.uncompressed_size is not None:
          logger.debug('Somebody else already filled this in for us: %s', uuid)
          continue

        # Read the layer from backing storage and calculate the uncompressed size.
        logger.debug('Loading data: %s (%s bytes)', uuid, with_locs.image_size)
        with store.stream_read_file(with_locs.locations, store.image_layer_path(uuid)) as stream:
          uncompressed_size = _stream_uncompressed_size(stream)

        # Write the size to the image storage. We do so under a transaction AFTER checking to
        # make sure the image storage still exists and has not changed.
        logger.debug('Writing entry: %s. Size: %s', uuid, uncompressed_size)
        with app.config['DB_TRANSACTION_FACTORY'](db):
          current_record = model.get_storage_by_uuid(uuid)

          # This is a plain attribute on a loaded record, not a query expression, so an
          # identity check against None is the correct test.
          if not current_record.uploading and current_record.uncompressed_size is None:
            current_record.uncompressed_size = uncompressed_size
            current_record.save()
          else:
            logger.debug('Somebody else already filled this in for us, after we did the work: %s',
                         uuid)

      except model.InvalidImageException:
        logger.warning('Storage with uuid no longer exists: %s', uuid)
      except MemoryError:
        logger.warning('MemoryError on %s', uuid)
| 
 | ||||
if __name__ == "__main__":
  # Script entry point: log at DEBUG by default, but only let CRITICAL records through
  # from the boto and peewee loggers, then run the backfill.
  logging.basicConfig(level=logging.DEBUG)
  for library_logger_name in ('boto', 'peewee'):
    logging.getLogger(library_logger_name).setLevel(logging.CRITICAL)

  backfill_sizes_from_data()
		Reference in a new issue