Merge pull request #3281 from quay/georep-fix

Fixes for georeplication
commit 6c29baf3fc
Authored by Joseph Schorr on 2018-11-01 17:58:32 -04:00; committed by GitHub
8 changed files with 324 additions and 74 deletions


@@ -109,7 +109,15 @@ def _write_derived_image_to_storage(verb, derived_image, queue_file):
   # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
   store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
-  store.stream_write(derived_image.blob.placements, derived_image.blob.storage_path, queue_file)
+  try:
+    store.stream_write(derived_image.blob.placements, derived_image.blob.storage_path, queue_file)
+  except IOError as ex:
+    logger.debug('Exception when writing %s derived image %s: %s', verb, derived_image, ex)
+
+    with database.UseThenDisconnect(app.config):
+      registry_model.delete_derived_image(derived_image)
+
   queue_file.close()
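The hunk above moves the derived-image write into a write-then-invalidate pattern: if streaming the blob to storage raises an IOError, the derived-image record is deleted so a later request regenerates it instead of pointing at a partially written blob. A minimal, self-contained sketch of that pattern (the helper and parameter names here are illustrative, not Quay's actual API):

    import logging

    logger = logging.getLogger(__name__)

    def write_or_invalidate(store, placements, path, stream, invalidate_record):
      """ Stream data to storage; on IOError, invalidate the owning record so it can be rebuilt. """
      try:
        store.stream_write(placements, path, stream)
      except IOError as ex:
        logger.debug('Write of %s failed: %s; invalidating record', path, ex)
        # In the PR this is registry_model.delete_derived_image(...) inside UseThenDisconnect.
        invalidate_record()
      finally:
        stream.close()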


@@ -84,7 +84,7 @@ class BaseStorage(StoragePaths):
   def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
     """ Copy the specified number of bytes from the input file stream to the output stream. If
-        num_bytes < 0 copy until the stream ends.
+        num_bytes < 0 copy until the stream ends. Returns the number of bytes copied.
     """
     bytes_copied = 0
     while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
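For reference, the contract the docstring now spells out (copy until the input is exhausted when num_bytes is READ_UNTIL_END, otherwise stop at num_bytes, and return the number of bytes copied) can be implemented with a loop along these lines. This is a standalone sketch, not the exact body of BaseStorage.stream_write_to_fp:

    READ_UNTIL_END = -1

    def stream_write_to_fp(in_fp, out_fp, num_bytes=READ_UNTIL_END, buffer_size=64 * 1024):
      """ Copy up to num_bytes from in_fp to out_fp (everything if num_bytes < 0).
          Returns the number of bytes copied. """
      bytes_copied = 0
      while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
        size_to_read = buffer_size
        if num_bytes != READ_UNTIL_END:
          size_to_read = min(size_to_read, num_bytes - bytes_copied)

        buf = in_fp.read(size_to_read)
        if not buf:
          break

        out_fp.write(buf)
        bytes_copied += len(buf)

      return bytes_copied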


@@ -182,12 +182,28 @@ class _CloudStorage(BaseStorageV2):
                                       **self._upload_params)

   def stream_write(self, path, fp, content_type=None, content_encoding=None):
-    self._stream_write_internal(path, fp, content_type, content_encoding)
+    """ Writes the data found in the file-like stream to the given path. Raises an IOError
+        if the write fails.
+    """
+    _, write_error = self._stream_write_internal(path, fp, content_type, content_encoding)
+    if write_error is not None:
+      logger.error('Error when trying to stream_write path `%s`: %s', path, write_error)
+      raise IOError('Exception when trying to stream_write path')

   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    """ Writes the data found in the file-like stream to the given path, with optional limit
+        on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
+        *not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
+        the returned tuple on calls to this method.
+    """
     write_error = None
-    mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+
+    try:
+      mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+    except S3ResponseError as e:
+      logger.exception('Exception when initiating multipart upload')
+      return 0, e

     # We are going to reuse this but be VERY careful to only read the number of bytes written to it
     buf = StringIO.StringIO()

@@ -211,7 +227,7 @@ class _CloudStorage(BaseStorageV2):
         mp.upload_part_from_file(buf, num_part, size=bytes_staged)
         total_bytes_written += bytes_staged
         num_part += 1
-      except IOError as e:
+      except (S3ResponseError, IOError) as e:
         logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
         write_error = e

@@ -219,7 +235,11 @@ class _CloudStorage(BaseStorageV2):
           self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])

         if cancel_on_error:
-          mp.cancel_upload()
+          try:
+            mp.cancel_upload()
+          except (S3ResponseError, IOError):
+            logger.exception('Could not cancel upload')
+
           return 0, write_error
         else:
           break

@@ -263,6 +283,7 @@ class _CloudStorage(BaseStorageV2):
     return k.etag[1:-1][:7]

   def copy_to(self, destination, path):
+    """ Copies the given path from this storage to the destination storage. """
     self._initialize_cloud_conn()

     # First try to copy directly via boto, but only if the storages are the

@@ -527,6 +548,11 @@ class GoogleCloudStorage(_CloudStorage):
   def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                              cancel_on_error=True, size=filelike.READ_UNTIL_END):
+    """ Writes the data found in the file-like stream to the given path, with optional limit
+        on size. Note that this method returns a *tuple* of (bytes_written, write_error) and should
+        *not* raise an exception (such as IOError) if a problem uploading occurred. ALWAYS check
+        the returned tuple on calls to this method.
+    """
     # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     path = self._init_path(path)
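The important contract change in _CloudStorage is that _stream_write_internal never raises: it always returns a (bytes_written, write_error) tuple, and only the public stream_write turns a non-None error into an IOError. A hedged sketch of how a caller of the internal method is expected to behave under that contract (the wrapper name is made up for illustration):

    def upload_or_raise(engine, path, fp):
      # _stream_write_internal is expected to swallow S3ResponseError/IOError and report
      # failures through the second element of the returned tuple instead of raising.
      bytes_written, write_error = engine._stream_write_internal(path, fp, cancel_on_error=True)
      if write_error is not None:
        # The caller decides whether to retry, clean up, or surface the failure.
        raise IOError('upload of %s failed after %d bytes: %s' % (path, bytes_written, write_error))
      return bytes_written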


@@ -6,11 +6,13 @@ from uuid import uuid4

 from storage.basestorage import BaseStorageV2

-_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
+_GLOBAL_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)

 class FakeStorage(BaseStorageV2):
   def __init__(self, context):
     super(FakeStorage, self).__init__()
+    self._fake_storage_map = (defaultdict(StringIO.StringIO)
+                              if context == 'local' else _GLOBAL_FAKE_STORAGE_MAP)

   def _init_path(self, path=None, create=False):
     return path

@@ -25,18 +27,18 @@ class FakeStorage(BaseStorageV2):
     return None

   def get_content(self, path):
-    if not path in _FAKE_STORAGE_MAP:
-      raise IOError('Fake file %s not found. Exist: %s' % (path, _FAKE_STORAGE_MAP.keys()))
+    if not path in self._fake_storage_map:
+      raise IOError('Fake file %s not found. Exist: %s' % (path, self._fake_storage_map.keys()))

-    _FAKE_STORAGE_MAP.get(path).seek(0)
-    return _FAKE_STORAGE_MAP.get(path).read()
+    self._fake_storage_map.get(path).seek(0)
+    return self._fake_storage_map.get(path).read()

   def put_content(self, path, content):
-    _FAKE_STORAGE_MAP.pop(path, None)
-    _FAKE_STORAGE_MAP[path].write(content)
+    self._fake_storage_map.pop(path, None)
+    self._fake_storage_map[path].write(content)

   def stream_read(self, path):
-    io_obj = _FAKE_STORAGE_MAP[path]
+    io_obj = self._fake_storage_map[path]
     io_obj.seek(0)
     while True:
       buf = io_obj.read(self.buffer_size)

@@ -48,40 +50,49 @@ class FakeStorage(BaseStorageV2):
     return StringIO.StringIO(self.get_content(path))

   def stream_write(self, path, fp, content_type=None, content_encoding=None):
-    out_fp = _FAKE_STORAGE_MAP[path]
+    out_fp = self._fake_storage_map[path]
     out_fp.seek(0)
     self.stream_write_to_fp(fp, out_fp)

   def remove(self, path):
-    _FAKE_STORAGE_MAP.pop(path, None)
+    self._fake_storage_map.pop(path, None)

   def exists(self, path):
-    if _FAKE_STORAGE_MAP.get('all_files_exist', None):
+    if self._fake_storage_map.get('all_files_exist', None):
       return True
-    return path in _FAKE_STORAGE_MAP
+    return path in self._fake_storage_map

   def get_checksum(self, path):
-    return hashlib.sha256(_FAKE_STORAGE_MAP[path].read()).hexdigest()[:7]
+    return hashlib.sha256(self._fake_storage_map[path].read()).hexdigest()[:7]

   def initiate_chunked_upload(self):
     new_uuid = str(uuid4())
-    _FAKE_STORAGE_MAP[new_uuid].seek(0)
+    self._fake_storage_map[new_uuid].seek(0)
     return new_uuid, {}

   def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
-    upload_storage = _FAKE_STORAGE_MAP[uuid]
+    upload_storage = self._fake_storage_map[uuid]
     try:
       return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
     except IOError as ex:
       return 0, {}, ex

   def complete_chunked_upload(self, uuid, final_path, _):
-    _FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid]
-    _FAKE_STORAGE_MAP.pop(uuid, None)
+    self._fake_storage_map[final_path] = self._fake_storage_map[uuid]
+    self._fake_storage_map.pop(uuid, None)

   def cancel_chunked_upload(self, uuid, _):
-    _FAKE_STORAGE_MAP.pop(uuid, None)
+    self._fake_storage_map.pop(uuid, None)

   def copy_to(self, destination, path):
+    if self.exists('break_copying'):
+      raise IOError('Broken!')
+
+    if self.exists('fake_copying'):
+      return
+
+    if self.exists('except_copying'):
+      raise Exception("I'm an exception!")
+
     content = self.get_content(path)
     destination.put_content(path, content)
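FakeStorage.copy_to now honours three in-band sentinels ('break_copying', 'fake_copying' and 'except_copying') so tests can inject copy failures without monkeypatching. A short sketch of how a test might drive them directly against two fake engines (pytest assumed):

    import pytest
    from storage.fakestorage import FakeStorage

    def test_fake_copy_failure_modes():
      source = FakeStorage('local')
      dest = FakeStorage('local')
      source.put_content('some/path', 'payload')

      # 'fake_copying' silently skips the copy, so the destination stays empty.
      source.put_content('fake_copying', 'true')
      source.copy_to(dest, 'some/path')
      assert not dest.exists('some/path')

      # 'break_copying' turns the copy into an IOError for error-path tests.
      source.remove('fake_copying')
      source.put_content('break_copying', 'true')
      with pytest.raises(IOError):
        source.copy_to(dest, 'some/path')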


@@ -7,6 +7,8 @@ import pytest
 import moto
 import boto

+from moto import mock_s3
+
 from storage import S3Storage, StorageContext
 from storage.cloud import _CloudStorage, _PartUploadMetadata
 from storage.cloud import _CHUNKS_KEY

@@ -56,28 +58,29 @@ def test_basicop(storage_engine):
   assert not storage_engine.exists(_TEST_PATH)

-def test_copy_samecreds(storage_engine):
+@pytest.mark.parametrize('bucket, username, password', [
+  pytest.param(_TEST_BUCKET, _TEST_USER, _TEST_PASSWORD, id='same credentials'),
+  pytest.param('another_bucket', 'blech', 'password', id='different credentials'),
+])
+def test_copy(bucket, username, password, storage_engine):
   # Copy the content to another engine.
   another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER,
                              _TEST_PASSWORD)
-  storage_engine.copy_to(another_engine, _TEST_PATH)
-
-  # Verify it can be retrieved.
-  assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT
-
-def test_copy_differentcreds(storage_engine):
-  # Copy the content to another engine.
-  another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech',
-                             'password')
   boto.connect_s3().create_bucket('another_bucket')

   storage_engine.copy_to(another_engine, _TEST_PATH)

   # Verify it can be retrieved.
   assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT

+def test_copy_with_error(storage_engine):
+  another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'anotherbucket', 'foo',
+                             'bar')
+
+  with pytest.raises(IOError):
+    storage_engine.copy_to(another_engine, _TEST_PATH)

 def test_stream_read(storage_engine):
   # Read the streaming content.
   data = ''.join(storage_engine.stream_read(_TEST_PATH))

@@ -95,6 +98,18 @@ def test_stream_write(storage_engine):
   assert storage_engine.get_content(_TEST_PATH) == new_data

+def test_stream_write_error():
+  with mock_s3():
+    # Create an engine but not the bucket.
+    engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
+
+    # Attempt to write to the uncreated bucket, which should raise an error.
+    with pytest.raises(IOError):
+      engine.stream_write(_TEST_PATH, StringIO('hello world'), content_type='Cool/Type')
+
+    assert not engine.exists(_TEST_PATH)

 @pytest.mark.parametrize('chunk_count', [
   0,
   1,

@@ -107,7 +122,7 @@ def test_stream_write(storage_engine):
 def test_chunk_upload(storage_engine, chunk_count, force_client_side):
   if chunk_count == 0 and force_client_side:
     return

   upload_id, metadata = storage_engine.initiate_chunked_upload()
   final_data = ''

util/verifyplacements.py (new file, 75 lines)

@@ -0,0 +1,75 @@
+"""
+Usage (from the root in the container): venv/bin/python -m util.verifyplacements
+
+This script verifies that if a blob is listed as being in a specific storage location, the file
+actually exists there. If the file is not found in that storage location, the placement entry in
+the database is removed.
+"""
+
+import logging
+
+from peewee import fn
+
+from app import storage
+from data import model
+from data.database import ImageStorage, ImageStoragePlacement, ImageStorageLocation
+from util.migrate.allocator import yield_random_entries
+
+logger = logging.getLogger(__name__)
+
+LOCATION_MAP = {}
+
+def _get_location_row(location):
+  if location in LOCATION_MAP:
+    return LOCATION_MAP[location]
+
+  location_row = ImageStorageLocation.get(name=location)
+  LOCATION_MAP[location] = location_row
+  return location_row
+
+def verify_placements():
+  encountered = set()
+
+  iterator = yield_random_entries(
+    lambda: ImageStorage.select().where(ImageStorage.uploading == False),
+    ImageStorage.id,
+    1000,
+    ImageStorage.select(fn.Max(ImageStorage.id)).scalar(),
+    1,
+  )
+
+  for storage_row, abt, _ in iterator:
+    if storage_row.id in encountered:
+      continue
+
+    encountered.add(storage_row.id)
+
+    logger.info('Checking placements for storage `%s`', storage_row.uuid)
+    try:
+      with_locations = model.storage.get_storage_by_uuid(storage_row.uuid)
+    except model.InvalidImageException:
+      logger.exception('Could not find storage `%s`', storage_row.uuid)
+      continue
+
+    storage_path = model.storage.get_layer_path(storage_row)
+    locations_to_check = set(with_locations.locations)
+    if locations_to_check:
+      logger.info('Checking locations `%s` for storage `%s`', locations_to_check, storage_row.uuid)
+      for location in locations_to_check:
+        logger.info('Checking location `%s` for storage `%s`', location, storage_row.uuid)
+        if not storage.exists([location], storage_path):
+          location_row = _get_location_row(location)
+          logger.info('Location `%s` is missing for storage `%s`; removing', location,
+                      storage_row.uuid)
+          (ImageStoragePlacement
+           .delete()
+           .where(ImageStoragePlacement.storage == storage_row,
+                  ImageStoragePlacement.location == location_row)
+           .execute())
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  verify_placements()
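The script only deletes ImageStoragePlacement rows whose backing file is missing; the blobs themselves and the ImageStorage rows are left untouched. As an illustrative follow-up (not part of this PR), a read-only peewee query like the following could be used to see how many placements remain per location after a run:

    from data.database import ImageStoragePlacement, ImageStorageLocation

    def placement_counts():
      counts = {}
      for location in ImageStorageLocation.select():
        counts[location.name] = (ImageStoragePlacement
                                 .select()
                                 .where(ImageStoragePlacement.location == location)
                                 .count())
      return counts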


@@ -3,10 +3,10 @@ import time

 import features

-from app import app, storage, image_replication_queue
+from app import app, storage as app_storage, image_replication_queue
 from data.database import CloseForLongOperation
 from data import model
-from workers.queueworker import QueueWorker, WorkerUnhealthyException
+from workers.queueworker import QueueWorker, WorkerUnhealthyException, JobException
 from util.log import logfile_path

 logger = logging.getLogger(__name__)

@@ -28,17 +28,28 @@ class StorageReplicationWorker(QueueWorker):
                    namespace_id, storage_uuid)
       return

-    succeeded = self.replicate_storage(namespace, storage_uuid)
-    logger.debug('Replication finished of image storage %s under namespace %s: %s',
-                 storage_uuid, namespace_id, succeeded)
-    if not succeeded:
-      raise WorkerUnhealthyException()
+    self.replicate_storage(namespace, storage_uuid, app_storage)

-  def replicate_storage(self, namespace, storage_uuid):
+  def _backoff_check_exists(self, location, path, storage, backoff_check=True):
+    for retry in range(0, 4):
+      if storage.exists([location], path):
+        return True
+
+      if not backoff_check:
+        return False
+
+      seconds = pow(2, retry) * 2
+      logger.debug('Cannot find path `%s` in location %s (try #%s). Sleeping for %s seconds',
+                   path, location, retry, seconds)
+      time.sleep(seconds)
+
+    return False
+
+  def replicate_storage(self, namespace, storage_uuid, storage, backoff_check=True):
     # Lookup the namespace and its associated regions.
     if not namespace:
       logger.debug('Unknown namespace when trying to replicate storage %s', storage_uuid)
-      return True
+      return

     locations = model.user.get_region_locations(namespace)

@@ -47,7 +58,7 @@ class StorageReplicationWorker(QueueWorker):
       partial_storage = model.storage.get_storage_by_uuid(storage_uuid)
     except model.InvalidImageException:
       logger.debug('Unknown storage: %s', storage_uuid)
-      return True
+      return

     # Check to see if the image is at all the required locations.
     locations_required = locations | set(storage.default_locations)

@@ -59,26 +70,17 @@ class StorageReplicationWorker(QueueWorker):
     if not locations_missing:
       logger.debug('No missing locations for storage %s under namespace %s. Required: %s',
                    storage_uuid, namespace.username, locations_required)
-      return True
+      return

     # For any missing storage locations, initiate a copy.
     existing_location = list(partial_storage.locations)[0]
     path_to_copy = model.storage.get_layer_path(partial_storage)

-    # Lookup the existing location. If not found, progressively sleep a few times to handle the case
-    # of not fully consistent storage.
-    for retry in range(0, 3):
-      if storage.exists([existing_location], path_to_copy):
-        break
-
-      logger.debug('Cannot find image storage %s in existing location %s (try #%s)',
-                   storage_uuid, existing_location, retry)
-      time.sleep(pow(2, retry) * 5)
-
-    if not storage.exists([existing_location], path_to_copy):
+    # Lookup and ensure the existing location exists.
+    if not self._backoff_check_exists(existing_location, path_to_copy, storage, backoff_check):
       logger.warning('Cannot find image storage %s in existing location %s; stopping replication',
                      storage_uuid, existing_location)
-      return False
+      raise JobException()

     # For each missing location, copy over the storage.
     for location in locations_missing:

@@ -91,21 +93,32 @@ class StorageReplicationWorker(QueueWorker):
         with CloseForLongOperation(app.config):
           storage.copy_between(path_to_copy, existing_location, location)
           copied = True
-      except:
-        logger.exception('Exception when copying path %s of image storage %s to location %s',
+      except IOError:
+        logger.exception('Failed to copy path `%s` of image storage %s to location %s',
                          path_to_copy, partial_storage.uuid, location)
-        return False
+        raise JobException()
+      except:
+        logger.exception('Unknown exception when copying path %s of image storage %s to loc %s',
+                         path_to_copy, partial_storage.uuid, location)
+        raise WorkerUnhealthyException()

-      # Create the storage location record for the storage now that the copies have
-      # completed.
       if copied:
+        # Verify the data was copied to the target storage, to ensure that there are no cases
+        # where we write the placement without knowing the data is present.
+        if not self._backoff_check_exists(location, path_to_copy, storage, backoff_check):
+          logger.warning('Failed to find path `%s` in location `%s` after copy', path_to_copy,
+                         location)
+          raise JobException()
+
+        # Create the storage location record for the storage now that the copy has
+        # completed.
         model.storage.add_storage_placement(partial_storage, location)
         logger.debug('Finished copy of image storage %s to location %s from %s',
                      partial_storage.uuid, location, existing_location)

     logger.debug('Completed replication of image storage %s to locations %s from %s',
                  partial_storage.uuid, locations_missing, existing_location)
-    return True

 if __name__ == "__main__":
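The new _backoff_check_exists helper polls storage.exists up to four times with exponential backoff before giving up (sleeps of 2, 4, 8 and 16 seconds, roughly 30 seconds in total), and skips the sleeps entirely when backoff_check=False so tests stay fast. A standalone sketch of the equivalent timing logic:

    import time

    def backoff_check_exists(exists_fn, retries=4, backoff=True):
      """ Poll exists_fn() with exponential backoff (2, 4, 8, 16 second sleeps) before giving up. """
      for retry in range(retries):
        if exists_fn():
          return True
        if not backoff:
          return False
        time.sleep(pow(2, retry) * 2)
      return False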


@@ -2,10 +2,12 @@ import hashlib

 import pytest

-from app import storage
 from data import model, database
 from storage.basestorage import StoragePaths
-from workers.storagereplication import StorageReplicationWorker
+from storage.fakestorage import FakeStorage
+from storage.distributedstorage import DistributedStorage
+from workers.storagereplication import (StorageReplicationWorker, JobException,
+                                        WorkerUnhealthyException)

 from test.fixtures import *

@@ -30,15 +32,20 @@ def replication_worker():
   return StorageReplicationWorker(None)

-def test_storage_replication_v1(storage_user, storage_paths, replication_worker, app):
+@pytest.fixture()
+def storage():
+  return DistributedStorage({'local_us': FakeStorage('local'), 'local_eu': FakeStorage('local')},
+                            ['local_us'])
+
+def test_storage_replication_v1(storage_user, storage_paths, replication_worker, storage, app):
   # Add a storage entry with a V1 path.
   v1_storage = model.storage.create_v1_storage('local_us')
   content_path = storage_paths.v1_image_layer_path(v1_storage.uuid)
   storage.put_content(['local_us'], content_path, 'some content')

   # Call replicate on it and verify it replicates.
-  result = replication_worker.replicate_storage(storage_user, v1_storage.uuid)
-  assert result
+  replication_worker.replicate_storage(storage_user, v1_storage.uuid, storage)

   # Ensure that the data was replicated to the other "region".
   assert storage.get_content(['local_eu'], content_path) == 'some content'

@@ -47,7 +54,7 @@ def test_storage_replication_v1(storage_user, storage_paths, replication_worker,
   assert len(locations) == 2

-def test_storage_replication_cas(storage_user, storage_paths, replication_worker, app):
+def test_storage_replication_cas(storage_user, storage_paths, replication_worker, storage, app):
   # Add a storage entry with a CAS path.
   content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
   cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)

@@ -59,11 +66,106 @@ def test_storage_replication_cas(storage_user, storage_paths, replication_worker
   storage.put_content(['local_us'], content_path, 'some content')

   # Call replicate on it and verify it replicates.
-  result = replication_worker.replicate_storage(storage_user, cas_storage.uuid)
-  assert result
+  replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage)

   # Ensure that the data was replicated to the other "region".
   assert storage.get_content(['local_eu'], content_path) == 'some content'

   locations = model.storage.get_storage_locations(cas_storage.uuid)
   assert len(locations) == 2
+
+def test_storage_replication_missing_base(storage_user, storage_paths, replication_worker, storage,
+                                          app):
+  # Add a storage entry with a CAS path.
+  content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
+  cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
+
+  location = database.ImageStorageLocation.get(name='local_us')
+  database.ImageStoragePlacement.create(storage=cas_storage, location=location)
+
+  # Attempt to replicate storage. This should fail because the layer is missing from the base
+  # storage.
+  with pytest.raises(JobException):
+    replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage,
+                                         backoff_check=False)
+
+  # Ensure the storage location count remains 1. This is technically inaccurate, but that's okay
+  # as we still require at least one location per storage.
+  locations = model.storage.get_storage_locations(cas_storage.uuid)
+  assert len(locations) == 1
+
+def test_storage_replication_copy_error(storage_user, storage_paths, replication_worker, storage,
+                                        app):
+  # Add a storage entry with a CAS path.
+  content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
+  cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
+
+  location = database.ImageStorageLocation.get(name='local_us')
+  database.ImageStoragePlacement.create(storage=cas_storage, location=location)
+
+  content_path = storage_paths.blob_path(cas_storage.content_checksum)
+  storage.put_content(['local_us'], content_path, 'some content')
+
+  # Tell storage to break copying.
+  storage.put_content(['local_us'], 'break_copying', 'true')
+
+  # Attempt to replicate storage. This should fail because the write fails.
+  with pytest.raises(JobException):
+    replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage,
+                                         backoff_check=False)
+
+  # Ensure the storage location count remains 1.
+  locations = model.storage.get_storage_locations(cas_storage.uuid)
+  assert len(locations) == 1
+
+def test_storage_replication_copy_didnot_copy(storage_user, storage_paths, replication_worker,
+                                              storage, app):
+  # Add a storage entry with a CAS path.
+  content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
+  cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
+
+  location = database.ImageStorageLocation.get(name='local_us')
+  database.ImageStoragePlacement.create(storage=cas_storage, location=location)
+
+  content_path = storage_paths.blob_path(cas_storage.content_checksum)
+  storage.put_content(['local_us'], content_path, 'some content')
+
+  # Tell storage to fake copying (i.e. not actually copy the data).
+  storage.put_content(['local_us'], 'fake_copying', 'true')
+
+  # Attempt to replicate storage. This should fail because the copy doesn't actually do the copy.
+  with pytest.raises(JobException):
+    replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage,
+                                         backoff_check=False)
+
+  # Ensure the storage location count remains 1.
+  locations = model.storage.get_storage_locations(cas_storage.uuid)
+  assert len(locations) == 1
+
+def test_storage_replication_copy_unhandled_exception(storage_user, storage_paths,
+                                                      replication_worker, storage, app):
+  # Add a storage entry with a CAS path.
+  content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
+  cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
+
+  location = database.ImageStorageLocation.get(name='local_us')
+  database.ImageStoragePlacement.create(storage=cas_storage, location=location)
+
+  content_path = storage_paths.blob_path(cas_storage.content_checksum)
+  storage.put_content(['local_us'], content_path, 'some content')
+
+  # Tell storage to raise an exception when copying.
+  storage.put_content(['local_us'], 'except_copying', 'true')
+
+  # Attempt to replicate storage. This should fail because the copy raises an unhandled exception.
+  with pytest.raises(WorkerUnhealthyException):
+    replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage,
+                                         backoff_check=False)
+
+  # Ensure the storage location count remains 1.
+  locations = model.storage.get_storage_locations(cas_storage.uuid)
+  assert len(locations) == 1