From 6a9634dffb8b2a1a4e6f78305e1637a92a748e8a Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 31 Oct 2018 14:13:25 -0400 Subject: [PATCH] Harden the storage replication worker to failures by explicitly catching certain errors, having better backoff and explicitly ensuring a layer has been replicated to a region before adding the placement into the database Also adds tests for the various failure cases --- storage/fakestorage.py | 47 ++++++---- workers/storagereplication.py | 71 ++++++++------ workers/test/test_storagereplication.py | 118 ++++++++++++++++++++++-- 3 files changed, 181 insertions(+), 55 deletions(-) diff --git a/storage/fakestorage.py b/storage/fakestorage.py index 40a4e6f62..3781aa4ae 100644 --- a/storage/fakestorage.py +++ b/storage/fakestorage.py @@ -6,11 +6,13 @@ from uuid import uuid4 from storage.basestorage import BaseStorageV2 -_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO) +_GLOBAL_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO) class FakeStorage(BaseStorageV2): def __init__(self, context): super(FakeStorage, self).__init__() + self._fake_storage_map = (defaultdict(StringIO.StringIO) + if context == 'local' else _GLOBAL_FAKE_STORAGE_MAP) def _init_path(self, path=None, create=False): return path @@ -25,18 +27,18 @@ class FakeStorage(BaseStorageV2): return None def get_content(self, path): - if not path in _FAKE_STORAGE_MAP: - raise IOError('Fake file %s not found. Exist: %s' % (path, _FAKE_STORAGE_MAP.keys())) + if not path in self._fake_storage_map: + raise IOError('Fake file %s not found. Exist: %s' % (path, self._fake_storage_map.keys())) - _FAKE_STORAGE_MAP.get(path).seek(0) - return _FAKE_STORAGE_MAP.get(path).read() + self._fake_storage_map.get(path).seek(0) + return self._fake_storage_map.get(path).read() def put_content(self, path, content): - _FAKE_STORAGE_MAP.pop(path, None) - _FAKE_STORAGE_MAP[path].write(content) + self._fake_storage_map.pop(path, None) + self._fake_storage_map[path].write(content) def stream_read(self, path): - io_obj = _FAKE_STORAGE_MAP[path] + io_obj = self._fake_storage_map[path] io_obj.seek(0) while True: buf = io_obj.read(self.buffer_size) @@ -48,40 +50,49 @@ class FakeStorage(BaseStorageV2): return StringIO.StringIO(self.get_content(path)) def stream_write(self, path, fp, content_type=None, content_encoding=None): - out_fp = _FAKE_STORAGE_MAP[path] + out_fp = self._fake_storage_map[path] out_fp.seek(0) self.stream_write_to_fp(fp, out_fp) def remove(self, path): - _FAKE_STORAGE_MAP.pop(path, None) + self._fake_storage_map.pop(path, None) def exists(self, path): - if _FAKE_STORAGE_MAP.get('all_files_exist', None): + if self._fake_storage_map.get('all_files_exist', None): return True - return path in _FAKE_STORAGE_MAP + return path in self._fake_storage_map def get_checksum(self, path): - return hashlib.sha256(_FAKE_STORAGE_MAP[path].read()).hexdigest()[:7] + return hashlib.sha256(self._fake_storage_map[path].read()).hexdigest()[:7] def initiate_chunked_upload(self): new_uuid = str(uuid4()) - _FAKE_STORAGE_MAP[new_uuid].seek(0) + self._fake_storage_map[new_uuid].seek(0) return new_uuid, {} def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None): - upload_storage = _FAKE_STORAGE_MAP[uuid] + upload_storage = self._fake_storage_map[uuid] try: return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None except IOError as ex: return 0, {}, ex def complete_chunked_upload(self, uuid, final_path, _): - _FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid] - _FAKE_STORAGE_MAP.pop(uuid, None) + self._fake_storage_map[final_path] = self._fake_storage_map[uuid] + self._fake_storage_map.pop(uuid, None) def cancel_chunked_upload(self, uuid, _): - _FAKE_STORAGE_MAP.pop(uuid, None) + self._fake_storage_map.pop(uuid, None) def copy_to(self, destination, path): + if self.exists('break_copying'): + raise IOError('Broken!') + + if self.exists('fake_copying'): + return + + if self.exists('except_copying'): + raise Exception("I'm an exception!") + content = self.get_content(path) destination.put_content(path, content) diff --git a/workers/storagereplication.py b/workers/storagereplication.py index 56bf0cf2b..005bd2ee8 100644 --- a/workers/storagereplication.py +++ b/workers/storagereplication.py @@ -3,10 +3,10 @@ import time import features -from app import app, storage, image_replication_queue +from app import app, storage as app_storage, image_replication_queue from data.database import CloseForLongOperation from data import model -from workers.queueworker import QueueWorker, WorkerUnhealthyException +from workers.queueworker import QueueWorker, WorkerUnhealthyException, JobException from util.log import logfile_path logger = logging.getLogger(__name__) @@ -28,17 +28,28 @@ class StorageReplicationWorker(QueueWorker): namespace_id, storage_uuid) return - succeeded = self.replicate_storage(namespace, storage_uuid) - logger.debug('Replication finished of image storage %s under namespace %s: %s', - storage_uuid, namespace_id, succeeded) - if not succeeded: - raise WorkerUnhealthyException() + self.replicate_storage(namespace, storage_uuid, app_storage) - def replicate_storage(self, namespace, storage_uuid): + def _backoff_check_exists(self, location, path, storage, backoff_check=True): + for retry in range(0, 4): + if storage.exists([location], path): + return True + + if not backoff_check: + return False + + seconds = pow(2, retry) * 2 + logger.debug('Cannot find path `%s` in location %s (try #%s). Sleeping for %s seconds', + path, location, retry, seconds) + time.sleep(seconds) + + return False + + def replicate_storage(self, namespace, storage_uuid, storage, backoff_check=True): # Lookup the namespace and its associated regions. if not namespace: logger.debug('Unknown namespace when trying to replicate storage %s', storage_uuid) - return True + return locations = model.user.get_region_locations(namespace) @@ -47,7 +58,7 @@ class StorageReplicationWorker(QueueWorker): partial_storage = model.storage.get_storage_by_uuid(storage_uuid) except model.InvalidImageException: logger.debug('Unknown storage: %s', storage_uuid) - return True + return # Check to see if the image is at all the required locations. locations_required = locations | set(storage.default_locations) @@ -59,26 +70,17 @@ class StorageReplicationWorker(QueueWorker): if not locations_missing: logger.debug('No missing locations for storage %s under namespace %s. Required: %s', storage_uuid, namespace.username, locations_required) - return True + return # For any missing storage locations, initiate a copy. existing_location = list(partial_storage.locations)[0] path_to_copy = model.storage.get_layer_path(partial_storage) - # Lookup the existing location. If not found, progressively sleep a few times to handle the case - # of not fully consistent storage. - for retry in range(0, 3): - if storage.exists([existing_location], path_to_copy): - break - - logger.debug('Cannot find image storage %s in existing location %s (try #%s)', - storage_uuid, existing_location, retry) - time.sleep(pow(2, retry) * 5) - - if not storage.exists([existing_location], path_to_copy): + # Lookup and ensure the existing location exists. + if not self._backoff_check_exists(existing_location, path_to_copy, storage, backoff_check): logger.warning('Cannot find image storage %s in existing location %s; stopping replication', storage_uuid, existing_location) - return False + raise JobException() # For each missing location, copy over the storage. for location in locations_missing: @@ -91,21 +93,32 @@ class StorageReplicationWorker(QueueWorker): with CloseForLongOperation(app.config): storage.copy_between(path_to_copy, existing_location, location) copied = True - except: - logger.exception('Exception when copying path %s of image storage %s to location %s', + except IOError: + logger.exception('Failed to copy path `%s` of image storage %s to location %s', path_to_copy, partial_storage.uuid, location) - return False + raise JobException() + except: + logger.exception('Unknown exception when copying path %s of image storage %s to loc %s', + path_to_copy, partial_storage.uuid, location) + raise WorkerUnhealthyException() - # Create the storage location record for the storage now that the copies have - # completed. if copied: + # Verify the data was copied to the target storage, to ensure that there are no cases + # where we write the placement without knowing the data is present. + if not self._backoff_check_exists(location, path_to_copy, storage, backoff_check): + logger.warning('Failed to find path `%s` in location `%s` after copy', path_to_copy, + location) + raise JobException() + + # Create the storage location record for the storage now that the copy has + # completed. model.storage.add_storage_placement(partial_storage, location) + logger.debug('Finished copy of image storage %s to location %s from %s', partial_storage.uuid, location, existing_location) logger.debug('Completed replication of image storage %s to locations %s from %s', partial_storage.uuid, locations_missing, existing_location) - return True if __name__ == "__main__": diff --git a/workers/test/test_storagereplication.py b/workers/test/test_storagereplication.py index c5c6c3ad9..8487b1d97 100644 --- a/workers/test/test_storagereplication.py +++ b/workers/test/test_storagereplication.py @@ -2,10 +2,12 @@ import hashlib import pytest -from app import storage from data import model, database from storage.basestorage import StoragePaths -from workers.storagereplication import StorageReplicationWorker +from storage.fakestorage import FakeStorage +from storage.distributedstorage import DistributedStorage +from workers.storagereplication import (StorageReplicationWorker, JobException, + WorkerUnhealthyException) from test.fixtures import * @@ -30,15 +32,20 @@ def replication_worker(): return StorageReplicationWorker(None) -def test_storage_replication_v1(storage_user, storage_paths, replication_worker, app): +@pytest.fixture() +def storage(): + return DistributedStorage({'local_us': FakeStorage('local'), 'local_eu': FakeStorage('local')}, + ['local_us']) + + +def test_storage_replication_v1(storage_user, storage_paths, replication_worker, storage, app): # Add a storage entry with a V1 path. v1_storage = model.storage.create_v1_storage('local_us') content_path = storage_paths.v1_image_layer_path(v1_storage.uuid) storage.put_content(['local_us'], content_path, 'some content') # Call replicate on it and verify it replicates. - result = replication_worker.replicate_storage(storage_user, v1_storage.uuid) - assert result + replication_worker.replicate_storage(storage_user, v1_storage.uuid, storage) # Ensure that the data was replicated to the other "region". assert storage.get_content(['local_eu'], content_path) == 'some content' @@ -47,7 +54,7 @@ def test_storage_replication_v1(storage_user, storage_paths, replication_worker, assert len(locations) == 2 -def test_storage_replication_cas(storage_user, storage_paths, replication_worker, app): +def test_storage_replication_cas(storage_user, storage_paths, replication_worker, storage, app): # Add a storage entry with a CAS path. content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) @@ -59,11 +66,106 @@ def test_storage_replication_cas(storage_user, storage_paths, replication_worker storage.put_content(['local_us'], content_path, 'some content') # Call replicate on it and verify it replicates. - result = replication_worker.replicate_storage(storage_user, cas_storage.uuid) - assert result + replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage) # Ensure that the data was replicated to the other "region". assert storage.get_content(['local_eu'], content_path) == 'some content' locations = model.storage.get_storage_locations(cas_storage.uuid) assert len(locations) == 2 + + +def test_storage_replication_missing_base(storage_user, storage_paths, replication_worker, storage, + app): + # Add a storage entry with a CAS path. + content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() + cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) + + location = database.ImageStorageLocation.get(name='local_us') + database.ImageStoragePlacement.create(storage=cas_storage, location=location) + + # Attempt to replicate storage. This should fail because the layer is missing from the base + # storage. + with pytest.raises(JobException): + replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage, + backoff_check=False) + + # Ensure the storage location count remains 1. This is technically inaccurate, but that's okay + # as we still require at least one location per storage. + locations = model.storage.get_storage_locations(cas_storage.uuid) + assert len(locations) == 1 + + +def test_storage_replication_copy_error(storage_user, storage_paths, replication_worker, storage, + app): + # Add a storage entry with a CAS path. + content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() + cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) + + location = database.ImageStorageLocation.get(name='local_us') + database.ImageStoragePlacement.create(storage=cas_storage, location=location) + + content_path = storage_paths.blob_path(cas_storage.content_checksum) + storage.put_content(['local_us'], content_path, 'some content') + + # Tell storage to break copying. + storage.put_content(['local_us'], 'break_copying', 'true') + + # Attempt to replicate storage. This should fail because the write fails. + with pytest.raises(JobException): + replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage, + backoff_check=False) + + # Ensure the storage location count remains 1. + locations = model.storage.get_storage_locations(cas_storage.uuid) + assert len(locations) == 1 + + +def test_storage_replication_copy_didnot_copy(storage_user, storage_paths, replication_worker, + storage, app): + # Add a storage entry with a CAS path. + content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() + cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) + + location = database.ImageStorageLocation.get(name='local_us') + database.ImageStoragePlacement.create(storage=cas_storage, location=location) + + content_path = storage_paths.blob_path(cas_storage.content_checksum) + storage.put_content(['local_us'], content_path, 'some content') + + # Tell storage to fake copying (i.e. not actually copy the data). + storage.put_content(['local_us'], 'fake_copying', 'true') + + # Attempt to replicate storage. This should fail because the copy doesn't actually do the copy. + with pytest.raises(JobException): + replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage, + backoff_check=False) + + # Ensure the storage location count remains 1. + locations = model.storage.get_storage_locations(cas_storage.uuid) + assert len(locations) == 1 + + +def test_storage_replication_copy_unhandled_exception(storage_user, storage_paths, + replication_worker, storage, app): + # Add a storage entry with a CAS path. + content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() + cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) + + location = database.ImageStorageLocation.get(name='local_us') + database.ImageStoragePlacement.create(storage=cas_storage, location=location) + + content_path = storage_paths.blob_path(cas_storage.content_checksum) + storage.put_content(['local_us'], content_path, 'some content') + + # Tell storage to raise an exception when copying. + storage.put_content(['local_us'], 'except_copying', 'true') + + # Attempt to replicate storage. This should fail because the copy raises an unhandled exception. + with pytest.raises(WorkerUnhealthyException): + replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage, + backoff_check=False) + + # Ensure the storage location count remains 1. + locations = model.storage.get_storage_locations(cas_storage.uuid) + assert len(locations) == 1