Move Swift tests to pytest

This commit is contained in:
Joseph Schorr 2017-06-27 16:05:32 +03:00
parent e45ffb39d1
commit 6b272cf7e6
2 changed files with 204 additions and 223 deletions

204
storage/test/test_swift.py Normal file
View file

@ -0,0 +1,204 @@
import io
import pytest
from collections import defaultdict
from mock import MagicMock
from storage import StorageContext
from storage.swift import SwiftStorage
base_args = {
'context': StorageContext('nyc', None, None),
'swift_container': 'container-name',
'storage_path': '/basepath',
'auth_url': 'https://auth.com',
'swift_user': 'root',
'swift_password': 'password',
}
class MockSwiftStorage(SwiftStorage):
def __init__(self, *args, **kwargs):
super(MockSwiftStorage, self).__init__(*args, **kwargs)
self._connection = MagicMock()
def _get_connection(self):
return self._connection
class FakeSwiftStorage(SwiftStorage):
def __init__(self, *args, **kwargs):
super(FakeSwiftStorage, self).__init__(*args, **kwargs)
self._connection = FakeSwift()
def _get_connection(self):
return self._connection
class FakeSwift(object):
def __init__(self):
self.containers = defaultdict(dict)
def head_object(self, container, path):
return self.containers[container].get(path)
def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
if not isinstance(content, str):
content = content.read()
self.containers[container][path] = {
'content': content,
'chunk_size': chunk_size,
'content_type': content_type,
'headers': headers,
}
def get_object(self, container, path, resp_chunk_size=None):
data = self.containers[container].get(path, {})
if 'X-Object-Manifest' in data['headers']:
new_contents = []
prefix = data['headers']['X-Object-Manifest']
for key, value in self.containers[container].iteritems():
if ('container-name/' + key).startswith(prefix):
new_contents.append((key, value['content']))
new_contents.sort(key=lambda value: value[0])
data = dict(data)
data['content'] = ''.join([nc[1] for nc in new_contents])
return bool(data), data.get('content')
return bool(data), data.get('content')
def delete_object(self, container, path):
self.containers[container].pop(path, None)
class FakeQueue(object):
def __init__(self):
self.items = []
def get(self):
if not self.items:
return None
return self.items.pop()
def put(self, names, item, available_after=0):
self.items.append({
'names': names,
'item': item,
'available_after': available_after,
})
def test_fixed_path_concat():
swift = MockSwiftStorage(**base_args)
swift.exists('object/path')
swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path')
def test_simple_path_concat():
simple_concat_args = dict(base_args)
simple_concat_args['simple_path_concat'] = True
swift = MockSwiftStorage(**simple_concat_args)
swift.exists('object/path')
swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
def test_delete_unknown_path():
swift = SwiftStorage(**base_args)
with pytest.raises(IOError):
swift.remove('someunknownpath')
def test_simple_put_get():
swift = FakeSwiftStorage(**base_args)
assert not swift.exists('somepath')
swift.put_content('somepath', 'hello world!')
assert swift.exists('somepath')
assert swift.get_content('somepath') == 'hello world!'
def test_stream_read_write():
swift = FakeSwiftStorage(**base_args)
assert not swift.exists('somepath')
swift.stream_write('somepath', io.BytesIO('some content here'))
assert swift.exists('somepath')
assert swift.get_content('somepath') == 'some content here'
assert ''.join(list(swift.stream_read('somepath'))) == 'some content here'
def test_remove():
swift = FakeSwiftStorage(**base_args)
assert not swift.exists('somepath')
swift.put_content('somepath', 'hello world!')
assert swift.exists('somepath')
swift.remove('somepath')
assert not swift.exists('somepath')
def test_checksum():
swift = FakeSwiftStorage(**base_args)
swift.put_content('somepath', 'hello world!')
assert swift.get_checksum('somepath') is not None
def test_chunked_upload():
swift = FakeSwiftStorage(**base_args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
io.BytesIO(chunk), metadata)
assert error is None
assert len(chunk) == bytes_written
offset += len(chunk)
swift.complete_chunked_upload(uuid, 'somepath', metadata)
assert swift.get_content('somepath') == ''.join(chunks)
def test_cancel_chunked_upload():
swift = FakeSwiftStorage(**base_args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
io.BytesIO(chunk), metadata)
assert error is None
assert len(chunk) == bytes_written
offset += len(chunk)
swift.cancel_chunked_upload(uuid, metadata)
for segment in SwiftStorage._segment_list_from_metadata(metadata):
assert not swift.exists(segment.path)
def test_empty_chunks_queued_for_deletion():
chunk_cleanup_queue = FakeQueue()
args = dict(base_args)
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue)
swift = FakeSwiftStorage(**args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
length = len(chunk)
if length == 0:
length = 1
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length,
io.BytesIO(chunk), metadata)
assert error is None
assert len(chunk) == bytes_written
offset += len(chunk)
swift.complete_chunked_upload(uuid, 'somepath', metadata)
assert ''.join(chunks) == swift.get_content('somepath')
# Check the chunk deletion queue and ensure we have the last chunk queued.
found = chunk_cleanup_queue.get()
assert found is not None
found2 = chunk_cleanup_queue.get()
assert found2 is None

View file

@ -1,223 +0,0 @@
import io
import unittest
from collections import defaultdict
from mock import MagicMock
from storage import StorageContext
from storage.swift import SwiftStorage
class MockSwiftStorage(SwiftStorage):
def __init__(self, *args, **kwargs):
super(MockSwiftStorage, self).__init__(*args, **kwargs)
self._connection = MagicMock()
def _get_connection(self):
return self._connection
class MockSwiftTests(unittest.TestCase):
base_args = {
'context': StorageContext('nyc', None, None),
'swift_container': 'container-name',
'storage_path': '/basepath',
'auth_url': 'https://auth.com',
'swift_user': 'root',
'swift_password': 'password',
}
def test_fixed_path_concat(self):
swift = MockSwiftStorage(**self.base_args)
swift.exists('object/path')
swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path')
def test_simple_path_concat(self):
simple_concat_args = dict(self.base_args)
simple_concat_args['simple_path_concat'] = True
swift = MockSwiftStorage(**simple_concat_args)
swift.exists('object/path')
swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
def test_delete_unknown_path(self):
swift = SwiftStorage(**self.base_args)
with self.assertRaises(IOError):
swift.remove('someunknownpath')
class FakeSwiftStorage(SwiftStorage):
def __init__(self, *args, **kwargs):
super(FakeSwiftStorage, self).__init__(*args, **kwargs)
self._connection = FakeSwift()
def _get_connection(self):
return self._connection
class FakeSwift(object):
def __init__(self):
self.containers = defaultdict(dict)
def head_object(self, container, path):
return self.containers[container].get(path)
def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
if not isinstance(content, str):
content = content.read()
self.containers[container][path] = {
'content': content,
'chunk_size': chunk_size,
'content_type': content_type,
'headers': headers,
}
def get_object(self, container, path, resp_chunk_size=None):
data = self.containers[container].get(path, {})
if 'X-Object-Manifest' in data['headers']:
new_contents = []
prefix = data['headers']['X-Object-Manifest']
for key, value in self.containers[container].iteritems():
if ('container-name/' + key).startswith(prefix):
new_contents.append((key, value['content']))
new_contents.sort(key=lambda value: value[0])
data = dict(data)
data['content'] = ''.join([nc[1] for nc in new_contents])
return bool(data), data.get('content')
return bool(data), data.get('content')
def delete_object(self, container, path):
self.containers[container].pop(path, None)
class FakeQueue(object):
def __init__(self):
self.items = []
def get(self):
if not self.items:
return None
return self.items.pop()
def put(self, names, item, available_after=0):
self.items.append({
'names': names,
'item': item,
'available_after': available_after,
})
class FakeSwiftTests(unittest.TestCase):
base_args = {
'context': StorageContext('nyc', None, None),
'swift_container': 'container-name',
'storage_path': '/basepath',
'auth_url': 'https://auth.com',
'swift_user': 'root',
'swift_password': 'password',
}
def test_simple_put_get(self):
swift = FakeSwiftStorage(**self.base_args)
self.assertFalse(swift.exists('somepath'))
swift.put_content('somepath', 'hello world!')
self.assertTrue(swift.exists('somepath'))
self.assertEquals('hello world!', swift.get_content('somepath'))
def test_stream_read_write(self):
swift = FakeSwiftStorage(**self.base_args)
self.assertFalse(swift.exists('somepath'))
swift.stream_write('somepath', io.BytesIO('some content here'))
self.assertTrue(swift.exists('somepath'))
self.assertEquals('some content here', swift.get_content('somepath'))
self.assertEquals('some content here', ''.join(list(swift.stream_read('somepath'))))
def test_remove(self):
swift = FakeSwiftStorage(**self.base_args)
self.assertFalse(swift.exists('somepath'))
swift.put_content('somepath', 'hello world!')
self.assertTrue(swift.exists('somepath'))
swift.remove('somepath')
self.assertFalse(swift.exists('somepath'))
def test_checksum(self):
swift = FakeSwiftStorage(**self.base_args)
swift.put_content('somepath', 'hello world!')
self.assertIsNotNone(swift.get_checksum('somepath'))
def test_chunked_upload(self):
swift = FakeSwiftStorage(**self.base_args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
io.BytesIO(chunk), metadata)
self.assertIsNone(error)
self.assertEquals(bytes_written, len(chunk))
offset += len(chunk)
swift.complete_chunked_upload(uuid, 'somepath', metadata)
self.assertEquals(''.join(chunks), swift.get_content('somepath'))
def test_cancel_chunked_upload(self):
swift = FakeSwiftStorage(**self.base_args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
io.BytesIO(chunk), metadata)
self.assertIsNone(error)
self.assertEquals(bytes_written, len(chunk))
offset += len(chunk)
swift.cancel_chunked_upload(uuid, metadata)
for segment in SwiftStorage._segment_list_from_metadata(metadata):
self.assertFalse(swift.exists(segment.path))
def test_empty_chunks_queued_for_deletion(self):
chunk_cleanup_queue = FakeQueue()
args = dict(self.base_args)
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue)
swift = FakeSwiftStorage(**args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
length = len(chunk)
if length == 0:
length = 1
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length,
io.BytesIO(chunk), metadata)
self.assertIsNone(error)
self.assertEquals(bytes_written, len(chunk))
offset += len(chunk)
swift.complete_chunked_upload(uuid, 'somepath', metadata)
self.assertEquals(''.join(chunks), swift.get_content('somepath'))
# Check the chunk deletion queue and ensure we have the last chunk queued.
found = chunk_cleanup_queue.get()
self.assertIsNotNone(found)
found2 = chunk_cleanup_queue.get()
self.assertIsNone(found2)
if __name__ == '__main__':
unittest.main()