This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/storagereplication.py
Joseph Schorr 7b72cf8b27 Small fix for georeplication and add better logs
Previously, if we attempted to georeplicate storage from the existing location and, somehow, that existing location did not exist, we'd still mark the new location as invalid. This is a major problem for storage engines that are not consistent. Now, we first try a back-off strategy to find the image in the existing storage, and, if the replication fails in any way, we log it.
2017-06-23 17:07:05 -04:00

133 lines
5.2 KiB
Python

import logging
import time
import features
from app import app, storage, image_replication_queue
from data.database import CloseForLongOperation
from data import model
from workers.queueworker import QueueWorker, WorkerUnhealthyException
from util.log import logfile_path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Passed to the worker as `poll_period_seconds` when this module is run as a script.
POLL_PERIOD_SECONDS = 10
# Passed to the worker as `reservation_seconds`. Read from the
# STORAGE_REPLICATION_PROCESSING_SECONDS config key, defaulting to 60*20 (20 minutes' worth of
# seconds) -- presumably how long a queue item stays reserved while being processed; confirm
# against QueueWorker.
RESERVATION_SECONDS = app.config.get('STORAGE_REPLICATION_PROCESSING_SECONDS', 60*20)
class StorageReplicationWorker(QueueWorker):
    """Queue worker that copies an image storage to every location it is required in."""

    def process_queue_item(self, job_details):
        """Replicate the image storage described by a single queue item.

        Args:
            job_details: mapping with the keys 'storage_id' (UUID of the image storage) and
                'namespace_user_id' (ID used to look up the namespace user).

        Raises:
            WorkerUnhealthyException: if replicate_storage() reports a failure.
        """
        storage_uuid = job_details['storage_id']
        namespace_id = job_details['namespace_user_id']

        logger.debug('Starting replication of image storage %s under namespace %s', storage_uuid,
                     namespace_id)
        try:
            namespace = model.user.get_namespace_user_by_user_id(namespace_id)
        except model.user.InvalidUsernameException:
            # The lookup failure is logged and the item is dropped without signalling an error.
            logger.exception(
                'Exception when looking up namespace %s for replication of image storage %s',
                namespace_id, storage_uuid)
            return

        succeeded = self.replicate_storage(namespace, storage_uuid)
        logger.debug('Replication finished of image storage %s under namespace %s: %s',
                     storage_uuid, namespace_id, succeeded)
        if not succeeded:
            raise WorkerUnhealthyException()

    def replicate_storage(self, namespace, storage_uuid):
        """Copy the storage to each required location that does not hold it yet.

        Args:
            namespace: the namespace user owning the storage; a falsy value means "unknown".
            storage_uuid: UUID of the image storage to replicate.

        Returns:
            True if there was nothing to do (unknown namespace, unknown storage, or no missing
            locations) or every copy completed; False if the storage could not be found in its
            existing location or a copy raised an exception.
        """
        # Lookup the namespace and its associated regions.
        if not namespace:
            logger.debug('Unknown namespace when trying to replicate storage %s', storage_uuid)
            return True

        locations = model.user.get_region_locations(namespace)

        # Lookup the image storage.
        partial_storage = model.storage.get_storage_by_uuid(storage_uuid)
        if not partial_storage:
            logger.debug('Unknown storage: %s', storage_uuid)
            return True

        # Check to see if the image is at all the required locations.
        locations_required = locations | set(storage.default_locations)
        locations_missing = locations_required - set(partial_storage.locations)

        logger.debug('For replication of storage %s under namespace %s: %s required; %s missing',
                     storage_uuid, namespace.username, locations_required, locations_missing)

        if not locations_missing:
            logger.debug('No missing locations for storage %s under namespace %s. Required: %s',
                         storage_uuid, namespace.username, locations_required)
            return True

        # For any missing storage locations, initiate a copy.
        existing_location = list(partial_storage.locations)[0]
        path_to_copy = model.storage.get_layer_path(partial_storage)

        # Lookup the existing location. If not found, progressively sleep a few times (5s, 10s,
        # 20s) to handle the case of not fully consistent storage.
        for retry in range(0, 3):
            if storage.exists([existing_location], path_to_copy):
                break

            logger.debug('Cannot find image storage %s in existing location %s (try #%s)',
                         storage_uuid, existing_location, retry)
            time.sleep(pow(2, retry) * 5)
        else:
            # Every attempt missed. Check one last time after the final sleep; this check is
            # skipped when the loop already found the storage, avoiding a redundant lookup.
            if not storage.exists([existing_location], path_to_copy):
                logger.warning(
                    'Cannot find image storage %s in existing location %s; stopping replication',
                    storage_uuid, existing_location)
                return False

        # For each missing location, copy over the storage.
        for location in locations_missing:
            logger.debug('Starting copy of storage %s to location %s from %s',
                         partial_storage.uuid, location, existing_location)

            # Copy the binary data.
            copied = False
            try:
                with CloseForLongOperation(app.config):
                    storage.copy_between(path_to_copy, existing_location, location)
                    copied = True
            except Exception:
                # Only ordinary errors are treated as a failed copy; KeyboardInterrupt and
                # SystemExit are allowed to propagate instead of being swallowed here.
                logger.exception(
                    'Exception when copying path %s of image storage %s to location %s',
                    path_to_copy, partial_storage.uuid, location)
                return False

            # Create the storage location record for the storage now that the copies have
            # completed. The flag stays False if the copy did not run to completion without
            # an exception reaching the handler above.
            if copied:
                model.storage.add_storage_placement(partial_storage, location)
                logger.debug('Finished copy of image storage %s to location %s from %s',
                             partial_storage.uuid, location, existing_location)

        logger.debug('Completed replication of image storage %s to locations %s from %s',
                     partial_storage.uuid, locations_missing, existing_location)
        return True
if __name__ == "__main__":
    # `import logging` alone does not guarantee that the `logging.config` submodule has been
    # loaded; import it explicitly so the fileConfig() call below does not depend on another
    # module having imported it first.
    import logging.config

    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    # Each DISTRIBUTED_STORAGE_CONFIG value unpacks into a (storage type, second item) pair;
    # only the storage type is inspected here.
    has_local_storage = False
    if features.STORAGE_REPLICATION:
        storage_configs = app.config.get('DISTRIBUTED_STORAGE_CONFIG', {}).values()
        has_local_storage = any(storage_type == 'LocalStorage'
                                for storage_type, _ in storage_configs)

    if not features.STORAGE_REPLICATION or has_local_storage:
        if has_local_storage:
            logger.error("Storage replication can't be used with local storage")
        else:
            logger.debug('Full storage replication disabled; skipping')

        # Sleep forever instead of exiting, so the worker is never started in this case.
        while True:
            time.sleep(10000)

    logger.debug('Starting replication worker')
    worker = StorageReplicationWorker(image_replication_queue,
                                      poll_period_seconds=POLL_PERIOD_SECONDS,
                                      reservation_seconds=RESERVATION_SECONDS)
    worker.start()