This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/storage/test/test_swift.py
Joseph Schorr 57523d22de Have Swift storage delete segments when deleting dynamic large objects
This ensures that we reclaim the space, rather than simply deleting the manifest

Fixes https://jira.coreos.com/browse/QUAY-942
2018-05-16 16:01:49 -04:00

320 lines
10 KiB
Python

import io
import pytest
import hashlib
import copy
from collections import defaultdict
from mock import MagicMock, patch
from storage import StorageContext
from storage.swift import SwiftStorage
from swiftclient.client import ClientException
base_args = {
'context': StorageContext('nyc', None, None, None, None),
'swift_container': 'container-name',
'storage_path': '/basepath',
'auth_url': 'https://auth.com',
'swift_user': 'root',
'swift_password': 'password',
}
class MockSwiftStorage(SwiftStorage):
def __init__(self, *args, **kwargs):
super(MockSwiftStorage, self).__init__(*args, **kwargs)
self._connection = MagicMock()
def _get_connection(self):
return self._connection
class FakeSwiftStorage(SwiftStorage):
def __init__(self, fail_checksum=False,connection=None, *args, **kwargs):
super(FakeSwiftStorage, self).__init__(*args, **kwargs)
self._connection = connection or FakeSwift(fail_checksum=fail_checksum,
temp_url_key=kwargs.get('temp_url_key'))
def _get_connection(self):
return self._connection
class FakeSwift(object):
def __init__(self, fail_checksum=False, temp_url_key=None):
self.containers = defaultdict(dict)
self.fail_checksum = fail_checksum
self.temp_url_key = temp_url_key
def get_auth(self):
if self.temp_url_key == 'exception':
raise ClientException('I failed!')
return 'http://fake/swift', None
def head_object(self, container, path):
return self.containers.get(container, {}).get(path, {}).get('headers', None)
def copy_object(self, container, path, target):
pieces = target.split('/', 2)
_, content = self.get_object(container, path)
self.put_object(pieces[1], pieces[2], content)
def get_container(self, container, prefix=None, full_listing=None):
container_entries = self.containers[container]
objs = []
for path, data in list(container_entries.iteritems()):
if not prefix or path.startswith(prefix):
objs.append({
'name': path,
'bytes': len(data['content']),
})
return {}, objs
def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
if not isinstance(content, str):
if hasattr(content, 'read'):
content = content.read()
else:
content = ''.join(content)
self.containers[container][path] = {
'content': content,
'chunk_size': chunk_size,
'content_type': content_type,
'headers': headers or {'is': True},
}
digest = hashlib.md5()
digest.update(content)
return digest.hexdigest() if not self.fail_checksum else 'invalid'
def get_object(self, container, path, resp_chunk_size=None):
data = self.containers[container].get(path, {})
if 'X-Object-Manifest' in data['headers']:
new_contents = []
prefix = data['headers']['X-Object-Manifest']
for key, value in self.containers[container].iteritems():
if ('container-name/' + key).startswith(prefix):
new_contents.append((key, value['content']))
new_contents.sort(key=lambda value: value[0])
data = dict(data)
data['content'] = ''.join([nc[1] for nc in new_contents])
return bool(data), data.get('content')
return bool(data), data.get('content')
def delete_object(self, container, path):
self.containers[container].pop(path, None)
class FakeQueue(object):
def __init__(self):
self.items = []
def get(self):
if not self.items:
return None
return self.items.pop()
def put(self, names, item, available_after=0):
self.items.append({
'names': names,
'item': item,
'available_after': available_after,
})
def test_fixed_path_concat():
swift = MockSwiftStorage(**base_args)
swift.exists('object/path')
swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path')
def test_simple_path_concat():
simple_concat_args = dict(base_args)
simple_concat_args['simple_path_concat'] = True
swift = MockSwiftStorage(**simple_concat_args)
swift.exists('object/path')
swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
def test_delete_unknown_path():
swift = SwiftStorage(**base_args)
with pytest.raises(IOError):
swift.remove('someunknownpath')
def test_simple_put_get():
swift = FakeSwiftStorage(**base_args)
assert not swift.exists('somepath')
swift.put_content('somepath', 'hello world!')
assert swift.exists('somepath')
assert swift.get_content('somepath') == 'hello world!'
def test_stream_read_write():
swift = FakeSwiftStorage(**base_args)
assert not swift.exists('somepath')
swift.stream_write('somepath', io.BytesIO('some content here'))
assert swift.exists('somepath')
assert swift.get_content('somepath') == 'some content here'
assert ''.join(list(swift.stream_read('somepath'))) == 'some content here'
def test_stream_read_write_invalid_checksum():
swift = FakeSwiftStorage(fail_checksum=True, **base_args)
assert not swift.exists('somepath')
with pytest.raises(IOError):
swift.stream_write('somepath', io.BytesIO('some content here'))
def test_remove():
swift = FakeSwiftStorage(**base_args)
assert not swift.exists('somepath')
swift.put_content('somepath', 'hello world!')
assert swift.exists('somepath')
swift.remove('somepath')
assert not swift.exists('somepath')
def test_copy_to():
swift = FakeSwiftStorage(**base_args)
modified_args = copy.deepcopy(base_args)
modified_args['swift_container'] = 'another_container'
another_swift = FakeSwiftStorage(connection=swift._connection, **modified_args)
swift.put_content('somepath', 'some content here')
swift.copy_to(another_swift, 'somepath')
assert swift.exists('somepath')
assert another_swift.exists('somepath')
assert swift.get_content('somepath') == 'some content here'
assert another_swift.get_content('somepath') == 'some content here'
def test_copy_to_different():
swift = FakeSwiftStorage(**base_args)
modified_args = copy.deepcopy(base_args)
modified_args['swift_user'] = 'foobarbaz'
modified_args['swift_container'] = 'another_container'
another_swift = FakeSwiftStorage(**modified_args)
swift.put_content('somepath', 'some content here')
swift.copy_to(another_swift, 'somepath')
assert swift.exists('somepath')
assert another_swift.exists('somepath')
assert swift.get_content('somepath') == 'some content here'
assert another_swift.get_content('somepath') == 'some content here'
def test_checksum():
swift = FakeSwiftStorage(**base_args)
swift.put_content('somepath', 'hello world!')
assert swift.get_checksum('somepath') is not None
@pytest.mark.parametrize('read_until_end', [
(True,),
(False,),
])
@pytest.mark.parametrize('max_chunk_size', [
(10000000),
(10),
(5),
(2),
(1),
])
@pytest.mark.parametrize('chunks', [
(['this', 'is', 'some', 'chunked', 'data', '']),
(['this is a very large chunk of data', '']),
(['h', 'e', 'l', 'l', 'o', '']),
])
def test_chunked_upload(chunks, max_chunk_size, read_until_end):
swift = FakeSwiftStorage(**base_args)
uuid, metadata = swift.initiate_chunked_upload()
offset = 0
with patch('storage.swift._MAXIMUM_SEGMENT_SIZE', max_chunk_size):
for chunk in chunks:
chunk_length = len(chunk) if not read_until_end else -1
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, chunk_length,
io.BytesIO(chunk), metadata)
assert error is None
assert len(chunk) == bytes_written
offset += len(chunk)
swift.complete_chunked_upload(uuid, 'somepath', metadata)
assert swift.get_content('somepath') == ''.join(chunks)
# Ensure each of the segments exist.
for segment in metadata['segments']:
assert swift.exists(segment.path)
# Delete the file and ensure all of its segments were removed.
swift.remove('somepath')
assert not swift.exists('somepath')
for segment in metadata['segments']:
assert not swift.exists(segment.path)
def test_cancel_chunked_upload():
swift = FakeSwiftStorage(**base_args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
io.BytesIO(chunk), metadata)
assert error is None
assert len(chunk) == bytes_written
offset += len(chunk)
swift.cancel_chunked_upload(uuid, metadata)
for segment in SwiftStorage._segment_list_from_metadata(metadata):
assert not swift.exists(segment.path)
def test_empty_chunks_queued_for_deletion():
chunk_cleanup_queue = FakeQueue()
args = dict(base_args)
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue, None, None)
swift = FakeSwiftStorage(**args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
length = len(chunk)
if length == 0:
length = 1
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length,
io.BytesIO(chunk), metadata)
assert error is None
assert len(chunk) == bytes_written
offset += len(chunk)
swift.complete_chunked_upload(uuid, 'somepath', metadata)
assert ''.join(chunks) == swift.get_content('somepath')
# Check the chunk deletion queue and ensure we have the last chunk queued.
found = chunk_cleanup_queue.get()
assert found is not None
found2 = chunk_cleanup_queue.get()
assert found2 is None
@pytest.mark.parametrize('temp_url_key, expects_url', [
(None, False),
('foobarbaz', True),
('exception', False),
])
def test_get_direct_download_url(temp_url_key, expects_url):
swift = FakeSwiftStorage(temp_url_key=temp_url_key, **base_args)
swift.put_content('somepath', 'hello world!')
assert (swift.get_direct_download_url('somepath') is not None) == expects_url