98 lines
		
	
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			98 lines
		
	
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import features
 | |
| import time
 | |
| 
 | |
| from app import app, storage, image_replication_queue
 | |
| from data.database import UseThenDisconnect, CloseForLongOperation
 | |
| from data import model
 | |
| from storage.basestorage import StoragePaths
 | |
| from workers.queueworker import QueueWorker
 | |
| 
 | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Value passed as `poll_period_seconds` when the worker is constructed in the
# script entry point below.
POLL_PERIOD_SECONDS = 10
 | |
| 
 | |
class StorageReplicationWorker(QueueWorker):
  """ Queue worker that copies an image storage to every required location it
      is not yet placed in.
  """

  def process_queue_item(self, job_details):
    """ Replicates a single image storage to all of its missing locations.

        job_details is read for two keys: 'storage_id' (the UUID of the image
        storage) and 'namespace_user_id' (the ID of the owning namespace user).

        Returns True when there is nothing (more) to do: the namespace or the
        storage is unknown, no location is missing, or every copy succeeded.
        Returns False as soon as a copy raises; locations copied before the
        failure keep their placement records.
    """
    storage_uuid = job_details['storage_id']
    logger.debug('Starting replication of image storage %s', storage_uuid)

    # Lookup the namespace and its associated regions.
    namespace_user_id = job_details['namespace_user_id']
    namespace = model.user.get_namespace_user_by_user_id(namespace_user_id)
    if not namespace:
      # Log the ID that was requested; `namespace` itself is falsy in this branch
      # and would tell the reader nothing.
      logger.debug('Unknown namespace: %s', namespace_user_id)
      return True

    locations = model.user.get_region_locations(namespace)

    # Lookup the image storage.
    partial_storage = model.storage.get_storage_by_uuid(storage_uuid)
    if not partial_storage:
      logger.debug('Unknown storage: %s', storage_uuid)
      return True

    # Check to see if the image is at all the required locations: the namespace's
    # region locations plus the globally configured defaults.
    locations_required = locations | set(storage.default_locations)
    locations_missing = locations_required - set(partial_storage.locations)

    if not locations_missing:
      logger.debug('No missing locations for storage %s under namespace %s',
                   storage_uuid, namespace.username)
      return True

    # For any missing storage locations, initiate a copy. The path builders do not
    # depend on the target location, so they are resolved once up front.
    storage_paths = StoragePaths()
    path_builders = (storage_paths.image_ancestry_path,
                     storage_paths.image_layer_path)

    # NOTE(review): this raises IndexError when the storage has no placement at all;
    # confirm whether that can happen for a queued storage and how it should be reported.
    existing_location = list(partial_storage.locations)[0]

    for location in locations_missing:
      logger.debug('Copying image storage %s to location %s', partial_storage.uuid, location)

      # Bound before the try block so the failure log below can always reference it,
      # even when building the very first path is what raised.
      current_path = None

      # Copy the various paths.
      try:
        for path_builder in path_builders:
          current_path = path_builder(partial_storage.uuid)
          with CloseForLongOperation(app.config):
            storage.copy_between(current_path, existing_location, location)
      except Exception:
        # Exception rather than a bare except, so that KeyboardInterrupt and SystemExit
        # still propagate instead of being reported as a failed copy.
        logger.exception('Exception when copying path %s of image storage %s to location %s',
                         current_path, partial_storage.uuid, location)
        return False

      # Create the storage location record for the storage now that the copies have
      # completed.
      model.storage.add_storage_placement(partial_storage, location)
      logger.debug('Finished copy of image storage %s to location %s',
                   partial_storage.uuid, location)

    logger.debug('Completed replication of image storage %s to locations %s',
                 partial_storage.uuid, locations_missing)
    return True
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
  # `import logging` alone does not load the `logging.config` submodule, so import it
  # explicitly instead of relying on some other module having done so.
  import logging.config

  logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)

  # Only inspect the configured storage engines when replication is enabled at all.
  has_local_storage = False
  if features.STORAGE_REPLICATION:
    storage_configs = app.config.get('DISTRIBUTED_STORAGE_CONFIG', {}).values()
    has_local_storage = any(storage_type == 'LocalStorage'
                            for storage_type, _ in storage_configs)

  if not features.STORAGE_REPLICATION or has_local_storage:
    if has_local_storage:
      logger.error("Storage replication can't be used with local storage")
    else:
      logger.debug('Full storage replication disabled; skipping')

    # Idle forever rather than exiting.
    # NOTE(review): presumably so a process supervisor does not keep restarting the
    # worker; confirm against the deployment configuration.
    while True:
      time.sleep(10000)

  logger.debug('Starting replication worker')
  worker = StorageReplicationWorker(image_replication_queue,
                                    poll_period_seconds=POLL_PERIOD_SECONDS)
  worker.start()
 |