Fix storage replication for CAS and add tests
This commit is contained in:
parent
3044f8ecbd
commit
5cd793331e
3 changed files with 86 additions and 11 deletions
|
@ -58,7 +58,6 @@ class FakeStorage(BaseStorageV2):
|
|||
def exists(self, path):
|
||||
if _FAKE_STORAGE_MAP.get('all_files_exist', None):
|
||||
return True
|
||||
|
||||
return path in _FAKE_STORAGE_MAP
|
||||
|
||||
def get_checksum(self, path):
|
||||
|
@ -83,3 +82,7 @@ class FakeStorage(BaseStorageV2):
|
|||
|
||||
def cancel_chunked_upload(self, uuid, _):
|
||||
_FAKE_STORAGE_MAP.pop(uuid, None)
|
||||
|
||||
def copy_to(self, destination, path):
|
||||
content = self.get_content(path)
|
||||
destination.put_content(path, content)
|
||||
|
|
66
test/test_storagereplication.py
Normal file
66
test/test_storagereplication.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
import unittest
|
||||
import hashlib
|
||||
|
||||
from app import storage
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from data import model, database
|
||||
from workers.storagereplication import StorageReplicationWorker
|
||||
from storage.basestorage import StoragePaths
|
||||
|
||||
class TestStorageReplication(unittest.TestCase):
|
||||
def setUp(self):
|
||||
setup_database_for_testing(self)
|
||||
|
||||
self.worker = StorageReplicationWorker(None)
|
||||
self.paths = StoragePaths()
|
||||
|
||||
# Add both regions for a user.
|
||||
self.user = model.user.get_user('devtable')
|
||||
database.UserRegion.create(user=self.user,
|
||||
location=database.ImageStorageLocation.get(name='local_us'))
|
||||
database.UserRegion.create(user=self.user,
|
||||
location=database.ImageStorageLocation.get(name='local_eu'))
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
|
||||
def test_storage_replication_v1(self):
|
||||
# Add a storage entry with a V1 path.
|
||||
v1_storage = model.storage.create_v1_storage('local_us')
|
||||
content_path = self.paths.v1_image_layer_path(v1_storage.uuid)
|
||||
storage.put_content(['local_us'], content_path, 'some content')
|
||||
|
||||
# Call replicate on it and verify it replicates.
|
||||
result = self.worker.replicate_storage(self.user, v1_storage.uuid)
|
||||
self.assertTrue(result)
|
||||
|
||||
# Ensure that the data was replicated to the other "region".
|
||||
self.assertEquals('some content', storage.get_content(['local_eu'], content_path))
|
||||
|
||||
locations = model.storage.get_storage_locations(v1_storage.uuid)
|
||||
self.assertEquals(2, len(locations))
|
||||
|
||||
def test_storage_replication_cas(self):
|
||||
# Add a storage entry with a CAS path.
|
||||
content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
|
||||
cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
|
||||
|
||||
location = database.ImageStorageLocation.get(name='local_us')
|
||||
database.ImageStoragePlacement.create(storage=cas_storage, location=location)
|
||||
|
||||
content_path = self.paths.blob_path(cas_storage.content_checksum)
|
||||
storage.put_content(['local_us'], content_path, 'some content')
|
||||
|
||||
# Call replicate on it and verify it replicates.
|
||||
result = self.worker.replicate_storage(self.user, cas_storage.uuid)
|
||||
self.assertTrue(result)
|
||||
|
||||
# Ensure that the data was replicated to the other "region".
|
||||
self.assertEquals('some content', storage.get_content(['local_eu'], content_path))
|
||||
|
||||
locations = model.storage.get_storage_locations(cas_storage.uuid)
|
||||
self.assertEquals(2, len(locations))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -6,7 +6,7 @@ from app import app, storage, image_replication_queue
|
|||
from data.database import CloseForLongOperation
|
||||
from data import model
|
||||
from storage.basestorage import StoragePaths
|
||||
from workers.queueworker import QueueWorker
|
||||
from workers.queueworker import QueueWorker, WorkerUnhealthyException
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -18,7 +18,11 @@ class StorageReplicationWorker(QueueWorker):
|
|||
logger.debug('Starting replication of image storage %s', storage_uuid)
|
||||
|
||||
namespace = model.user.get_namespace_user_by_user_id(job_details['namespace_user_id'])
|
||||
if not self.replicate_storage(namespace, storage_uuid):
|
||||
raise WorkerUnhealthyException()
|
||||
|
||||
|
||||
def replicate_storage(self, namespace, storage_uuid):
|
||||
# Lookup the namespace and its associated regions.
|
||||
if not namespace:
|
||||
logger.debug('Unknown namespace: %s', namespace)
|
||||
|
@ -48,21 +52,23 @@ class StorageReplicationWorker(QueueWorker):
|
|||
for location in locations_missing:
|
||||
logger.debug('Copying image storage %s to location %s', partial_storage.uuid, location)
|
||||
|
||||
# Copy the various paths.
|
||||
paths = [storage_paths.v1_image_layer_path]
|
||||
# Copy the binary data.
|
||||
path_to_copy = model.storage.get_layer_path(partial_storage)
|
||||
copied = False
|
||||
|
||||
try:
|
||||
for path_builder in paths:
|
||||
current_path = path_builder(partial_storage.uuid)
|
||||
if storage.exists([existing_location], path_to_copy):
|
||||
with CloseForLongOperation(app.config):
|
||||
storage.copy_between(current_path, existing_location, location)
|
||||
storage.copy_between(path_to_copy, existing_location, location)
|
||||
copied = True
|
||||
except:
|
||||
logger.exception('Exception when copying path %s of image storage %s to location %s',
|
||||
current_path, partial_storage.uuid, location)
|
||||
path_to_copy, partial_storage.uuid, location)
|
||||
return False
|
||||
|
||||
# Create the storage location record for the storage now that the copies have
|
||||
# completed.
|
||||
if copied:
|
||||
model.storage.add_storage_placement(partial_storage, location)
|
||||
logger.debug('Finished copy of image storage %s to location %s',
|
||||
partial_storage.uuid, location)
|
||||
|
|
Reference in a new issue