From 5cd793331e7da0f3afc58a40ba7601c0b79fbd32 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 11 Jul 2016 14:33:29 -0400 Subject: [PATCH] Fix storage replication for CAS and add tests --- storage/fakestorage.py | 5 ++- test/test_storagereplication.py | 66 +++++++++++++++++++++++++++++++++ workers/storagereplication.py | 26 ++++++++----- 3 files changed, 86 insertions(+), 11 deletions(-) create mode 100644 test/test_storagereplication.py diff --git a/storage/fakestorage.py b/storage/fakestorage.py index 806911095..0fe0b4213 100644 --- a/storage/fakestorage.py +++ b/storage/fakestorage.py @@ -58,7 +58,6 @@ class FakeStorage(BaseStorageV2): def exists(self, path): if _FAKE_STORAGE_MAP.get('all_files_exist', None): return True - return path in _FAKE_STORAGE_MAP def get_checksum(self, path): @@ -83,3 +82,7 @@ class FakeStorage(BaseStorageV2): def cancel_chunked_upload(self, uuid, _): _FAKE_STORAGE_MAP.pop(uuid, None) + + def copy_to(self, destination, path): + content = self.get_content(path) + destination.put_content(path, content) diff --git a/test/test_storagereplication.py b/test/test_storagereplication.py new file mode 100644 index 000000000..fd3b679e8 --- /dev/null +++ b/test/test_storagereplication.py @@ -0,0 +1,66 @@ +import unittest +import hashlib + +from app import storage +from initdb import setup_database_for_testing, finished_database_for_testing +from data import model, database +from workers.storagereplication import StorageReplicationWorker +from storage.basestorage import StoragePaths + +class TestStorageReplication(unittest.TestCase): + def setUp(self): + setup_database_for_testing(self) + + self.worker = StorageReplicationWorker(None) + self.paths = StoragePaths() + + # Add both regions for a user. + self.user = model.user.get_user('devtable') + database.UserRegion.create(user=self.user, + location=database.ImageStorageLocation.get(name='local_us')) + database.UserRegion.create(user=self.user, + location=database.ImageStorageLocation.get(name='local_eu')) + + def tearDown(self): + finished_database_for_testing(self) + + def test_storage_replication_v1(self): + # Add a storage entry with a V1 path. + v1_storage = model.storage.create_v1_storage('local_us') + content_path = self.paths.v1_image_layer_path(v1_storage.uuid) + storage.put_content(['local_us'], content_path, 'some content') + + # Call replicate on it and verify it replicates. + result = self.worker.replicate_storage(self.user, v1_storage.uuid) + self.assertTrue(result) + + # Ensure that the data was replicated to the other "region". + self.assertEquals('some content', storage.get_content(['local_eu'], content_path)) + + locations = model.storage.get_storage_locations(v1_storage.uuid) + self.assertEquals(2, len(locations)) + + def test_storage_replication_cas(self): + # Add a storage entry with a CAS path. + content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() + cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) + + location = database.ImageStorageLocation.get(name='local_us') + database.ImageStoragePlacement.create(storage=cas_storage, location=location) + + content_path = self.paths.blob_path(cas_storage.content_checksum) + storage.put_content(['local_us'], content_path, 'some content') + + # Call replicate on it and verify it replicates. + result = self.worker.replicate_storage(self.user, cas_storage.uuid) + self.assertTrue(result) + + # Ensure that the data was replicated to the other "region". + self.assertEquals('some content', storage.get_content(['local_eu'], content_path)) + + locations = model.storage.get_storage_locations(cas_storage.uuid) + self.assertEquals(2, len(locations)) + + +if __name__ == '__main__': + unittest.main() diff --git a/workers/storagereplication.py b/workers/storagereplication.py index a096f3d95..627ae9137 100644 --- a/workers/storagereplication.py +++ b/workers/storagereplication.py @@ -6,7 +6,7 @@ from app import app, storage, image_replication_queue from data.database import CloseForLongOperation from data import model from storage.basestorage import StoragePaths -from workers.queueworker import QueueWorker +from workers.queueworker import QueueWorker, WorkerUnhealthyException logger = logging.getLogger(__name__) @@ -18,7 +18,11 @@ class StorageReplicationWorker(QueueWorker): logger.debug('Starting replication of image storage %s', storage_uuid) namespace = model.user.get_namespace_user_by_user_id(job_details['namespace_user_id']) + if not self.replicate_storage(namespace, storage_uuid): + raise WorkerUnhealthyException() + + def replicate_storage(self, namespace, storage_uuid): # Lookup the namespace and its associated regions. if not namespace: logger.debug('Unknown namespace: %s', namespace) @@ -48,24 +52,26 @@ class StorageReplicationWorker(QueueWorker): for location in locations_missing: logger.debug('Copying image storage %s to location %s', partial_storage.uuid, location) - # Copy the various paths. - paths = [storage_paths.v1_image_layer_path] + # Copy the binary data. + path_to_copy = model.storage.get_layer_path(partial_storage) + copied = False try: - for path_builder in paths: - current_path = path_builder(partial_storage.uuid) + if storage.exists([existing_location], path_to_copy): with CloseForLongOperation(app.config): - storage.copy_between(current_path, existing_location, location) + storage.copy_between(path_to_copy, existing_location, location) + copied = True except: logger.exception('Exception when copying path %s of image storage %s to location %s', - current_path, partial_storage.uuid, location) + path_to_copy, partial_storage.uuid, location) return False # Create the storage location record for the storage now that the copies have # completed. - model.storage.add_storage_placement(partial_storage, location) - logger.debug('Finished copy of image storage %s to location %s', - partial_storage.uuid, location) + if copied: + model.storage.add_storage_placement(partial_storage, location) + logger.debug('Finished copy of image storage %s to location %s', + partial_storage.uuid, location) logger.debug('Completed replication of image storage %s to locations %s', partial_storage.uuid, locations_missing)