Convert storage replication tests to pytest format
This commit is contained in:
parent
544c8f0adf
commit
0e2bff972f
1 changed files with 61 additions and 58 deletions
|
@ -1,66 +1,69 @@
|
|||
import unittest
|
||||
import hashlib
|
||||
|
||||
import pytest
|
||||
|
||||
from app import storage
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from data import model, database
|
||||
from workers.storagereplication import StorageReplicationWorker
|
||||
from storage.basestorage import StoragePaths
|
||||
from workers.storagereplication import StorageReplicationWorker
|
||||
|
||||
class TestStorageReplication(unittest.TestCase):
|
||||
def setUp(self):
|
||||
setup_database_for_testing(self)
|
||||
|
||||
self.worker = StorageReplicationWorker(None)
|
||||
self.paths = StoragePaths()
|
||||
|
||||
# Add both regions for a user.
|
||||
self.user = model.user.get_user('devtable')
|
||||
database.UserRegion.create(user=self.user,
|
||||
location=database.ImageStorageLocation.get(name='local_us'))
|
||||
database.UserRegion.create(user=self.user,
|
||||
location=database.ImageStorageLocation.get(name='local_eu'))
|
||||
|
||||
def tearDown(self):
|
||||
finished_database_for_testing(self)
|
||||
|
||||
def test_storage_replication_v1(self):
|
||||
# Add a storage entry with a V1 path.
|
||||
v1_storage = model.storage.create_v1_storage('local_us')
|
||||
content_path = self.paths.v1_image_layer_path(v1_storage.uuid)
|
||||
storage.put_content(['local_us'], content_path, 'some content')
|
||||
|
||||
# Call replicate on it and verify it replicates.
|
||||
result = self.worker.replicate_storage(self.user, v1_storage.uuid)
|
||||
self.assertTrue(result)
|
||||
|
||||
# Ensure that the data was replicated to the other "region".
|
||||
self.assertEquals('some content', storage.get_content(['local_eu'], content_path))
|
||||
|
||||
locations = model.storage.get_storage_locations(v1_storage.uuid)
|
||||
self.assertEquals(2, len(locations))
|
||||
|
||||
def test_storage_replication_cas(self):
|
||||
# Add a storage entry with a CAS path.
|
||||
content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
|
||||
cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
|
||||
|
||||
location = database.ImageStorageLocation.get(name='local_us')
|
||||
database.ImageStoragePlacement.create(storage=cas_storage, location=location)
|
||||
|
||||
content_path = self.paths.blob_path(cas_storage.content_checksum)
|
||||
storage.put_content(['local_us'], content_path, 'some content')
|
||||
|
||||
# Call replicate on it and verify it replicates.
|
||||
result = self.worker.replicate_storage(self.user, cas_storage.uuid)
|
||||
self.assertTrue(result)
|
||||
|
||||
# Ensure that the data was replicated to the other "region".
|
||||
self.assertEquals('some content', storage.get_content(['local_eu'], content_path))
|
||||
|
||||
locations = model.storage.get_storage_locations(cas_storage.uuid)
|
||||
self.assertEquals(2, len(locations))
|
||||
from test.fixtures import *
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@pytest.fixture()
|
||||
def storage_user(app):
|
||||
user = model.user.get_user('devtable')
|
||||
database.UserRegion.create(user=user,
|
||||
location=database.ImageStorageLocation.get(name='local_us'))
|
||||
database.UserRegion.create(user=user,
|
||||
location=database.ImageStorageLocation.get(name='local_eu'))
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def storage_paths():
|
||||
return StoragePaths()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def replication_worker():
|
||||
return StorageReplicationWorker(None)
|
||||
|
||||
|
||||
def test_storage_replication_v1(storage_user, storage_paths, replication_worker, app):
|
||||
# Add a storage entry with a V1 path.
|
||||
v1_storage = model.storage.create_v1_storage('local_us')
|
||||
content_path = storage_paths.v1_image_layer_path(v1_storage.uuid)
|
||||
storage.put_content(['local_us'], content_path, 'some content')
|
||||
|
||||
# Call replicate on it and verify it replicates.
|
||||
result = replication_worker.replicate_storage(storage_user, v1_storage.uuid)
|
||||
assert result
|
||||
|
||||
# Ensure that the data was replicated to the other "region".
|
||||
assert storage.get_content(['local_eu'], content_path) == 'some content'
|
||||
|
||||
locations = model.storage.get_storage_locations(v1_storage.uuid)
|
||||
assert len(locations) == 2
|
||||
|
||||
|
||||
def test_storage_replication_cas(storage_user, storage_paths, replication_worker, app):
|
||||
# Add a storage entry with a CAS path.
|
||||
content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
|
||||
cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
|
||||
|
||||
location = database.ImageStorageLocation.get(name='local_us')
|
||||
database.ImageStoragePlacement.create(storage=cas_storage, location=location)
|
||||
|
||||
content_path = storage_paths.blob_path(cas_storage.content_checksum)
|
||||
storage.put_content(['local_us'], content_path, 'some content')
|
||||
|
||||
# Call replicate on it and verify it replicates.
|
||||
result = replication_worker.replicate_storage(storage_user, cas_storage.uuid)
|
||||
assert result
|
||||
|
||||
# Ensure that the data was replicated to the other "region".
|
||||
assert storage.get_content(['local_eu'], content_path) == 'some content'
|
||||
|
||||
locations = model.storage.get_storage_locations(cas_storage.uuid)
|
||||
assert len(locations) == 2
|
||||
|
|
Reference in a new issue