import io import pytest import hashlib import copy from collections import defaultdict from mock import MagicMock from storage import StorageContext from storage.swift import SwiftStorage base_args = { 'context': StorageContext('nyc', None, None, None, None), 'swift_container': 'container-name', 'storage_path': '/basepath', 'auth_url': 'https://auth.com', 'swift_user': 'root', 'swift_password': 'password', } class MockSwiftStorage(SwiftStorage): def __init__(self, *args, **kwargs): super(MockSwiftStorage, self).__init__(*args, **kwargs) self._connection = MagicMock() def _get_connection(self): return self._connection class FakeSwiftStorage(SwiftStorage): def __init__(self, fail_checksum=False, connection=None, *args, **kwargs): super(FakeSwiftStorage, self).__init__(*args, **kwargs) self._connection = connection or FakeSwift(fail_checksum=fail_checksum) def _get_connection(self): return self._connection class FakeSwift(object): def __init__(self, fail_checksum=False): self.containers = defaultdict(dict) self.fail_checksum = fail_checksum def head_object(self, container, path): return self.containers[container].get(path) def copy_object(self, container, path, target): pieces = target.split('/', 2) _, content = self.get_object(container, path) self.put_object(pieces[1], pieces[2], content) def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None): if not isinstance(content, str): if hasattr(content, 'read'): content = content.read() else: content = ''.join(content) self.containers[container][path] = { 'content': content, 'chunk_size': chunk_size, 'content_type': content_type, 'headers': headers or {}, } digest = hashlib.md5() digest.update(content) return digest.hexdigest() if not self.fail_checksum else 'invalid' def get_object(self, container, path, resp_chunk_size=None): data = self.containers[container].get(path, {}) if 'X-Object-Manifest' in data['headers']: new_contents = [] prefix = data['headers']['X-Object-Manifest'] for key, value in self.containers[container].iteritems(): if ('container-name/' + key).startswith(prefix): new_contents.append((key, value['content'])) new_contents.sort(key=lambda value: value[0]) data = dict(data) data['content'] = ''.join([nc[1] for nc in new_contents]) return bool(data), data.get('content') return bool(data), data.get('content') def delete_object(self, container, path): self.containers[container].pop(path, None) class FakeQueue(object): def __init__(self): self.items = [] def get(self): if not self.items: return None return self.items.pop() def put(self, names, item, available_after=0): self.items.append({ 'names': names, 'item': item, 'available_after': available_after, }) def test_fixed_path_concat(): swift = MockSwiftStorage(**base_args) swift.exists('object/path') swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path') def test_simple_path_concat(): simple_concat_args = dict(base_args) simple_concat_args['simple_path_concat'] = True swift = MockSwiftStorage(**simple_concat_args) swift.exists('object/path') swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path') def test_delete_unknown_path(): swift = SwiftStorage(**base_args) with pytest.raises(IOError): swift.remove('someunknownpath') def test_simple_put_get(): swift = FakeSwiftStorage(**base_args) assert not swift.exists('somepath') swift.put_content('somepath', 'hello world!') assert swift.exists('somepath') assert swift.get_content('somepath') == 'hello world!' def test_stream_read_write(): swift = FakeSwiftStorage(**base_args) assert not swift.exists('somepath') swift.stream_write('somepath', io.BytesIO('some content here')) assert swift.exists('somepath') assert swift.get_content('somepath') == 'some content here' assert ''.join(list(swift.stream_read('somepath'))) == 'some content here' def test_stream_read_write_invalid_checksum(): swift = FakeSwiftStorage(fail_checksum=True, **base_args) assert not swift.exists('somepath') with pytest.raises(IOError): swift.stream_write('somepath', io.BytesIO('some content here')) def test_remove(): swift = FakeSwiftStorage(**base_args) assert not swift.exists('somepath') swift.put_content('somepath', 'hello world!') assert swift.exists('somepath') swift.remove('somepath') assert not swift.exists('somepath') def test_copy_to(): swift = FakeSwiftStorage(**base_args) modified_args = copy.deepcopy(base_args) modified_args['swift_container'] = 'another_container' another_swift = FakeSwiftStorage(connection=swift._connection, **modified_args) swift.put_content('somepath', 'some content here') swift.copy_to(another_swift, 'somepath') assert swift.exists('somepath') assert another_swift.exists('somepath') assert swift.get_content('somepath') == 'some content here' assert another_swift.get_content('somepath') == 'some content here' def test_copy_to_different(): swift = FakeSwiftStorage(**base_args) modified_args = copy.deepcopy(base_args) modified_args['swift_user'] = 'foobarbaz' modified_args['swift_container'] = 'another_container' another_swift = FakeSwiftStorage(**modified_args) swift.put_content('somepath', 'some content here') swift.copy_to(another_swift, 'somepath') assert swift.exists('somepath') assert another_swift.exists('somepath') assert swift.get_content('somepath') == 'some content here' assert another_swift.get_content('somepath') == 'some content here' def test_checksum(): swift = FakeSwiftStorage(**base_args) swift.put_content('somepath', 'hello world!') assert swift.get_checksum('somepath') is not None def test_chunked_upload(): swift = FakeSwiftStorage(**base_args) uuid, metadata = swift.initiate_chunked_upload() chunks = ['this', 'is', 'some', 'chunked', 'data', ''] offset = 0 for chunk in chunks: bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk), io.BytesIO(chunk), metadata) assert error is None assert len(chunk) == bytes_written offset += len(chunk) swift.complete_chunked_upload(uuid, 'somepath', metadata) assert swift.get_content('somepath') == ''.join(chunks) def test_cancel_chunked_upload(): swift = FakeSwiftStorage(**base_args) uuid, metadata = swift.initiate_chunked_upload() chunks = ['this', 'is', 'some', 'chunked', 'data', ''] offset = 0 for chunk in chunks: bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk), io.BytesIO(chunk), metadata) assert error is None assert len(chunk) == bytes_written offset += len(chunk) swift.cancel_chunked_upload(uuid, metadata) for segment in SwiftStorage._segment_list_from_metadata(metadata): assert not swift.exists(segment.path) def test_empty_chunks_queued_for_deletion(): chunk_cleanup_queue = FakeQueue() args = dict(base_args) args['context'] = StorageContext('nyc', None, chunk_cleanup_queue, None, None) swift = FakeSwiftStorage(**args) uuid, metadata = swift.initiate_chunked_upload() chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', ''] offset = 0 for chunk in chunks: length = len(chunk) if length == 0: length = 1 bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length, io.BytesIO(chunk), metadata) assert error is None assert len(chunk) == bytes_written offset += len(chunk) swift.complete_chunked_upload(uuid, 'somepath', metadata) assert ''.join(chunks) == swift.get_content('somepath') # Check the chunk deletion queue and ensure we have the last chunk queued. found = chunk_cleanup_queue.get() assert found is not None found2 = chunk_cleanup_queue.get() assert found2 is None