Merge master into vulnerability-tool
This commit is contained in:
		
						commit
						7816b0c657
					
				
					 44 changed files with 880 additions and 289 deletions
				
			
		|  | @ -32,15 +32,15 @@ def add_enterprise_config_defaults(config_obj, current_secret_key, hostname): | |||
| 
 | ||||
|   # Default storage configuration. | ||||
|   if not 'DISTRIBUTED_STORAGE_CONFIG' in config_obj: | ||||
|     config_obj['DISTRIBUTED_STORAGE_PREFERENCE'] = ['local'] | ||||
|     config_obj['DISTRIBUTED_STORAGE_PREFERENCE'] = ['default'] | ||||
|     config_obj['DISTRIBUTED_STORAGE_CONFIG'] = { | ||||
|       'local': ['LocalStorage', {'storage_path': '/datastorage/registry'}] | ||||
|       'default': ['LocalStorage', {'storage_path': '/datastorage/registry'}] | ||||
|     } | ||||
| 
 | ||||
|     config_obj['USERFILES_LOCATION'] = 'local' | ||||
|     config_obj['USERFILES_LOCATION'] = 'default' | ||||
|     config_obj['USERFILES_PATH'] = 'userfiles/' | ||||
| 
 | ||||
|     config_obj['LOG_ARCHIVE_LOCATION'] = 'local' | ||||
|     config_obj['LOG_ARCHIVE_LOCATION'] = 'default' | ||||
| 
 | ||||
|   if not 'SERVER_HOSTNAME' in config_obj: | ||||
|     config_obj['SERVER_HOSTNAME'] = hostname | ||||
|  |  | |||
							
								
								
									
										9
									
								
								util/config/database.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								util/config/database.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,9 @@ | |||
| from data import model | ||||
| 
 | ||||
| 
 | ||||
def sync_database_with_config(config):
  """ Hands the name of every configured storage location to
      model.image.ensure_image_locations so the matching reference entries exist in the database.

      The names are the keys of the DISTRIBUTED_STORAGE_CONFIG mapping. Nothing is called when
      that mapping is absent or empty.
  """
  storage_config = config.get('DISTRIBUTED_STORAGE_CONFIG', {})
  names = storage_config.keys()
  if not names:
    return

  model.image.ensure_image_locations(*names)
|  | @ -30,12 +30,18 @@ JWT_FILENAMES = ['jwt-authn.cert'] | |||
| 
 | ||||
| CONFIG_FILENAMES = SSL_FILENAMES + DB_SSL_FILENAMES + JWT_FILENAMES | ||||
| 
 | ||||
| def get_storage_provider(config): | ||||
|   parameters = config.get('DISTRIBUTED_STORAGE_CONFIG', {}).get('local', ['LocalStorage', {}]) | ||||
| def get_storage_providers(config): | ||||
|   storage_config = config.get('DISTRIBUTED_STORAGE_CONFIG', {}) | ||||
| 
 | ||||
|   drivers = {} | ||||
| 
 | ||||
|   try: | ||||
|     return get_storage_driver(parameters) | ||||
|     for name, parameters in storage_config.items(): | ||||
|       drivers[name] = (parameters[0], get_storage_driver(parameters)) | ||||
|   except TypeError: | ||||
|     raise Exception('Missing required storage configuration parameter(s)') | ||||
|     raise Exception('Missing required storage configuration parameter(s): %s' % name) | ||||
| 
 | ||||
|   return drivers | ||||
| 
 | ||||
| def validate_service_for_config(service, config, password=None): | ||||
|   """ Attempts to validate the configuration for the given service. """ | ||||
|  | @ -80,20 +86,29 @@ def _validate_redis(config, _): | |||
| 
 | ||||
| def _validate_registry_storage(config, _): | ||||
|   """ Validates registry storage. """ | ||||
|   driver = get_storage_provider(config) | ||||
|   replication_enabled = config.get('FEATURE_STORAGE_REPLICATION', False) | ||||
| 
 | ||||
|   # Run custom validation on the driver. | ||||
|   driver.validate(app.config['HTTPCLIENT']) | ||||
|   providers = get_storage_providers(config).items() | ||||
| 
 | ||||
|   # Put and remove a temporary file to make sure the normal storage paths work. | ||||
|   driver.put_content('_verify', 'testing 123') | ||||
|   driver.remove('_verify') | ||||
|   if not providers: | ||||
|     raise Exception('Storage configuration required') | ||||
| 
 | ||||
|   # Run setup on the driver if the read/write succeeded. | ||||
|   try: | ||||
|     driver.setup() | ||||
|   except Exception as ex: | ||||
|     raise Exception('Could not prepare storage: %s' % str(ex)) | ||||
|   for name, (storage_type, driver) in providers: | ||||
|     try: | ||||
|       if replication_enabled and storage_type == 'LocalStorage': | ||||
|         raise Exception('Locally mounted directory not supported with storage replication') | ||||
| 
 | ||||
|       # Run custom validation on the driver. | ||||
|       driver.validate(app.config['HTTPCLIENT']) | ||||
| 
 | ||||
|       # Put and remove a temporary file to make sure the normal storage paths work. | ||||
|       driver.put_content('_verify', 'testing 123') | ||||
|       driver.remove('_verify') | ||||
| 
 | ||||
|       # Run setup on the driver if the read/write succeeded. | ||||
|       driver.setup() | ||||
|     except Exception as ex: | ||||
|       raise Exception('Invalid storage configuration: %s: %s' % (name, str(ex))) | ||||
| 
 | ||||
| 
 | ||||
| def _validate_mailing(config, _): | ||||
|  |  | |||
|  | @ -2,7 +2,6 @@ import logging | |||
| 
 | ||||
| from sqlalchemy.types import TypeDecorator, Text | ||||
| from sqlalchemy.dialects.mysql import TEXT as MySQLText, LONGTEXT | ||||
| from random import shuffle | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
|  | @ -21,56 +20,3 @@ class UTF8LongText(TypeDecorator): | |||
|       return dialect.type_descriptor(LONGTEXT(charset='utf8mb4', collation='utf8mb4_unicode_ci')) | ||||
|     else: | ||||
|       return dialect.type_descriptor(Text()) | ||||
| 
 | ||||
| 
 | ||||
| def _chance_duplication(pop_size, samples): | ||||
|   """ The chance of randomly selecting a duplicate when you choose the specified number of samples | ||||
|       from the specified population size. | ||||
|   """ | ||||
|   pairs = (samples * (samples - 1)) / 2.0 | ||||
|   unique = (pop_size - 1.0)/pop_size | ||||
|   all_unique = pow(unique, pairs) | ||||
|   return 1 - all_unique | ||||
| 
 | ||||
| 
 | ||||
| def _num_checks(pop_size, desired): | ||||
|   """ Binary search for the proper number of entries to use to get the specified collision | ||||
|       probability. | ||||
|   """ | ||||
|   s_max = pop_size | ||||
|   s_min = 0 | ||||
|   last_test = -1 | ||||
|   s_test = s_max | ||||
| 
 | ||||
|   while s_max > s_min and last_test != s_test: | ||||
|     last_test = s_test | ||||
|     s_test = (s_max + s_min)/2 | ||||
|     chance = _chance_duplication(pop_size, s_test) | ||||
|     if chance > desired: | ||||
|       s_max = s_test - 1 | ||||
|     else: | ||||
|       s_min = s_test | ||||
| 
 | ||||
|   return s_test | ||||
| 
 | ||||
| 
 | ||||
def yield_random_entries(batch_query, batch_size, collision_chance):
  """ Generator over semi-random rows of a query, fetched one batch at a time.

      Each round pulls up to batch_size rows from batch_query(), shuffles them, and yields only
      as many as _num_checks allows for the requested collision_chance (at least one), so that
      concurrent workers are less likely to pick the same rows. For example, with batches of
      10,000 rows and a .03 collision_chance only 25 rows are used before going back to the
      database. Iteration ends after the first batch that comes back empty.
  """
  while True:
    batch = list(batch_query().limit(batch_size))
    shuffle(batch)
    take = max(1, _num_checks(len(batch), collision_chance))
    logger.debug('Found %s/%s matching entries, processing %s', len(batch), batch_size,
                 take)

    for entry in batch[:take]:
      yield entry

    # An empty batch means nothing matches the query any more.
    if not batch:
      return
|  |  | |||
							
								
								
									
										156
									
								
								util/migrate/allocator.py
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										156
									
								
								util/migrate/allocator.py
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,156 @@ | |||
| import logging | ||||
| import random | ||||
| 
 | ||||
| from bintrees import RBTree | ||||
| from threading import Event | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| logger.setLevel(logging.INFO) | ||||
| 
 | ||||
| 
 | ||||
class NoAvailableKeysError(ValueError):
  """ Raised when a block is requested but every index has been marked completed. """
| 
 | ||||
| 
 | ||||
class CompletedKeys(object):
  """ Tracks which indexes of the half-open range [0, max_index) have been marked completed.

      Completed ranges ("slabs") are kept in an RBTree that maps the start index of a range to
      its length. A completed range that touches the lower or upper edge of the remaining window
      is not stored; the window (_min_index, _max_index) is shrunk past it instead.
  """

  def __init__(self, max_index):
    """ Starts with every index in [0, max_index) available. """
    self._max_index = max_index
    self._min_index = 0
    self._slabs = RBTree()

  def _get_previous_or_none(self, index):
    """ Returns the (start, length) slab with the greatest start <= index, or None if there is
        no such slab.
    """
    try:
      return self._slabs.floor_item(index)
    except KeyError:
      return None

  def is_available(self, index):
    """ Returns True if index lies inside the remaining window and is not covered by a slab. """
    logger.debug('Testing index %s', index)
    if index >= self._max_index or index < self._min_index:
      logger.debug('Index out of range')
      return False

    previous = self._get_previous_or_none(index)
    if previous is None:
      return True

    prev_start, prev_length = previous
    logger.debug('Prev range: %s-%s', prev_start, prev_start + prev_length)
    return (prev_start + prev_length) <= index

  def mark_completed(self, start_index, past_last_index):
    """ Marks the half-open range [start_index, past_last_index) as completed.

        The range is merged with every stored slab it overlaps or touches. If the merged range
        reaches the upper edge of the window the window's max is lowered to the range's start;
        if it begins at or below the lower edge the window's min is raised to the range's end.
        In both of those cases the merged range is not stored.
    """
    logger.debug('Marking the range completed: %s-%s', start_index, past_last_index)
    to_discard = set()

    # Find the item directly before this and see if there is overlap
    previous = self._get_previous_or_none(start_index)
    if previous is not None:
      prev_start, prev_length = previous
      if prev_start + prev_length >= start_index:
        # we are going to merge with the range before us
        logger.debug('Merging with the prev range: %s-%s', prev_start, prev_start + prev_length)
        to_discard.add(prev_start)
        start_index = prev_start
        past_last_index = max(past_last_index, prev_start + prev_length)

    # Find all keys between the start and last index and merge them into one block. The end key
    # is exclusive, so + 1 also picks up a slab that starts exactly where this range ends.
    for merge_start, merge_length in self._slabs.iter_items(start_index, past_last_index + 1):
      candidate_next_index = merge_start + merge_length
      logger.debug('Merging with block %s-%s', merge_start, candidate_next_index)
      to_discard.add(merge_start)
      past_last_index = max(past_last_index, candidate_next_index)

    # write the new block which is fully merged
    discard = False
    if past_last_index >= self._max_index:
      logger.debug('Discarding block and setting new max to: %s', start_index)
      self._max_index = start_index
      discard = True

    if start_index <= self._min_index:
      logger.debug('Discarding block and setting new min to: %s', past_last_index)
      self._min_index = past_last_index
      discard = True

    if to_discard:
      logger.debug('Discarding %s obsolte blocks', len(to_discard))
      self._slabs.remove_items(to_discard)

    if not discard:
      logger.debug('Writing new block with range: %s-%s', start_index, past_last_index)
      self._slabs.insert(start_index, past_last_index - start_index)

    logger.debug('Total blocks: %s', len(self._slabs))

  def get_block_start_index(self, block_size_estimate):
    """ Picks a random start index inside a randomly chosen hole (a gap not covered by a slab).

        With N stored slabs there are N + 1 holes: hole 0 lies before the first slab, hole i
        lies between slab i - 1 and slab i, and hole N lies after the last slab. The returned
        index is drawn from [hole_start, max(hole_start, past_hole_end - block_size_estimate)].

        Raises NoAvailableKeysError when the remaining window is empty.
    """
    logger.debug('Total range: %s-%s', self._min_index, self._max_index)
    if self._max_index <= self._min_index:
      raise NoAvailableKeysError('All indexes have been marked completed')

    num_holes = len(self._slabs) + 1
    random_hole = random.randint(0, num_holes - 1)
    logger.debug('Selected random hole %s with %s total holes', random_hole, num_holes)

    hole_start = self._min_index
    past_hole_end = self._max_index

    # Now that we have picked a hole, we need to define the bounds
    if random_hole > 0:
      # There will be a slab before this hole, find where it ends. Index into the sorted list
      # by hole number rather than taking the last two entries: when the last hole is picked,
      # nsmallest returns only len(slabs) entries, and the last two of those would describe the
      # previous hole instead of the one after the final slab.
      smallest = self._slabs.nsmallest(random_hole + 1)
      left_index, left_len = smallest[random_hole - 1]
      logger.debug('Left range %s-%s', left_index, left_index + left_len)
      hole_start = left_index + left_len

      if len(smallest) > random_hole:
        right_index, right_len = smallest[random_hole]
        logger.debug('Right range %s-%s', right_index, right_index + right_len)
        past_hole_end = right_index
    elif not self._slabs.is_empty():
      right_index, right_len = self._slabs.nsmallest(1)[0]
      logger.debug('Right range %s-%s', right_index, right_index + right_len)
      past_hole_end = right_index

    # Now that we have our hole bounds, select a random block from [0:len - block_size_estimate]
    logger.debug('Selecting from hole range: %s-%s', hole_start, past_hole_end)
    rand_max_bound = max(hole_start, past_hole_end - block_size_estimate)
    logger.debug('Rand max bound: %s', rand_max_bound)
    return random.randint(hole_start, rand_max_bound)
| 
 | ||||
| 
 | ||||
def yield_random_entries(batch_query, primary_key_field, batch_size, max_id):
  """ This method will yield items from random blocks in the database. We will track metadata
      about which keys are available for work, and we will complete the backfill when there is no
      more work to be done. The method yields tuples of (candidate, Event), and if the work was
      already done by another worker, the caller should set the event. Batch candidates must have
      an "id" field which can be inspected.

      batch_query is a callable returning a fresh query; primary_key_field is the field used to
      position and order each batch; batch_size is the maximum number of rows per batch; max_id
      is the highest primary key to consider. A max_id of None (no rows) yields nothing.
  """

  # The maximum id of an empty table is None: there is nothing to backfill and max_id + 1 below
  # would fail.
  if max_id is None:
    return

  allocator = CompletedKeys(max_id + 1)

  try:
    while True:
      start_index = allocator.get_block_start_index(batch_size)
      # Order by the primary key so the batch is the run of matching rows that immediately
      # follows start_index. Marking everything from start_index through the last candidate's
      # id as completed (below) is only valid if no matching row in between was left out.
      all_candidates = list(batch_query()
                            .where(primary_key_field >= start_index)
                            .order_by(primary_key_field)
                            .limit(batch_size))

      if not all_candidates:
        logger.info('No candidates, new highest id: %s', start_index)
        allocator.mark_completed(start_index, max_id + 1)
        continue

      logger.info('Found %s candidates, processing block', len(all_candidates))
      for candidate in all_candidates:
        abort_early = Event()
        yield candidate, abort_early
        if abort_early.is_set():
          logger.info('Overlap with another worker, aborting')
          break

      # candidate is the last row handed out, whether the batch finished or was aborted.
      completed_through = candidate.id + 1
      logger.info('Marking id range as completed: %s-%s', start_index, completed_through)
      allocator.mark_completed(start_index, completed_through)

  except NoAvailableKeysError:
    logger.info('No more work')
|  | @ -3,12 +3,15 @@ import logging | |||
| from peewee import JOIN_LEFT_OUTER | ||||
| 
 | ||||
| from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField, | ||||
|                     TextField) | ||||
|                     TextField, fn) | ||||
| 
 | ||||
| from data.database import BaseModel, db, db_for_update, CloseForLongOperation | ||||
| from data.database import BaseModel, CloseForLongOperation | ||||
| from app import app, storage | ||||
| from digest import checksums | ||||
| from util.migrate import yield_random_entries | ||||
| from util.migrate.allocator import yield_random_entries | ||||
| 
 | ||||
| 
 | ||||
| BATCH_SIZE = 1000 | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
|  | @ -69,16 +72,19 @@ def _get_image_storage_locations(storage_id): | |||
| 
 | ||||
| def backfill_content_checksums(): | ||||
|   """ Copies metadata from image storages to their images. """ | ||||
|   logger.debug('Image content checksum backfill: Began execution') | ||||
|   logger.debug('Began execution') | ||||
|   logger.debug('This may be a long operation!') | ||||
| 
 | ||||
|   def batch_query(): | ||||
|     return (ImageStorage | ||||
|             .select(ImageStorage.id, ImageStorage.uuid) | ||||
|             .where(ImageStorage.content_checksum >> None, ImageStorage.uploading == False)) | ||||
| 
 | ||||
|   for candidate_storage in yield_random_entries(batch_query, 10000, 0.1): | ||||
|     logger.debug('Computing content checksum for storage: %s', candidate_storage.uuid) | ||||
|   max_id = ImageStorage.select(fn.Max(ImageStorage.id)).scalar() | ||||
| 
 | ||||
|   written = 0 | ||||
|   for candidate_storage, abort in yield_random_entries(batch_query, ImageStorage.id, BATCH_SIZE, | ||||
|                                                        max_id): | ||||
|     locations = _get_image_storage_locations(candidate_storage.id) | ||||
| 
 | ||||
|     checksum = None | ||||
|  | @ -93,16 +99,23 @@ def backfill_content_checksums(): | |||
|         checksum = 'unknown:{0}'.format(exc.__class__.__name__) | ||||
| 
 | ||||
|     # Now update the ImageStorage with the checksum | ||||
|     with app.config['DB_TRANSACTION_FACTORY'](db): | ||||
|       to_update = db_for_update(ImageStorage.get(ImageStorage.id == candidate_storage.id)) | ||||
|       if to_update.content_checksum is not None: | ||||
|         logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid) | ||||
|       else: | ||||
|         logger.debug('Setting content checksum to %s for %s', checksum, candidate_storage.uuid) | ||||
|         to_update.content_checksum = checksum | ||||
|         to_update.save() | ||||
|     num_updated = (ImageStorage | ||||
|                    .update(content_checksum=checksum) | ||||
|                    .where(ImageStorage.id == candidate_storage.id, | ||||
|                           ImageStorage.content_checksum >> None)).execute() | ||||
|     if num_updated == 0: | ||||
|       logger.info('Another worker filled in the checksum: %s', candidate_storage.uuid) | ||||
|       abort.set() | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     written += num_updated | ||||
|     if (written % BATCH_SIZE) == 0: | ||||
|       logger.debug('%s entries written', written) | ||||
| 
 | ||||
|   logger.debug('Completed, %s entries written', written) | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|   logging.basicConfig(level=logging.DEBUG) | ||||
|   # logging.getLogger('peewee').setLevel(logging.CRITICAL) | ||||
|   logging.getLogger('peewee').setLevel(logging.WARNING) | ||||
|   logging.getLogger('boto').setLevel(logging.WARNING) | ||||
|   logging.getLogger('data.database').setLevel(logging.WARNING) | ||||
|   backfill_content_checksums() | ||||
|  |  | |||
|  | @ -1,48 +1,81 @@ | |||
| import logging | ||||
| from data.database import Image, ImageStorage, db | ||||
| 
 | ||||
| from data.database import BaseModel | ||||
| from peewee import (fn, CharField, BigIntegerField, ForeignKeyField, BooleanField, DateTimeField, | ||||
|                     TextField, IntegerField) | ||||
| from app import app | ||||
| from util.migrate.allocator import yield_random_entries | ||||
| 
 | ||||
| 
 | ||||
| BATCH_SIZE = 1000 | ||||
| 
 | ||||
| 
 | ||||
class Repository(BaseModel):
  # Declares no fields of its own; it exists in this module as the target of the
  # Image.repository foreign key.
  pass
| 
 | ||||
| 
 | ||||
| # Vendor the information from tables we will be writing to at the time of this migration | ||||
class ImageStorage(BaseModel):
  # Local copy of the ImageStorage model, declared in this module instead of being imported
  # from data.database. The backfill query below joins against it and filters on `uploading`.
  uuid = CharField(index=True, unique=True)
  checksum = CharField(null=True)
  image_size = BigIntegerField(null=True)
  uncompressed_size = BigIntegerField(null=True)
  uploading = BooleanField(default=True, null=True)
  cas_path = BooleanField(default=True)
  content_checksum = CharField(null=True, index=True)
| 
 | ||||
| 
 | ||||
class Image(BaseModel):
  # Local copy of the Image model, declared in this module instead of being imported from
  # data.database.
  docker_image_id = CharField(index=True)
  repository = ForeignKeyField(Repository)
  # '/'-separated path; the backfill below takes the second-to-last segment as the parent id.
  ancestors = CharField(index=True, default='/', max_length=64535, null=True)
  storage = ForeignKeyField(ImageStorage, index=True, null=True)
  created = DateTimeField(null=True)
  comment = TextField(null=True)
  command = TextField(null=True)
  aggregate_size = BigIntegerField(null=True)
  v1_json_metadata = TextField(null=True)
  v1_checksum = CharField(null=True)

  security_indexed = BooleanField(default=False)
  security_indexed_engine = IntegerField(default=-1)
  # The column filled in by backfill_parent_id; NULL until it has been backfilled.
  parent_id = IntegerField(index=True, null=True)
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| def backfill_parent_id(): | ||||
|   logger.setLevel(logging.DEBUG) | ||||
| 
 | ||||
|   logger.debug('backfill_parent_id: Starting') | ||||
|   logger.debug('backfill_parent_id: This can be a LONG RUNNING OPERATION. Please wait!') | ||||
| 
 | ||||
|   # Check for any images without parent | ||||
|   has_images = bool(list(Image | ||||
|                          .select(Image.id) | ||||
|                          .join(ImageStorage) | ||||
|                          .where(Image.parent >> None, Image.ancestors != '/', ImageStorage.uploading == False) | ||||
|                          .limit(1))) | ||||
|   def fetch_batch(): | ||||
|     return (Image | ||||
|             .select(Image.id, Image.ancestors) | ||||
|             .join(ImageStorage) | ||||
|             .where(Image.parent_id >> None, Image.ancestors != '/', | ||||
|                    ImageStorage.uploading == False)) | ||||
| 
 | ||||
|   if not has_images: | ||||
|     logger.debug('backfill_parent_id: No migration needed') | ||||
|     return | ||||
|   max_id = Image.select(fn.Max(Image.id)).scalar() | ||||
| 
 | ||||
|   while True: | ||||
|     # Load the record from the DB. | ||||
|     batch_images_ids = list(Image | ||||
|                             .select(Image.id) | ||||
|                             .join(ImageStorage) | ||||
|                             .where(Image.parent >> None, Image.ancestors != '/', ImageStorage.uploading == False) | ||||
|                             .limit(100)) | ||||
|   written = 0 | ||||
|   for to_backfill, abort in yield_random_entries(fetch_batch, Image.id, BATCH_SIZE, max_id): | ||||
|     computed_parent = int(to_backfill.ancestors.split('/')[-2]) | ||||
|     num_changed = (Image | ||||
|                    .update(parent_id=computed_parent) | ||||
|                    .where(Image.id == to_backfill.id, Image.parent_id >> None)).execute() | ||||
|     if num_changed == 0: | ||||
|       logger.info('Collision with another worker, aborting batch') | ||||
|       abort.set() | ||||
|     written += num_changed | ||||
|     if (written % BATCH_SIZE) == 0: | ||||
|       logger.debug('%s entries written', written) | ||||
| 
 | ||||
|     if len(batch_images_ids) == 0: | ||||
|       logger.debug('backfill_parent_id: Completed') | ||||
|       return | ||||
|   logger.debug('backfill_parent_id: Completed, updated %s entries', written) | ||||
| 
 | ||||
|     for image_id in batch_images_ids: | ||||
|       with app.config['DB_TRANSACTION_FACTORY'](db): | ||||
|         try: | ||||
|           image = Image.select(Image.id, Image.ancestors).where(Image.id == image_id).get() | ||||
|           image.parent = image.ancestors.split('/')[-2] | ||||
|           image.save() | ||||
|         except Image.DoesNotExist: | ||||
|           pass | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
| if __name__ == '__main__': | ||||
|   logging.basicConfig(level=logging.DEBUG) | ||||
|   logging.getLogger('peewee').setLevel(logging.CRITICAL) | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,12 +1,15 @@ | |||
| import logging | ||||
| 
 | ||||
| from peewee import (CharField, BigIntegerField, BooleanField, ForeignKeyField, DateTimeField, | ||||
|                     TextField) | ||||
| from data.database import BaseModel, db, db_for_update | ||||
| from util.migrate import yield_random_entries | ||||
|                     TextField, fn) | ||||
| from data.database import BaseModel | ||||
| from util.migrate.allocator import yield_random_entries | ||||
| from app import app | ||||
| 
 | ||||
| 
 | ||||
| BATCH_SIZE = 1000 | ||||
| 
 | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -40,28 +43,30 @@ class Image(BaseModel): | |||
| 
 | ||||
| def backfill_checksums(): | ||||
|   """ Copies checksums from image storages to their images. """ | ||||
|   logger.debug('Image v1 checksum backfill: Began execution') | ||||
|   logger.debug('Began execution') | ||||
|   logger.debug('This may be a long operation!') | ||||
|   def batch_query(): | ||||
|     return (Image | ||||
|             .select(Image.id) | ||||
|             .select(Image, ImageStorage) | ||||
|             .join(ImageStorage) | ||||
|             .where(Image.v1_checksum >> None, ImageStorage.uploading == False, | ||||
|                    ~(ImageStorage.checksum >> None))) | ||||
| 
 | ||||
|   for candidate_image in yield_random_entries(batch_query, 10000, 0.1): | ||||
|     logger.debug('Computing content checksum for storage: %s', candidate_image.id) | ||||
|   max_id = Image.select(fn.Max(Image.id)).scalar() | ||||
| 
 | ||||
|     with app.config['DB_TRANSACTION_FACTORY'](db): | ||||
|       try: | ||||
|         image = db_for_update(Image | ||||
|                               .select(Image, ImageStorage) | ||||
|                               .join(ImageStorage) | ||||
|                               .where(Image.id == candidate_image.id)).get() | ||||
|   written = 0 | ||||
|   for candidate_image, abort in yield_random_entries(batch_query, Image.id, BATCH_SIZE, max_id): | ||||
|     num_changed = (Image | ||||
|                    .update(v1_checksum=candidate_image.storage.checksum) | ||||
|                    .where(Image.id == candidate_image.id, Image.v1_checksum >> None)).execute() | ||||
|     if num_changed == 0: | ||||
|       logger.info('Collision with another worker, aborting batch') | ||||
|       abort.set() | ||||
|     written += num_changed | ||||
|     if (written % BATCH_SIZE) == 0: | ||||
|       logger.debug('%s entries written', written) | ||||
| 
 | ||||
|         image.v1_checksum = image.storage.checksum | ||||
|         image.save() | ||||
|       except Image.DoesNotExist: | ||||
|         pass | ||||
|   logger.debug('Completed, updated %s entries', written) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|  |  | |||
|  | @ -104,8 +104,8 @@ def backfill_v1_metadata(): | |||
|           try: | ||||
|             data = storage.get_content(repo_image.storage.locations, json_path) | ||||
|           except IOError: | ||||
|             data = None | ||||
|             logger.exception('failed to find v1 metadata, defaulting to None') | ||||
|             data = "{}" | ||||
|             logger.warning('failed to find v1 metadata, defaulting to {}') | ||||
|           repo_image.v1_json_metadata = data | ||||
|           repo_image.save() | ||||
|         except ImageStoragePlacement.DoesNotExist: | ||||
|  |  | |||
		Reference in a new issue