diff --git a/workers/storagereplication.py b/workers/storagereplication.py index 42e83b1d2..9eb3a9bd9 100644 --- a/workers/storagereplication.py +++ b/workers/storagereplication.py @@ -1,8 +1,8 @@ -import os import logging -import features import time +import features + from app import app, storage, image_replication_queue from data.database import CloseForLongOperation from data import model @@ -17,17 +17,27 @@ RESERVATION_SECONDS = app.config.get('STORAGE_REPLICATION_PROCESSING_SECONDS', 6 class StorageReplicationWorker(QueueWorker): def process_queue_item(self, job_details): storage_uuid = job_details['storage_id'] - logger.debug('Starting replication of image storage %s', storage_uuid) + namespace_id = job_details['namespace_user_id'] - namespace = model.user.get_namespace_user_by_user_id(job_details['namespace_user_id']) - if not self.replicate_storage(namespace, storage_uuid): + logger.debug('Starting replication of image storage %s under namespace %s', storage_uuid, + namespace_id) + try: + namespace = model.user.get_namespace_user_by_user_id(namespace_id) + except model.user.InvalidUsernameException: + logger.exception('Exception when looking up namespace %s for replication of image storage %s', + namespace_id, storage_uuid) + return + + succeeded = self.replicate_storage(namespace, storage_uuid) + logger.debug('Replication finished of image storage %s under namespace %s: %s', + storage_uuid, namespace_id, succeeded) + if not succeeded: raise WorkerUnhealthyException() - def replicate_storage(self, namespace, storage_uuid): # Lookup the namespace and its associated regions. if not namespace: - logger.debug('Unknown namespace: %s', namespace) + logger.debug('Unknown namespace when trying to replicate storage %s', storage_uuid) return True locations = model.user.get_region_locations(namespace) @@ -42,26 +52,44 @@ class StorageReplicationWorker(QueueWorker): locations_required = locations | set(storage.default_locations) locations_missing = locations_required - set(partial_storage.locations) + logger.debug('For replication of storage %s under namespace %s: %s required; %s missing', + storage_uuid, namespace.username, locations_required, locations_missing) + if not locations_missing: - logger.debug('No missing locations for storage %s under namespace %s', - storage_uuid, namespace.username) + logger.debug('No missing locations for storage %s under namespace %s. Required: %s', + storage_uuid, namespace.username, locations_required) return True # For any missing storage locations, initiate a copy. existing_location = list(partial_storage.locations)[0] + path_to_copy = model.storage.get_layer_path(partial_storage) + # Lookup the existing location. If not found, progressively sleep a few times to handle the case + # of not fully consistent storage. + for retry in range(0, 3): + if storage.exists([existing_location], path_to_copy): + break + + logger.debug('Cannot find image storage %s in existing location %s (try #%s)', + storage_uuid, existing_location, retry) + time.sleep(pow(2, retry) * 5) + + if not storage.exists([existing_location], path_to_copy): + logger.warning('Cannot find image storage %s in existing location %s; stopping replication', + storage_uuid, existing_location) + return False + + # For each missing location, copy over the storage. for location in locations_missing: - logger.debug('Copying image storage %s to location %s', partial_storage.uuid, location) + logger.debug('Starting copy of storage %s to location %s from %s', partial_storage.uuid, + location, existing_location) # Copy the binary data. - path_to_copy = model.storage.get_layer_path(partial_storage) copied = False - try: - if storage.exists([existing_location], path_to_copy): - with CloseForLongOperation(app.config): - storage.copy_between(path_to_copy, existing_location, location) - copied = True + with CloseForLongOperation(app.config): + storage.copy_between(path_to_copy, existing_location, location) + copied = True except: logger.exception('Exception when copying path %s of image storage %s to location %s', path_to_copy, partial_storage.uuid, location) @@ -71,11 +99,11 @@ class StorageReplicationWorker(QueueWorker): # completed. if copied: model.storage.add_storage_placement(partial_storage, location) - logger.debug('Finished copy of image storage %s to location %s', - partial_storage.uuid, location) + logger.debug('Finished copy of image storage %s to location %s from %s', + partial_storage.uuid, location, existing_location) - logger.debug('Completed replication of image storage %s to locations %s', - partial_storage.uuid, locations_missing) + logger.debug('Completed replication of image storage %s to locations %s from %s', + partial_storage.uuid, locations_missing, existing_location) return True