Harden the storage replication worker against failures by explicitly catching certain errors, improving the backoff behavior, and verifying that a layer has actually been replicated to a region before adding the placement record to the database

Also adds tests for the various failure cases
This commit is contained in:
Joseph Schorr 2018-10-31 14:13:25 -04:00
parent 088a301754
commit 6a9634dffb
3 changed files with 181 additions and 55 deletions

View file

@ -6,11 +6,13 @@ from uuid import uuid4
from storage.basestorage import BaseStorageV2 from storage.basestorage import BaseStorageV2
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO) _GLOBAL_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
class FakeStorage(BaseStorageV2): class FakeStorage(BaseStorageV2):
def __init__(self, context): def __init__(self, context):
super(FakeStorage, self).__init__() super(FakeStorage, self).__init__()
self._fake_storage_map = (defaultdict(StringIO.StringIO)
if context == 'local' else _GLOBAL_FAKE_STORAGE_MAP)
def _init_path(self, path=None, create=False): def _init_path(self, path=None, create=False):
return path return path
@ -25,18 +27,18 @@ class FakeStorage(BaseStorageV2):
return None return None
def get_content(self, path): def get_content(self, path):
if not path in _FAKE_STORAGE_MAP: if not path in self._fake_storage_map:
raise IOError('Fake file %s not found. Exist: %s' % (path, _FAKE_STORAGE_MAP.keys())) raise IOError('Fake file %s not found. Exist: %s' % (path, self._fake_storage_map.keys()))
_FAKE_STORAGE_MAP.get(path).seek(0) self._fake_storage_map.get(path).seek(0)
return _FAKE_STORAGE_MAP.get(path).read() return self._fake_storage_map.get(path).read()
def put_content(self, path, content): def put_content(self, path, content):
_FAKE_STORAGE_MAP.pop(path, None) self._fake_storage_map.pop(path, None)
_FAKE_STORAGE_MAP[path].write(content) self._fake_storage_map[path].write(content)
def stream_read(self, path): def stream_read(self, path):
io_obj = _FAKE_STORAGE_MAP[path] io_obj = self._fake_storage_map[path]
io_obj.seek(0) io_obj.seek(0)
while True: while True:
buf = io_obj.read(self.buffer_size) buf = io_obj.read(self.buffer_size)
@ -48,40 +50,49 @@ class FakeStorage(BaseStorageV2):
return StringIO.StringIO(self.get_content(path)) return StringIO.StringIO(self.get_content(path))
def stream_write(self, path, fp, content_type=None, content_encoding=None): def stream_write(self, path, fp, content_type=None, content_encoding=None):
out_fp = _FAKE_STORAGE_MAP[path] out_fp = self._fake_storage_map[path]
out_fp.seek(0) out_fp.seek(0)
self.stream_write_to_fp(fp, out_fp) self.stream_write_to_fp(fp, out_fp)
def remove(self, path): def remove(self, path):
_FAKE_STORAGE_MAP.pop(path, None) self._fake_storage_map.pop(path, None)
def exists(self, path): def exists(self, path):
if _FAKE_STORAGE_MAP.get('all_files_exist', None): if self._fake_storage_map.get('all_files_exist', None):
return True return True
return path in _FAKE_STORAGE_MAP return path in self._fake_storage_map
def get_checksum(self, path): def get_checksum(self, path):
return hashlib.sha256(_FAKE_STORAGE_MAP[path].read()).hexdigest()[:7] return hashlib.sha256(self._fake_storage_map[path].read()).hexdigest()[:7]
def initiate_chunked_upload(self): def initiate_chunked_upload(self):
new_uuid = str(uuid4()) new_uuid = str(uuid4())
_FAKE_STORAGE_MAP[new_uuid].seek(0) self._fake_storage_map[new_uuid].seek(0)
return new_uuid, {} return new_uuid, {}
def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None): def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
upload_storage = _FAKE_STORAGE_MAP[uuid] upload_storage = self._fake_storage_map[uuid]
try: try:
return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
except IOError as ex: except IOError as ex:
return 0, {}, ex return 0, {}, ex
def complete_chunked_upload(self, uuid, final_path, _): def complete_chunked_upload(self, uuid, final_path, _):
_FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid] self._fake_storage_map[final_path] = self._fake_storage_map[uuid]
_FAKE_STORAGE_MAP.pop(uuid, None) self._fake_storage_map.pop(uuid, None)
def cancel_chunked_upload(self, uuid, _): def cancel_chunked_upload(self, uuid, _):
_FAKE_STORAGE_MAP.pop(uuid, None) self._fake_storage_map.pop(uuid, None)
def copy_to(self, destination, path): def copy_to(self, destination, path):
if self.exists('break_copying'):
raise IOError('Broken!')
if self.exists('fake_copying'):
return
if self.exists('except_copying'):
raise Exception("I'm an exception!")
content = self.get_content(path) content = self.get_content(path)
destination.put_content(path, content) destination.put_content(path, content)

View file

@ -3,10 +3,10 @@ import time
import features import features
from app import app, storage, image_replication_queue from app import app, storage as app_storage, image_replication_queue
from data.database import CloseForLongOperation from data.database import CloseForLongOperation
from data import model from data import model
from workers.queueworker import QueueWorker, WorkerUnhealthyException from workers.queueworker import QueueWorker, WorkerUnhealthyException, JobException
from util.log import logfile_path from util.log import logfile_path
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -28,17 +28,28 @@ class StorageReplicationWorker(QueueWorker):
namespace_id, storage_uuid) namespace_id, storage_uuid)
return return
succeeded = self.replicate_storage(namespace, storage_uuid) self.replicate_storage(namespace, storage_uuid, app_storage)
logger.debug('Replication finished of image storage %s under namespace %s: %s',
storage_uuid, namespace_id, succeeded)
if not succeeded:
raise WorkerUnhealthyException()
def replicate_storage(self, namespace, storage_uuid): def _backoff_check_exists(self, location, path, storage, backoff_check=True):
for retry in range(0, 4):
if storage.exists([location], path):
return True
if not backoff_check:
return False
seconds = pow(2, retry) * 2
logger.debug('Cannot find path `%s` in location %s (try #%s). Sleeping for %s seconds',
path, location, retry, seconds)
time.sleep(seconds)
return False
def replicate_storage(self, namespace, storage_uuid, storage, backoff_check=True):
# Lookup the namespace and its associated regions. # Lookup the namespace and its associated regions.
if not namespace: if not namespace:
logger.debug('Unknown namespace when trying to replicate storage %s', storage_uuid) logger.debug('Unknown namespace when trying to replicate storage %s', storage_uuid)
return True return
locations = model.user.get_region_locations(namespace) locations = model.user.get_region_locations(namespace)
@ -47,7 +58,7 @@ class StorageReplicationWorker(QueueWorker):
partial_storage = model.storage.get_storage_by_uuid(storage_uuid) partial_storage = model.storage.get_storage_by_uuid(storage_uuid)
except model.InvalidImageException: except model.InvalidImageException:
logger.debug('Unknown storage: %s', storage_uuid) logger.debug('Unknown storage: %s', storage_uuid)
return True return
# Check to see if the image is at all the required locations. # Check to see if the image is at all the required locations.
locations_required = locations | set(storage.default_locations) locations_required = locations | set(storage.default_locations)
@ -59,26 +70,17 @@ class StorageReplicationWorker(QueueWorker):
if not locations_missing: if not locations_missing:
logger.debug('No missing locations for storage %s under namespace %s. Required: %s', logger.debug('No missing locations for storage %s under namespace %s. Required: %s',
storage_uuid, namespace.username, locations_required) storage_uuid, namespace.username, locations_required)
return True return
# For any missing storage locations, initiate a copy. # For any missing storage locations, initiate a copy.
existing_location = list(partial_storage.locations)[0] existing_location = list(partial_storage.locations)[0]
path_to_copy = model.storage.get_layer_path(partial_storage) path_to_copy = model.storage.get_layer_path(partial_storage)
# Lookup the existing location. If not found, progressively sleep a few times to handle the case # Lookup and ensure the existing location exists.
# of not fully consistent storage. if not self._backoff_check_exists(existing_location, path_to_copy, storage, backoff_check):
for retry in range(0, 3):
if storage.exists([existing_location], path_to_copy):
break
logger.debug('Cannot find image storage %s in existing location %s (try #%s)',
storage_uuid, existing_location, retry)
time.sleep(pow(2, retry) * 5)
if not storage.exists([existing_location], path_to_copy):
logger.warning('Cannot find image storage %s in existing location %s; stopping replication', logger.warning('Cannot find image storage %s in existing location %s; stopping replication',
storage_uuid, existing_location) storage_uuid, existing_location)
return False raise JobException()
# For each missing location, copy over the storage. # For each missing location, copy over the storage.
for location in locations_missing: for location in locations_missing:
@ -91,21 +93,32 @@ class StorageReplicationWorker(QueueWorker):
with CloseForLongOperation(app.config): with CloseForLongOperation(app.config):
storage.copy_between(path_to_copy, existing_location, location) storage.copy_between(path_to_copy, existing_location, location)
copied = True copied = True
except: except IOError:
logger.exception('Exception when copying path %s of image storage %s to location %s', logger.exception('Failed to copy path `%s` of image storage %s to location %s',
path_to_copy, partial_storage.uuid, location) path_to_copy, partial_storage.uuid, location)
return False raise JobException()
except:
logger.exception('Unknown exception when copying path %s of image storage %s to loc %s',
path_to_copy, partial_storage.uuid, location)
raise WorkerUnhealthyException()
# Create the storage location record for the storage now that the copies have
# completed.
if copied: if copied:
# Verify the data was copied to the target storage, to ensure that there are no cases
# where we write the placement without knowing the data is present.
if not self._backoff_check_exists(location, path_to_copy, storage, backoff_check):
logger.warning('Failed to find path `%s` in location `%s` after copy', path_to_copy,
location)
raise JobException()
# Create the storage location record for the storage now that the copy has
# completed.
model.storage.add_storage_placement(partial_storage, location) model.storage.add_storage_placement(partial_storage, location)
logger.debug('Finished copy of image storage %s to location %s from %s', logger.debug('Finished copy of image storage %s to location %s from %s',
partial_storage.uuid, location, existing_location) partial_storage.uuid, location, existing_location)
logger.debug('Completed replication of image storage %s to locations %s from %s', logger.debug('Completed replication of image storage %s to locations %s from %s',
partial_storage.uuid, locations_missing, existing_location) partial_storage.uuid, locations_missing, existing_location)
return True
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -2,10 +2,12 @@ import hashlib
import pytest import pytest
from app import storage
from data import model, database from data import model, database
from storage.basestorage import StoragePaths from storage.basestorage import StoragePaths
from workers.storagereplication import StorageReplicationWorker from storage.fakestorage import FakeStorage
from storage.distributedstorage import DistributedStorage
from workers.storagereplication import (StorageReplicationWorker, JobException,
WorkerUnhealthyException)
from test.fixtures import * from test.fixtures import *
@ -30,15 +32,20 @@ def replication_worker():
return StorageReplicationWorker(None) return StorageReplicationWorker(None)
def test_storage_replication_v1(storage_user, storage_paths, replication_worker, app): @pytest.fixture()
def storage():
return DistributedStorage({'local_us': FakeStorage('local'), 'local_eu': FakeStorage('local')},
['local_us'])
def test_storage_replication_v1(storage_user, storage_paths, replication_worker, storage, app):
# Add a storage entry with a V1 path. # Add a storage entry with a V1 path.
v1_storage = model.storage.create_v1_storage('local_us') v1_storage = model.storage.create_v1_storage('local_us')
content_path = storage_paths.v1_image_layer_path(v1_storage.uuid) content_path = storage_paths.v1_image_layer_path(v1_storage.uuid)
storage.put_content(['local_us'], content_path, 'some content') storage.put_content(['local_us'], content_path, 'some content')
# Call replicate on it and verify it replicates. # Call replicate on it and verify it replicates.
result = replication_worker.replicate_storage(storage_user, v1_storage.uuid) replication_worker.replicate_storage(storage_user, v1_storage.uuid, storage)
assert result
# Ensure that the data was replicated to the other "region". # Ensure that the data was replicated to the other "region".
assert storage.get_content(['local_eu'], content_path) == 'some content' assert storage.get_content(['local_eu'], content_path) == 'some content'
@ -47,7 +54,7 @@ def test_storage_replication_v1(storage_user, storage_paths, replication_worker,
assert len(locations) == 2 assert len(locations) == 2
def test_storage_replication_cas(storage_user, storage_paths, replication_worker, app): def test_storage_replication_cas(storage_user, storage_paths, replication_worker, storage, app):
# Add a storage entry with a CAS path. # Add a storage entry with a CAS path.
content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest() content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum) cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
@ -59,11 +66,106 @@ def test_storage_replication_cas(storage_user, storage_paths, replication_worker
storage.put_content(['local_us'], content_path, 'some content') storage.put_content(['local_us'], content_path, 'some content')
# Call replicate on it and verify it replicates. # Call replicate on it and verify it replicates.
result = replication_worker.replicate_storage(storage_user, cas_storage.uuid) replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage)
assert result
# Ensure that the data was replicated to the other "region". # Ensure that the data was replicated to the other "region".
assert storage.get_content(['local_eu'], content_path) == 'some content' assert storage.get_content(['local_eu'], content_path) == 'some content'
locations = model.storage.get_storage_locations(cas_storage.uuid) locations = model.storage.get_storage_locations(cas_storage.uuid)
assert len(locations) == 2 assert len(locations) == 2
def test_storage_replication_missing_base(storage_user, storage_paths, replication_worker, storage,
app):
# Add a storage entry with a CAS path.
content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
location = database.ImageStorageLocation.get(name='local_us')
database.ImageStoragePlacement.create(storage=cas_storage, location=location)
# Attempt to replicate storage. This should fail because the layer is missing from the base
# storage.
with pytest.raises(JobException):
replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage,
backoff_check=False)
# Ensure the storage location count remains 1. This is technically inaccurate, but that's okay
# as we still require at least one location per storage.
locations = model.storage.get_storage_locations(cas_storage.uuid)
assert len(locations) == 1
def test_storage_replication_copy_error(storage_user, storage_paths, replication_worker, storage,
app):
# Add a storage entry with a CAS path.
content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
location = database.ImageStorageLocation.get(name='local_us')
database.ImageStoragePlacement.create(storage=cas_storage, location=location)
content_path = storage_paths.blob_path(cas_storage.content_checksum)
storage.put_content(['local_us'], content_path, 'some content')
# Tell storage to break copying.
storage.put_content(['local_us'], 'break_copying', 'true')
# Attempt to replicate storage. This should fail because the write fails.
with pytest.raises(JobException):
replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage,
backoff_check=False)
# Ensure the storage location count remains 1.
locations = model.storage.get_storage_locations(cas_storage.uuid)
assert len(locations) == 1
def test_storage_replication_copy_didnot_copy(storage_user, storage_paths, replication_worker,
storage, app):
# Add a storage entry with a CAS path.
content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
location = database.ImageStorageLocation.get(name='local_us')
database.ImageStoragePlacement.create(storage=cas_storage, location=location)
content_path = storage_paths.blob_path(cas_storage.content_checksum)
storage.put_content(['local_us'], content_path, 'some content')
# Tell storage to fake copying (i.e. not actually copy the data).
storage.put_content(['local_us'], 'fake_copying', 'true')
# Attempt to replicate storage. This should fail because the copy doesn't actually do the copy.
with pytest.raises(JobException):
replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage,
backoff_check=False)
# Ensure the storage location count remains 1.
locations = model.storage.get_storage_locations(cas_storage.uuid)
assert len(locations) == 1
def test_storage_replication_copy_unhandled_exception(storage_user, storage_paths,
replication_worker, storage, app):
# Add a storage entry with a CAS path.
content_checksum = 'sha256:' + hashlib.sha256('some content').hexdigest()
cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
location = database.ImageStorageLocation.get(name='local_us')
database.ImageStoragePlacement.create(storage=cas_storage, location=location)
content_path = storage_paths.blob_path(cas_storage.content_checksum)
storage.put_content(['local_us'], content_path, 'some content')
# Tell storage to raise an exception when copying.
storage.put_content(['local_us'], 'except_copying', 'true')
# Attempt to replicate storage. This should fail because the copy raises an unhandled exception.
with pytest.raises(WorkerUnhealthyException):
replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage,
backoff_check=False)
# Ensure the storage location count remains 1.
locations = model.storage.get_storage_locations(cas_storage.uuid)
assert len(locations) == 1