Small fix for georeplication and add better logs
Previously, if we attempted to georeplicate storage from the existing location and, somehow, that existing location did not exist, we'd still mark the new location as invalid. This is a major problem for storage engines that are not consistent. Now, we first try a back off strategy to find the image in the existing storage and, as well, if the replication fails in any way, we log it.
This commit is contained in:
parent
2cd56a06fd
commit
7b72cf8b27
1 changed files with 48 additions and 20 deletions
|
@ -1,8 +1,8 @@
|
||||||
import os
|
|
||||||
import logging
|
import logging
|
||||||
import features
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import features
|
||||||
|
|
||||||
from app import app, storage, image_replication_queue
|
from app import app, storage, image_replication_queue
|
||||||
from data.database import CloseForLongOperation
|
from data.database import CloseForLongOperation
|
||||||
from data import model
|
from data import model
|
||||||
|
@ -17,17 +17,27 @@ RESERVATION_SECONDS = app.config.get('STORAGE_REPLICATION_PROCESSING_SECONDS', 6
|
||||||
class StorageReplicationWorker(QueueWorker):
|
class StorageReplicationWorker(QueueWorker):
|
||||||
def process_queue_item(self, job_details):
|
def process_queue_item(self, job_details):
|
||||||
storage_uuid = job_details['storage_id']
|
storage_uuid = job_details['storage_id']
|
||||||
logger.debug('Starting replication of image storage %s', storage_uuid)
|
namespace_id = job_details['namespace_user_id']
|
||||||
|
|
||||||
namespace = model.user.get_namespace_user_by_user_id(job_details['namespace_user_id'])
|
logger.debug('Starting replication of image storage %s under namespace %s', storage_uuid,
|
||||||
if not self.replicate_storage(namespace, storage_uuid):
|
namespace_id)
|
||||||
|
try:
|
||||||
|
namespace = model.user.get_namespace_user_by_user_id(namespace_id)
|
||||||
|
except model.user.InvalidUsernameException:
|
||||||
|
logger.exception('Exception when looking up namespace %s for replication of image storage %s',
|
||||||
|
namespace_id, storage_uuid)
|
||||||
|
return
|
||||||
|
|
||||||
|
succeeded = self.replicate_storage(namespace, storage_uuid)
|
||||||
|
logger.debug('Replication finished of image storage %s under namespace %s: %s',
|
||||||
|
storage_uuid, namespace_id, succeeded)
|
||||||
|
if not succeeded:
|
||||||
raise WorkerUnhealthyException()
|
raise WorkerUnhealthyException()
|
||||||
|
|
||||||
|
|
||||||
def replicate_storage(self, namespace, storage_uuid):
|
def replicate_storage(self, namespace, storage_uuid):
|
||||||
# Lookup the namespace and its associated regions.
|
# Lookup the namespace and its associated regions.
|
||||||
if not namespace:
|
if not namespace:
|
||||||
logger.debug('Unknown namespace: %s', namespace)
|
logger.debug('Unknown namespace when trying to replicate storage %s', storage_uuid)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
locations = model.user.get_region_locations(namespace)
|
locations = model.user.get_region_locations(namespace)
|
||||||
|
@ -42,26 +52,44 @@ class StorageReplicationWorker(QueueWorker):
|
||||||
locations_required = locations | set(storage.default_locations)
|
locations_required = locations | set(storage.default_locations)
|
||||||
locations_missing = locations_required - set(partial_storage.locations)
|
locations_missing = locations_required - set(partial_storage.locations)
|
||||||
|
|
||||||
|
logger.debug('For replication of storage %s under namespace %s: %s required; %s missing',
|
||||||
|
storage_uuid, namespace.username, locations_required, locations_missing)
|
||||||
|
|
||||||
if not locations_missing:
|
if not locations_missing:
|
||||||
logger.debug('No missing locations for storage %s under namespace %s',
|
logger.debug('No missing locations for storage %s under namespace %s. Required: %s',
|
||||||
storage_uuid, namespace.username)
|
storage_uuid, namespace.username, locations_required)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# For any missing storage locations, initiate a copy.
|
# For any missing storage locations, initiate a copy.
|
||||||
existing_location = list(partial_storage.locations)[0]
|
existing_location = list(partial_storage.locations)[0]
|
||||||
|
path_to_copy = model.storage.get_layer_path(partial_storage)
|
||||||
|
|
||||||
|
# Lookup the existing location. If not found, progressively sleep a few times to handle the case
|
||||||
|
# of not fully consistent storage.
|
||||||
|
for retry in range(0, 3):
|
||||||
|
if storage.exists([existing_location], path_to_copy):
|
||||||
|
break
|
||||||
|
|
||||||
|
logger.debug('Cannot find image storage %s in existing location %s (try #%s)',
|
||||||
|
storage_uuid, existing_location, retry)
|
||||||
|
time.sleep(pow(2, retry) * 5)
|
||||||
|
|
||||||
|
if not storage.exists([existing_location], path_to_copy):
|
||||||
|
logger.warning('Cannot find image storage %s in existing location %s; stopping replication',
|
||||||
|
storage_uuid, existing_location)
|
||||||
|
return False
|
||||||
|
|
||||||
|
# For each missing location, copy over the storage.
|
||||||
for location in locations_missing:
|
for location in locations_missing:
|
||||||
logger.debug('Copying image storage %s to location %s', partial_storage.uuid, location)
|
logger.debug('Starting copy of storage %s to location %s from %s', partial_storage.uuid,
|
||||||
|
location, existing_location)
|
||||||
|
|
||||||
# Copy the binary data.
|
# Copy the binary data.
|
||||||
path_to_copy = model.storage.get_layer_path(partial_storage)
|
|
||||||
copied = False
|
copied = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if storage.exists([existing_location], path_to_copy):
|
with CloseForLongOperation(app.config):
|
||||||
with CloseForLongOperation(app.config):
|
storage.copy_between(path_to_copy, existing_location, location)
|
||||||
storage.copy_between(path_to_copy, existing_location, location)
|
copied = True
|
||||||
copied = True
|
|
||||||
except:
|
except:
|
||||||
logger.exception('Exception when copying path %s of image storage %s to location %s',
|
logger.exception('Exception when copying path %s of image storage %s to location %s',
|
||||||
path_to_copy, partial_storage.uuid, location)
|
path_to_copy, partial_storage.uuid, location)
|
||||||
|
@ -71,11 +99,11 @@ class StorageReplicationWorker(QueueWorker):
|
||||||
# completed.
|
# completed.
|
||||||
if copied:
|
if copied:
|
||||||
model.storage.add_storage_placement(partial_storage, location)
|
model.storage.add_storage_placement(partial_storage, location)
|
||||||
logger.debug('Finished copy of image storage %s to location %s',
|
logger.debug('Finished copy of image storage %s to location %s from %s',
|
||||||
partial_storage.uuid, location)
|
partial_storage.uuid, location, existing_location)
|
||||||
|
|
||||||
logger.debug('Completed replication of image storage %s to locations %s',
|
logger.debug('Completed replication of image storage %s to locations %s from %s',
|
||||||
partial_storage.uuid, locations_missing)
|
partial_storage.uuid, locations_missing, existing_location)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|
Reference in a new issue