diff --git a/storage/swift.py b/storage/swift.py index 6017a18b4..84a359c0d 100644 --- a/storage/swift.py +++ b/storage/swift.py @@ -11,7 +11,7 @@ import json from _pyio import BufferedReader from uuid import uuid4 -from swiftclient.client import Connection, ClientException +from swiftclient.client import Connection, ClientException, ReadableToIterable from urlparse import urlparse from random import SystemRandom from hashlib import sha1 @@ -108,10 +108,14 @@ class SwiftStorage(BaseStorage): if content_encoding is not None: headers['Content-Encoding'] = content_encoding + is_filelike = hasattr(content, 'read') + if is_filelike: + content = ReadableToIterable(content, md5=True) + try: - self._get_connection().put_object(self._swift_container, path, content, - chunk_size=chunk, content_type=content_type, - headers=headers) + etag = self._get_connection().put_object(self._swift_container, path, content, + chunk_size=chunk, content_type=content_type, + headers=headers) except ClientException: # We re-raise client exception here so that validation of config during setup can see # the client exception messages. @@ -120,6 +124,16 @@ class SwiftStorage(BaseStorage): logger.exception('Could not put object at path %s: %s', path, ex) raise IOError("Could not put content: %s" % path) + # If we wrapped the content in a ReadableToIterable, compare its MD5 to the etag returned. If + # they don't match, raise an IOError indicating a write failure. + if is_filelike: + if etag != content.get_md5sum(): + logger.error('Got mismatch in md5 etag for path %s: Expected %s, but server has %s', path, + content.get_md5sum(), etag) + raise IOError('upload verification failed for path {0}:' + 'md5 mismatch, local {1} != remote {2}' + .format(path, content.get_md5sum(), etag)) + def _head_object(self, path): path = self._normalize_path(path) try: diff --git a/storage/test/test_swift.py b/storage/test/test_swift.py new file mode 100644 index 000000000..f9d4cd9ce --- /dev/null +++ b/storage/test/test_swift.py @@ -0,0 +1,220 @@ +import io +import pytest +import hashlib + +from collections import defaultdict +from mock import MagicMock + +from storage import StorageContext +from storage.swift import SwiftStorage + +base_args = { + 'context': StorageContext('nyc', None, None), + 'swift_container': 'container-name', + 'storage_path': '/basepath', + 'auth_url': 'https://auth.com', + 'swift_user': 'root', + 'swift_password': 'password', +} + +class MockSwiftStorage(SwiftStorage): + def __init__(self, *args, **kwargs): + super(MockSwiftStorage, self).__init__(*args, **kwargs) + self._connection = MagicMock() + + def _get_connection(self): + return self._connection + +class FakeSwiftStorage(SwiftStorage): + def __init__(self, fail_checksum=False, *args, **kwargs): + super(FakeSwiftStorage, self).__init__(*args, **kwargs) + self._connection = FakeSwift(fail_checksum=fail_checksum) + + def _get_connection(self): + return self._connection + + +class FakeSwift(object): + def __init__(self, fail_checksum=False): + self.containers = defaultdict(dict) + self.fail_checksum = fail_checksum + + def head_object(self, container, path): + return self.containers[container].get(path) + + def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None): + if not isinstance(content, str): + if hasattr(content, 'read'): + content = content.read() + else: + content = ''.join(content) + + self.containers[container][path] = { + 'content': content, + 'chunk_size': chunk_size, + 'content_type': content_type, + 'headers': headers, + } + + digest = hashlib.md5() + digest.update(content) + return digest.hexdigest() if not self.fail_checksum else 'invalid' + + def get_object(self, container, path, resp_chunk_size=None): + data = self.containers[container].get(path, {}) + if 'X-Object-Manifest' in data['headers']: + new_contents = [] + prefix = data['headers']['X-Object-Manifest'] + for key, value in self.containers[container].iteritems(): + if ('container-name/' + key).startswith(prefix): + new_contents.append((key, value['content'])) + + new_contents.sort(key=lambda value: value[0]) + + data = dict(data) + data['content'] = ''.join([nc[1] for nc in new_contents]) + return bool(data), data.get('content') + + return bool(data), data.get('content') + + def delete_object(self, container, path): + self.containers[container].pop(path, None) + + +class FakeQueue(object): + def __init__(self): + self.items = [] + + def get(self): + if not self.items: + return None + + return self.items.pop() + + def put(self, names, item, available_after=0): + self.items.append({ + 'names': names, + 'item': item, + 'available_after': available_after, + }) + +def test_fixed_path_concat(): + swift = MockSwiftStorage(**base_args) + swift.exists('object/path') + swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path') + + +def test_simple_path_concat(): + simple_concat_args = dict(base_args) + simple_concat_args['simple_path_concat'] = True + swift = MockSwiftStorage(**simple_concat_args) + swift.exists('object/path') + swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path') + +def test_delete_unknown_path(): + swift = SwiftStorage(**base_args) + with pytest.raises(IOError): + swift.remove('someunknownpath') + +def test_simple_put_get(): + swift = FakeSwiftStorage(**base_args) + assert not swift.exists('somepath') + + swift.put_content('somepath', 'hello world!') + assert swift.exists('somepath') + assert swift.get_content('somepath') == 'hello world!' + +def test_stream_read_write(): + swift = FakeSwiftStorage(**base_args) + assert not swift.exists('somepath') + + swift.stream_write('somepath', io.BytesIO('some content here')) + assert swift.exists('somepath') + assert swift.get_content('somepath') == 'some content here' + assert ''.join(list(swift.stream_read('somepath'))) == 'some content here' + +def test_stream_read_write_invalid_checksum(): + swift = FakeSwiftStorage(fail_checksum=True, **base_args) + assert not swift.exists('somepath') + + with pytest.raises(IOError): + swift.stream_write('somepath', io.BytesIO('some content here')) + +def test_remove(): + swift = FakeSwiftStorage(**base_args) + assert not swift.exists('somepath') + + swift.put_content('somepath', 'hello world!') + assert swift.exists('somepath') + + swift.remove('somepath') + assert not swift.exists('somepath') + +def test_checksum(): + swift = FakeSwiftStorage(**base_args) + swift.put_content('somepath', 'hello world!') + assert swift.get_checksum('somepath') is not None + +def test_chunked_upload(): + swift = FakeSwiftStorage(**base_args) + uuid, metadata = swift.initiate_chunked_upload() + + chunks = ['this', 'is', 'some', 'chunked', 'data', ''] + offset = 0 + for chunk in chunks: + bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk), + io.BytesIO(chunk), metadata) + assert error is None + assert len(chunk) == bytes_written + offset += len(chunk) + + swift.complete_chunked_upload(uuid, 'somepath', metadata) + assert swift.get_content('somepath') == ''.join(chunks) + +def test_cancel_chunked_upload(): + swift = FakeSwiftStorage(**base_args) + uuid, metadata = swift.initiate_chunked_upload() + + chunks = ['this', 'is', 'some', 'chunked', 'data', ''] + offset = 0 + for chunk in chunks: + bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk), + io.BytesIO(chunk), metadata) + assert error is None + assert len(chunk) == bytes_written + offset += len(chunk) + + swift.cancel_chunked_upload(uuid, metadata) + for segment in SwiftStorage._segment_list_from_metadata(metadata): + assert not swift.exists(segment.path) + +def test_empty_chunks_queued_for_deletion(): + chunk_cleanup_queue = FakeQueue() + args = dict(base_args) + args['context'] = StorageContext('nyc', None, chunk_cleanup_queue) + + swift = FakeSwiftStorage(**args) + uuid, metadata = swift.initiate_chunked_upload() + + chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', ''] + offset = 0 + for chunk in chunks: + length = len(chunk) + if length == 0: + length = 1 + + bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length, + io.BytesIO(chunk), metadata) + assert error is None + assert len(chunk) == bytes_written + offset += len(chunk) + + swift.complete_chunked_upload(uuid, 'somepath', metadata) + assert ''.join(chunks) == swift.get_content('somepath') + + # Check the chunk deletion queue and ensure we have the last chunk queued. + found = chunk_cleanup_queue.get() + assert found is not None + + found2 = chunk_cleanup_queue.get() + assert found2 is None diff --git a/test/test_swift.py b/test/test_swift.py deleted file mode 100644 index 318f0a1fc..000000000 --- a/test/test_swift.py +++ /dev/null @@ -1,223 +0,0 @@ -import io -import unittest - -from collections import defaultdict -from mock import MagicMock - -from storage import StorageContext -from storage.swift import SwiftStorage - - -class MockSwiftStorage(SwiftStorage): - def __init__(self, *args, **kwargs): - super(MockSwiftStorage, self).__init__(*args, **kwargs) - self._connection = MagicMock() - - def _get_connection(self): - return self._connection - - -class MockSwiftTests(unittest.TestCase): - base_args = { - 'context': StorageContext('nyc', None, None), - 'swift_container': 'container-name', - 'storage_path': '/basepath', - 'auth_url': 'https://auth.com', - 'swift_user': 'root', - 'swift_password': 'password', - } - - def test_fixed_path_concat(self): - swift = MockSwiftStorage(**self.base_args) - swift.exists('object/path') - swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path') - - - def test_simple_path_concat(self): - simple_concat_args = dict(self.base_args) - simple_concat_args['simple_path_concat'] = True - swift = MockSwiftStorage(**simple_concat_args) - swift.exists('object/path') - swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path') - - def test_delete_unknown_path(self): - swift = SwiftStorage(**self.base_args) - with self.assertRaises(IOError): - swift.remove('someunknownpath') - - -class FakeSwiftStorage(SwiftStorage): - def __init__(self, *args, **kwargs): - super(FakeSwiftStorage, self).__init__(*args, **kwargs) - self._connection = FakeSwift() - - def _get_connection(self): - return self._connection - - -class FakeSwift(object): - def __init__(self): - self.containers = defaultdict(dict) - - def head_object(self, container, path): - return self.containers[container].get(path) - - def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None): - if not isinstance(content, str): - content = content.read() - - self.containers[container][path] = { - 'content': content, - 'chunk_size': chunk_size, - 'content_type': content_type, - 'headers': headers, - } - - def get_object(self, container, path, resp_chunk_size=None): - data = self.containers[container].get(path, {}) - if 'X-Object-Manifest' in data['headers']: - new_contents = [] - prefix = data['headers']['X-Object-Manifest'] - for key, value in self.containers[container].iteritems(): - if ('container-name/' + key).startswith(prefix): - new_contents.append((key, value['content'])) - - new_contents.sort(key=lambda value: value[0]) - - data = dict(data) - data['content'] = ''.join([nc[1] for nc in new_contents]) - return bool(data), data.get('content') - - return bool(data), data.get('content') - - def delete_object(self, container, path): - self.containers[container].pop(path, None) - - -class FakeQueue(object): - def __init__(self): - self.items = [] - - def get(self): - if not self.items: - return None - - return self.items.pop() - - def put(self, names, item, available_after=0): - self.items.append({ - 'names': names, - 'item': item, - 'available_after': available_after, - }) - -class FakeSwiftTests(unittest.TestCase): - base_args = { - 'context': StorageContext('nyc', None, None), - 'swift_container': 'container-name', - 'storage_path': '/basepath', - 'auth_url': 'https://auth.com', - 'swift_user': 'root', - 'swift_password': 'password', - } - - def test_simple_put_get(self): - swift = FakeSwiftStorage(**self.base_args) - self.assertFalse(swift.exists('somepath')) - - swift.put_content('somepath', 'hello world!') - self.assertTrue(swift.exists('somepath')) - - self.assertEquals('hello world!', swift.get_content('somepath')) - - def test_stream_read_write(self): - swift = FakeSwiftStorage(**self.base_args) - self.assertFalse(swift.exists('somepath')) - - swift.stream_write('somepath', io.BytesIO('some content here')) - self.assertTrue(swift.exists('somepath')) - - self.assertEquals('some content here', swift.get_content('somepath')) - self.assertEquals('some content here', ''.join(list(swift.stream_read('somepath')))) - - def test_remove(self): - swift = FakeSwiftStorage(**self.base_args) - self.assertFalse(swift.exists('somepath')) - - swift.put_content('somepath', 'hello world!') - self.assertTrue(swift.exists('somepath')) - - swift.remove('somepath') - self.assertFalse(swift.exists('somepath')) - - def test_checksum(self): - swift = FakeSwiftStorage(**self.base_args) - swift.put_content('somepath', 'hello world!') - self.assertIsNotNone(swift.get_checksum('somepath')) - - def test_chunked_upload(self): - swift = FakeSwiftStorage(**self.base_args) - uuid, metadata = swift.initiate_chunked_upload() - - chunks = ['this', 'is', 'some', 'chunked', 'data', ''] - offset = 0 - for chunk in chunks: - bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk), - io.BytesIO(chunk), metadata) - self.assertIsNone(error) - self.assertEquals(bytes_written, len(chunk)) - offset += len(chunk) - - swift.complete_chunked_upload(uuid, 'somepath', metadata) - self.assertEquals(''.join(chunks), swift.get_content('somepath')) - - def test_cancel_chunked_upload(self): - swift = FakeSwiftStorage(**self.base_args) - uuid, metadata = swift.initiate_chunked_upload() - - chunks = ['this', 'is', 'some', 'chunked', 'data', ''] - offset = 0 - for chunk in chunks: - bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk), - io.BytesIO(chunk), metadata) - self.assertIsNone(error) - self.assertEquals(bytes_written, len(chunk)) - offset += len(chunk) - - swift.cancel_chunked_upload(uuid, metadata) - for segment in SwiftStorage._segment_list_from_metadata(metadata): - self.assertFalse(swift.exists(segment.path)) - - def test_empty_chunks_queued_for_deletion(self): - chunk_cleanup_queue = FakeQueue() - args = dict(self.base_args) - args['context'] = StorageContext('nyc', None, chunk_cleanup_queue) - - swift = FakeSwiftStorage(**args) - uuid, metadata = swift.initiate_chunked_upload() - - chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', ''] - offset = 0 - for chunk in chunks: - length = len(chunk) - if length == 0: - length = 1 - - bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length, - io.BytesIO(chunk), metadata) - self.assertIsNone(error) - self.assertEquals(bytes_written, len(chunk)) - offset += len(chunk) - - swift.complete_chunked_upload(uuid, 'somepath', metadata) - self.assertEquals(''.join(chunks), swift.get_content('somepath')) - - # Check the chunk deletion queue and ensure we have the last chunk queued. - found = chunk_cleanup_queue.get() - self.assertIsNotNone(found) - - found2 = chunk_cleanup_queue.get() - self.assertIsNone(found2) - -if __name__ == '__main__': - unittest.main()