Add mocked unit tests for cloud storage engine
parent d33804c22e
commit eab6af2b87

4 changed files with 168 additions and 12 deletions
@@ -61,4 +61,5 @@ redlock
 semantic-version
 bencode
 cryptography==1.1.2 # Remove version when https://github.com/pyca/cryptography/issues/2690 fixed
 httmock
+moto
@@ -47,6 +47,7 @@ MarkupSafe==0.23
 mixpanel==4.3.0
 mock==1.3.0
 monotonic==0.6
+moto==0.4.23
 msgpack-python==0.4.7
 ndg-httpsclient==0.4.0
 netaddr==0.7.18
@@ -163,7 +163,9 @@ class _CloudStorage(BaseStorageV2):
     if content_encoding is not None:
       metadata['Content-Encoding'] = content_encoding

-    self._metric_queue.put('MultipartUploadStart', 1)
+    if self._metric_queue is not None:
+      self._metric_queue.put('MultipartUploadStart', 1)
+
     return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                         **self._upload_params)
@@ -200,7 +202,10 @@ class _CloudStorage(BaseStorageV2):
       except IOError as ex:
         logger.warn('stream write error: %s', ex)
         error = ex
-        self._metric_queue.put('MultipartUploadFailure', 1)
+
+        if self._metric_queue is not None:
+          self._metric_queue.put('MultipartUploadFailure', 1)
+
         if cancel_on_error:
           mp.cancel_upload()
           return 0, error
@@ -208,7 +213,9 @@ class _CloudStorage(BaseStorageV2):
           break

     if total_bytes_written > 0:
-      self._metric_queue.put('MultipartUploadSuccess', 1)
+      if self._metric_queue is not None:
+        self._metric_queue.put('MultipartUploadSuccess', 1)
+
       mp.complete_upload()
     return total_bytes_written, error

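The three hunks above all apply the same fix: _metric_queue is now treated as optional, which is what allows the new unit tests below to construct the engine with None in place of a real queue. A minimal sketch of the pattern, with hypothetical names (_MetricsReporter and _report are illustrative, not from the commit):

  class _MetricsReporter(object):
    def __init__(self, metric_queue=None):
      # None means metrics are disabled, e.g. in unit tests.
      self._metric_queue = metric_queue

    def _report(self, event_name):
      # Guard every put(): an engine built without a queue must not
      # raise AttributeError when it reports an event.
      if self._metric_queue is not None:
        self._metric_queue.put(event_name, 1)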
@@ -334,18 +341,20 @@ class _CloudStorage(BaseStorageV2):
         msg = 'Failed to clean up chunk %s for reupload of %s'
         logger.exception(msg, chunk.path, final_path)

-  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
+  def complete_chunked_upload(self, uuid, final_path, storage_metadata, force_client_side=False):
     self._initialize_cloud_conn()
+    chunk_list = self._chunk_list_from_metadata(storage_metadata)

     # Here is where things get interesting: we are going to try to assemble this server side
     # In order to be a candidate all parts (after offsets have been computed) must be at least 5MB
-    server_side_assembly = True
-    chunk_list = self._chunk_list_from_metadata(storage_metadata)
-    for chunk_offset, chunk in enumerate(chunk_list):
-      # If the chunk is both too small, and not the last chunk, we rule out server side assembly
-      if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
-        server_side_assembly = False
-        break
+    server_side_assembly = False
+    if not force_client_side:
+      server_side_assembly = True
+      for chunk_offset, chunk in enumerate(chunk_list):
+        # If the chunk is both too small, and not the last chunk, we rule out server side assembly
+        if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
+          server_side_assembly = False
+          break

     if server_side_assembly:
       logger.debug('Performing server side assembly of multi-part upload for: %s', final_path)
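The restructured check keeps the original rule but adds an escape hatch: when force_client_side is true, server-side assembly is skipped outright; otherwise it is attempted unless some non-final chunk is smaller than automatic_chunk_size (S3 rejects multipart parts below 5 MB, except the last one). A standalone restatement of the candidacy rule as a sketch; can_assemble_server_side and _MIN_PART_SIZE are illustrative names, and chunks are assumed to expose a length attribute:

  _MIN_PART_SIZE = 5 * 1024 * 1024  # S3's minimum size for non-final multipart parts

  def can_assemble_server_side(chunk_list, force_client_side=False):
    if force_client_side:
      return False
    # Every chunk except the last must meet the minimum part size.
    return all(chunk.length >= _MIN_PART_SIZE for chunk in chunk_list[:-1])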
test/test_cloud_storage.py (new file, 145 lines)

@@ -0,0 +1,145 @@
+import unittest
+
+import moto
+import boto
+import os
+
+from storage import S3Storage
+from storage.cloud import _CHUNKS_KEY
+from StringIO import StringIO
+
+_TEST_CONTENT = os.urandom(1024)
+_TEST_BUCKET = 'some_bucket'
+_TEST_USER = 'someuser'
+_TEST_PASSWORD = 'somepassword'
+_TEST_PATH = 'some/cool/path'
+
+class TestCloudStorage(unittest.TestCase):
+  def setUp(self):
+    self.mock = moto.mock_s3()
+    self.mock.start()
+
+    # Create a test bucket and put some test content.
+    boto.connect_s3().create_bucket(_TEST_BUCKET)
+    self.engine = S3Storage(None, 'some/path', _TEST_USER, _TEST_PASSWORD, _TEST_BUCKET)
+    self.engine.put_content(_TEST_PATH, _TEST_CONTENT)
+
+  def tearDown(self):
+    self.mock.stop()
+
+  def test_basicop(self):
+    # Ensure the content exists.
+    self.assertTrue(self.engine.exists(_TEST_PATH))
+
+    # Verify it can be retrieved.
+    self.assertEquals(_TEST_CONTENT, self.engine.get_content(_TEST_PATH))
+
+    # Retrieve a checksum for the content.
+    self.engine.get_checksum(_TEST_PATH)
+
+    # Remove the file.
+    self.engine.remove(_TEST_PATH)
+
+    # Ensure it no longer exists.
+    with self.assertRaises(IOError):
+      self.engine.get_content(_TEST_PATH)
+
+    with self.assertRaises(IOError):
+      self.engine.get_checksum(_TEST_PATH)
+
+    self.assertFalse(self.engine.exists(_TEST_PATH))
+
+  def test_copy_samecreds(self):
+    # Copy the content to another engine.
+    another_engine = S3Storage(None, 'another/path', _TEST_USER, _TEST_PASSWORD, _TEST_BUCKET)
+    self.engine.copy_to(another_engine, _TEST_PATH)
+
+    # Verify it can be retrieved.
+    self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH))
+
+  def test_copy_differentcreds(self):
+    # Copy the content to another engine.
+    another_engine = S3Storage(None, 'another/path', 'blech', 'password', 'another_bucket')
+    boto.connect_s3().create_bucket('another_bucket')
+
+    self.engine.copy_to(another_engine, _TEST_PATH)
+
+    # Verify it can be retrieved.
+    self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH))
+
+  def test_stream_read(self):
+    # Read the streaming content.
+    data = ''.join(self.engine.stream_read(_TEST_PATH))
+    self.assertEquals(_TEST_CONTENT, data)
+
+  def test_stream_read_file(self):
+    with self.engine.stream_read_file(_TEST_PATH) as f:
+      self.assertEquals(_TEST_CONTENT, f.read())
+
+  def test_stream_write(self):
+    new_data = os.urandom(4096)
+    self.engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type')
+    self.assertEquals(new_data, self.engine.get_content(_TEST_PATH))
+
+  def test_chunked_upload_single_chunk(self):
+    self._chunk_upload_test(1)
+
+  def test_chunked_upload_multiple_chunks(self):
+    self._chunk_upload_test(50)
+
+  def test_chunked_upload_single_chunk_client_side(self):
+    self._chunk_upload_test(1, force_client_side=True)
+
+  def test_chunked_upload_multiple_chunks_client_side(self):
+    self._chunk_upload_test(50, force_client_side=True)
+
+  def _chunk_upload_test(self, chunk_count, force_client_side=False):
+    upload_id, metadata = self.engine.initiate_chunked_upload()
+    final_data = ''
+
+    for index in range(0, chunk_count):
+      chunk_data = os.urandom(1024)
+      final_data = final_data + chunk_data
+      bytes_written, new_metadata, error = self.engine.stream_upload_chunk(upload_id, 0,
+                                                                           len(chunk_data),
+                                                                           StringIO(chunk_data),
+                                                                           metadata)
+      metadata = new_metadata
+
+      self.assertEquals(len(chunk_data), bytes_written)
+      self.assertIsNone(error)
+      self.assertEquals(index + 1, len(metadata[_CHUNKS_KEY]))
+
+    # Complete the chunked upload.
+    self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata,
+                                        force_client_side=force_client_side)
+
+    # Ensure the file contents are valid.
+    self.assertEquals(final_data, self.engine.get_content('some/chunked/path'))
+
+  def test_cancel_chunked_upload_single_chunk(self):
+    self._cancel_chunked_upload_test(1)
+
+  def test_cancel_chunked_upload_multiple_chunks(self):
+    self._cancel_chunked_upload_test(50)
+
+  def _cancel_chunked_upload_test(self, chunk_count):
+    upload_id, metadata = self.engine.initiate_chunked_upload()
+
+    for _ in range(0, chunk_count):
+      chunk_data = os.urandom(1024)
+      _, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
+                                                           len(chunk_data),
+                                                           StringIO(chunk_data),
+                                                           metadata)
+      metadata = new_metadata
+
+    # Cancel the upload.
+    self.engine.cancel_chunked_upload(upload_id, metadata)
+
+    # Ensure all chunks were deleted.
+    for chunk in metadata[_CHUNKS_KEY]:
+      self.assertFalse(self.engine.exists(chunk.path))
+
+
+if __name__ == '__main__':
+  unittest.main()
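These tests run entirely against moto's in-memory S3 mock (moto.mock_s3() is started in setUp and stopped in tearDown), so they need no AWS credentials or network access and each test sees a fresh bucket. Assuming a standard unittest layout, something like `python -m unittest test.test_cloud_storage` should run them directly.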