328 lines
10 KiB
Python
328 lines
10 KiB
Python
|
import io
|
||
|
import pytest
|
||
|
import hashlib
|
||
|
import copy
|
||
|
|
||
|
from collections import defaultdict
|
||
|
from mock import MagicMock, patch
|
||
|
|
||
|
from storage import StorageContext
|
||
|
from storage.swift import SwiftStorage, _EMPTY_SEGMENTS_KEY
|
||
|
from swiftclient.client import ClientException
|
||
|
|
||
|
base_args = {
|
||
|
'context': StorageContext('nyc', None, None, None, None),
|
||
|
'swift_container': 'container-name',
|
||
|
'storage_path': '/basepath',
|
||
|
'auth_url': 'https://auth.com',
|
||
|
'swift_user': 'root',
|
||
|
'swift_password': 'password',
|
||
|
}
|
||
|
|
||
|
class MockSwiftStorage(SwiftStorage):
|
||
|
def __init__(self, *args, **kwargs):
|
||
|
super(MockSwiftStorage, self).__init__(*args, **kwargs)
|
||
|
self._connection = MagicMock()
|
||
|
|
||
|
def _get_connection(self):
|
||
|
return self._connection
|
||
|
|
||
|
class FakeSwiftStorage(SwiftStorage):
|
||
|
def __init__(self, fail_checksum=False,connection=None, *args, **kwargs):
|
||
|
super(FakeSwiftStorage, self).__init__(*args, **kwargs)
|
||
|
self._connection = connection or FakeSwift(fail_checksum=fail_checksum,
|
||
|
temp_url_key=kwargs.get('temp_url_key'))
|
||
|
|
||
|
def _get_connection(self):
|
||
|
return self._connection
|
||
|
|
||
|
|
||
|
class FakeSwift(object):
|
||
|
def __init__(self, fail_checksum=False, temp_url_key=None):
|
||
|
self.containers = defaultdict(dict)
|
||
|
self.fail_checksum = fail_checksum
|
||
|
self.temp_url_key = temp_url_key
|
||
|
|
||
|
def get_auth(self):
|
||
|
if self.temp_url_key == 'exception':
|
||
|
raise ClientException('I failed!')
|
||
|
|
||
|
return 'http://fake/swift', None
|
||
|
|
||
|
def head_object(self, container, path):
|
||
|
return self.containers.get(container, {}).get(path, {}).get('headers', None)
|
||
|
|
||
|
def copy_object(self, container, path, target):
|
||
|
pieces = target.split('/', 2)
|
||
|
_, content = self.get_object(container, path)
|
||
|
self.put_object(pieces[1], pieces[2], content)
|
||
|
|
||
|
def get_container(self, container, prefix=None, full_listing=None):
|
||
|
container_entries = self.containers[container]
|
||
|
objs = []
|
||
|
for path, data in list(container_entries.iteritems()):
|
||
|
if not prefix or path.startswith(prefix):
|
||
|
objs.append({
|
||
|
'name': path,
|
||
|
'bytes': len(data['content']),
|
||
|
})
|
||
|
return {}, objs
|
||
|
|
||
|
def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
|
||
|
if not isinstance(content, str):
|
||
|
if hasattr(content, 'read'):
|
||
|
content = content.read()
|
||
|
else:
|
||
|
content = ''.join(content)
|
||
|
|
||
|
self.containers[container][path] = {
|
||
|
'content': content,
|
||
|
'chunk_size': chunk_size,
|
||
|
'content_type': content_type,
|
||
|
'headers': headers or {'is': True},
|
||
|
}
|
||
|
|
||
|
digest = hashlib.md5()
|
||
|
digest.update(content)
|
||
|
return digest.hexdigest() if not self.fail_checksum else 'invalid'
|
||
|
|
||
|
def get_object(self, container, path, resp_chunk_size=None):
|
||
|
data = self.containers[container].get(path, {})
|
||
|
if 'X-Object-Manifest' in data['headers']:
|
||
|
new_contents = []
|
||
|
prefix = data['headers']['X-Object-Manifest']
|
||
|
for key, value in self.containers[container].iteritems():
|
||
|
if ('container-name/' + key).startswith(prefix):
|
||
|
new_contents.append((key, value['content']))
|
||
|
|
||
|
new_contents.sort(key=lambda value: value[0])
|
||
|
|
||
|
data = dict(data)
|
||
|
data['content'] = ''.join([nc[1] for nc in new_contents])
|
||
|
return bool(data), data.get('content')
|
||
|
|
||
|
return bool(data), data.get('content')
|
||
|
|
||
|
def delete_object(self, container, path):
|
||
|
self.containers[container].pop(path, None)
|
||
|
|
||
|
|
||
|
class FakeQueue(object):
|
||
|
def __init__(self):
|
||
|
self.items = []
|
||
|
|
||
|
def get(self):
|
||
|
if not self.items:
|
||
|
return None
|
||
|
|
||
|
return self.items.pop()
|
||
|
|
||
|
def put(self, names, item, available_after=0):
|
||
|
self.items.append({
|
||
|
'names': names,
|
||
|
'item': item,
|
||
|
'available_after': available_after,
|
||
|
})
|
||
|
|
||
|
def test_fixed_path_concat():
|
||
|
swift = MockSwiftStorage(**base_args)
|
||
|
swift.exists('object/path')
|
||
|
swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path')
|
||
|
|
||
|
def test_simple_path_concat():
|
||
|
simple_concat_args = dict(base_args)
|
||
|
simple_concat_args['simple_path_concat'] = True
|
||
|
swift = MockSwiftStorage(**simple_concat_args)
|
||
|
swift.exists('object/path')
|
||
|
swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
|
||
|
|
||
|
def test_delete_unknown_path():
|
||
|
swift = SwiftStorage(**base_args)
|
||
|
with pytest.raises(IOError):
|
||
|
swift.remove('someunknownpath')
|
||
|
|
||
|
def test_simple_put_get():
|
||
|
swift = FakeSwiftStorage(**base_args)
|
||
|
assert not swift.exists('somepath')
|
||
|
|
||
|
swift.put_content('somepath', 'hello world!')
|
||
|
assert swift.exists('somepath')
|
||
|
assert swift.get_content('somepath') == 'hello world!'
|
||
|
|
||
|
def test_stream_read_write():
|
||
|
swift = FakeSwiftStorage(**base_args)
|
||
|
assert not swift.exists('somepath')
|
||
|
|
||
|
swift.stream_write('somepath', io.BytesIO('some content here'))
|
||
|
assert swift.exists('somepath')
|
||
|
assert swift.get_content('somepath') == 'some content here'
|
||
|
assert ''.join(list(swift.stream_read('somepath'))) == 'some content here'
|
||
|
|
||
|
def test_stream_read_write_invalid_checksum():
|
||
|
swift = FakeSwiftStorage(fail_checksum=True, **base_args)
|
||
|
assert not swift.exists('somepath')
|
||
|
|
||
|
with pytest.raises(IOError):
|
||
|
swift.stream_write('somepath', io.BytesIO('some content here'))
|
||
|
|
||
|
def test_remove():
|
||
|
swift = FakeSwiftStorage(**base_args)
|
||
|
assert not swift.exists('somepath')
|
||
|
|
||
|
swift.put_content('somepath', 'hello world!')
|
||
|
assert swift.exists('somepath')
|
||
|
|
||
|
swift.remove('somepath')
|
||
|
assert not swift.exists('somepath')
|
||
|
|
||
|
def test_copy_to():
|
||
|
swift = FakeSwiftStorage(**base_args)
|
||
|
|
||
|
modified_args = copy.deepcopy(base_args)
|
||
|
modified_args['swift_container'] = 'another_container'
|
||
|
|
||
|
another_swift = FakeSwiftStorage(connection=swift._connection, **modified_args)
|
||
|
|
||
|
swift.put_content('somepath', 'some content here')
|
||
|
swift.copy_to(another_swift, 'somepath')
|
||
|
|
||
|
assert swift.exists('somepath')
|
||
|
assert another_swift.exists('somepath')
|
||
|
|
||
|
assert swift.get_content('somepath') == 'some content here'
|
||
|
assert another_swift.get_content('somepath') == 'some content here'
|
||
|
|
||
|
def test_copy_to_different():
|
||
|
swift = FakeSwiftStorage(**base_args)
|
||
|
|
||
|
modified_args = copy.deepcopy(base_args)
|
||
|
modified_args['swift_user'] = 'foobarbaz'
|
||
|
modified_args['swift_container'] = 'another_container'
|
||
|
|
||
|
another_swift = FakeSwiftStorage(**modified_args)
|
||
|
|
||
|
swift.put_content('somepath', 'some content here')
|
||
|
swift.copy_to(another_swift, 'somepath')
|
||
|
|
||
|
assert swift.exists('somepath')
|
||
|
assert another_swift.exists('somepath')
|
||
|
|
||
|
assert swift.get_content('somepath') == 'some content here'
|
||
|
assert another_swift.get_content('somepath') == 'some content here'
|
||
|
|
||
|
def test_checksum():
|
||
|
swift = FakeSwiftStorage(**base_args)
|
||
|
swift.put_content('somepath', 'hello world!')
|
||
|
assert swift.get_checksum('somepath') is not None
|
||
|
|
||
|
@pytest.mark.parametrize('read_until_end', [
|
||
|
(True,),
|
||
|
(False,),
|
||
|
])
|
||
|
@pytest.mark.parametrize('max_chunk_size', [
|
||
|
(10000000),
|
||
|
(10),
|
||
|
(5),
|
||
|
(2),
|
||
|
(1),
|
||
|
])
|
||
|
@pytest.mark.parametrize('chunks', [
|
||
|
(['this', 'is', 'some', 'chunked', 'data', '']),
|
||
|
(['this is a very large chunk of data', '']),
|
||
|
(['h', 'e', 'l', 'l', 'o', '']),
|
||
|
])
|
||
|
def test_chunked_upload(chunks, max_chunk_size, read_until_end):
|
||
|
swift = FakeSwiftStorage(**base_args)
|
||
|
uuid, metadata = swift.initiate_chunked_upload()
|
||
|
|
||
|
offset = 0
|
||
|
|
||
|
with patch('storage.swift._MAXIMUM_SEGMENT_SIZE', max_chunk_size):
|
||
|
for chunk in chunks:
|
||
|
chunk_length = len(chunk) if not read_until_end else -1
|
||
|
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, chunk_length,
|
||
|
io.BytesIO(chunk), metadata)
|
||
|
assert error is None
|
||
|
assert len(chunk) == bytes_written
|
||
|
offset += len(chunk)
|
||
|
|
||
|
swift.complete_chunked_upload(uuid, 'somepath', metadata)
|
||
|
assert swift.get_content('somepath') == ''.join(chunks)
|
||
|
|
||
|
# Ensure each of the segments exist.
|
||
|
for segment in metadata['segments']:
|
||
|
assert swift.exists(segment.path)
|
||
|
|
||
|
# Delete the file and ensure all of its segments were removed.
|
||
|
swift.remove('somepath')
|
||
|
assert not swift.exists('somepath')
|
||
|
|
||
|
for segment in metadata['segments']:
|
||
|
assert not swift.exists(segment.path)
|
||
|
|
||
|
|
||
|
def test_cancel_chunked_upload():
|
||
|
chunk_cleanup_queue = FakeQueue()
|
||
|
|
||
|
args = dict(base_args)
|
||
|
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue, None, None)
|
||
|
|
||
|
swift = FakeSwiftStorage(**args)
|
||
|
uuid, metadata = swift.initiate_chunked_upload()
|
||
|
|
||
|
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
|
||
|
offset = 0
|
||
|
for chunk in chunks:
|
||
|
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
|
||
|
io.BytesIO(chunk), metadata)
|
||
|
assert error is None
|
||
|
assert len(chunk) == bytes_written
|
||
|
offset += len(chunk)
|
||
|
|
||
|
swift.cancel_chunked_upload(uuid, metadata)
|
||
|
|
||
|
found = chunk_cleanup_queue.get()
|
||
|
assert found is not None
|
||
|
|
||
|
|
||
|
def test_empty_chunks_queued_for_deletion():
|
||
|
chunk_cleanup_queue = FakeQueue()
|
||
|
args = dict(base_args)
|
||
|
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue, None, None)
|
||
|
|
||
|
swift = FakeSwiftStorage(**args)
|
||
|
uuid, metadata = swift.initiate_chunked_upload()
|
||
|
|
||
|
chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', '']
|
||
|
offset = 0
|
||
|
for chunk in chunks:
|
||
|
length = len(chunk)
|
||
|
if length == 0:
|
||
|
length = 1
|
||
|
|
||
|
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length,
|
||
|
io.BytesIO(chunk), metadata)
|
||
|
assert error is None
|
||
|
assert len(chunk) == bytes_written
|
||
|
offset += len(chunk)
|
||
|
|
||
|
swift.complete_chunked_upload(uuid, 'somepath', metadata)
|
||
|
assert ''.join(chunks) == swift.get_content('somepath')
|
||
|
|
||
|
# Check the chunk deletion queue and ensure we have the last chunk queued.
|
||
|
found = chunk_cleanup_queue.get()
|
||
|
assert found is not None
|
||
|
|
||
|
found2 = chunk_cleanup_queue.get()
|
||
|
assert found2 is None
|
||
|
|
||
|
@pytest.mark.parametrize('temp_url_key, expects_url', [
|
||
|
(None, False),
|
||
|
('foobarbaz', True),
|
||
|
('exception', False),
|
||
|
])
|
||
|
def test_get_direct_download_url(temp_url_key, expects_url):
|
||
|
swift = FakeSwiftStorage(temp_url_key=temp_url_key, **base_args)
|
||
|
swift.put_content('somepath', 'hello world!')
|
||
|
assert (swift.get_direct_download_url('somepath') is not None) == expects_url
|