From eab6af2b87c28d82a79cdebfcd54eb8058b8baf5 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Tue, 22 Mar 2016 18:16:48 -0400
Subject: [PATCH] Add mocked unit tests for cloud storage engine

---
 requirements-nover.txt     |   3 +-
 requirements.txt           |   1 +
 storage/cloud.py           |  31 +++++---
 test/test_cloud_storage.py | 145 +++++++++++++++++++++++++++++++++++++
 4 files changed, 168 insertions(+), 12 deletions(-)
 create mode 100644 test/test_cloud_storage.py

diff --git a/requirements-nover.txt b/requirements-nover.txt
index c671f52b6..4ef530475 100644
--- a/requirements-nover.txt
+++ b/requirements-nover.txt
@@ -61,4 +61,5 @@ redlock
 semantic-version
 bencode
 cryptography==1.1.2 # Remove version when https://github.com/pyca/cryptography/issues/2690 fixed
-httmock
\ No newline at end of file
+httmock
+moto
diff --git a/requirements.txt b/requirements.txt
index 4feffb10d..c1af2bcb5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -47,6 +47,7 @@ MarkupSafe==0.23
 mixpanel==4.3.0
 mock==1.3.0
 monotonic==0.6
+moto==0.4.23
 msgpack-python==0.4.7
 ndg-httpsclient==0.4.0
 netaddr==0.7.18
diff --git a/storage/cloud.py b/storage/cloud.py
index d204efea9..1d8ff4673 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -163,7 +163,9 @@ class _CloudStorage(BaseStorageV2):
     if content_encoding is not None:
       metadata['Content-Encoding'] = content_encoding
 
-    self._metric_queue.put('MultipartUploadStart', 1)
+    if self._metric_queue is not None:
+      self._metric_queue.put('MultipartUploadStart', 1)
+
     return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                         **self._upload_params)
 
@@ -200,7 +202,10 @@ class _CloudStorage(BaseStorageV2):
       except IOError as ex:
         logger.warn('stream write error: %s', ex)
         error = ex
-        self._metric_queue.put('MultipartUploadFailure', 1)
+
+        if self._metric_queue is not None:
+          self._metric_queue.put('MultipartUploadFailure', 1)
+
         if cancel_on_error:
           mp.cancel_upload()
           return 0, error
@@ -208,7 +213,9 @@ class _CloudStorage(BaseStorageV2):
           break
 
     if total_bytes_written > 0:
-      self._metric_queue.put('MultipartUploadSuccess', 1)
+      if self._metric_queue is not None:
+        self._metric_queue.put('MultipartUploadSuccess', 1)
+
       mp.complete_upload()
 
     return total_bytes_written, error
@@ -334,18 +341,20 @@ class _CloudStorage(BaseStorageV2):
           msg = 'Failed to clean up chunk %s for reupload of %s'
          logger.exception(msg, chunk.path, final_path)
 
-  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
+  def complete_chunked_upload(self, uuid, final_path, storage_metadata, force_client_side=False):
     self._initialize_cloud_conn()
+    chunk_list = self._chunk_list_from_metadata(storage_metadata)
 
     # Here is where things get interesting: we are going to try to assemble this server side
     # In order to be a candidate all parts (after offsets have been computed) must be at least 5MB
-    server_side_assembly = True
-    chunk_list = self._chunk_list_from_metadata(storage_metadata)
-    for chunk_offset, chunk in enumerate(chunk_list):
-      # If the chunk is both too small, and not the last chunk, we rule out server side assembly
-      if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
-        server_side_assembly = False
-        break
+    server_side_assembly = False
+    if not force_client_side:
+      server_side_assembly = True
+      for chunk_offset, chunk in enumerate(chunk_list):
+        # If the chunk is both too small, and not the last chunk, we rule out server side assembly
+        if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
+          server_side_assembly = False
+          break
 
     if server_side_assembly:
       logger.debug('Performing server side assembly of multi-part upload for: %s', final_path)
diff --git a/test/test_cloud_storage.py b/test/test_cloud_storage.py
new file mode 100644
index 000000000..cdde73068
--- /dev/null
+++ b/test/test_cloud_storage.py
@@ -0,0 +1,145 @@
+import unittest
+import moto
+import boto
+import os
+
+from storage import S3Storage
+from storage.cloud import _CHUNKS_KEY
+from StringIO import StringIO
+
+_TEST_CONTENT = os.urandom(1024)
+_TEST_BUCKET = 'some_bucket'
+_TEST_USER = 'someuser'
+_TEST_PASSWORD = 'somepassword'
+_TEST_PATH = 'some/cool/path'
+
+class TestCloudStorage(unittest.TestCase):
+  def setUp(self):
+    self.mock = moto.mock_s3()
+    self.mock.start()
+
+    # Create a test bucket and put some test content.
+    boto.connect_s3().create_bucket(_TEST_BUCKET)
+    self.engine = S3Storage(None, 'some/path', _TEST_USER, _TEST_PASSWORD, _TEST_BUCKET)
+    self.engine.put_content(_TEST_PATH, _TEST_CONTENT)
+
+  def tearDown(self):
+    self.mock.stop()
+
+  def test_basicop(self):
+    # Ensure the content exists.
+    self.assertTrue(self.engine.exists(_TEST_PATH))
+
+    # Verify it can be retrieved.
+    self.assertEquals(_TEST_CONTENT, self.engine.get_content(_TEST_PATH))
+
+    # Retrieve a checksum for the content.
+    self.engine.get_checksum(_TEST_PATH)
+
+    # Remove the file.
+    self.engine.remove(_TEST_PATH)
+
+    # Ensure it no longer exists.
+    with self.assertRaises(IOError):
+      self.engine.get_content(_TEST_PATH)
+
+    with self.assertRaises(IOError):
+      self.engine.get_checksum(_TEST_PATH)
+
+    self.assertFalse(self.engine.exists(_TEST_PATH))
+
+  def test_copy_samecreds(self):
+    # Copy the content to another engine.
+    another_engine = S3Storage(None, 'another/path', _TEST_USER, _TEST_PASSWORD, _TEST_BUCKET)
+    self.engine.copy_to(another_engine, _TEST_PATH)
+
+    # Verify it can be retrieved.
+    self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH))
+
+  def test_copy_differentcreds(self):
+    # Copy the content to another engine.
+    another_engine = S3Storage(None, 'another/path', 'blech', 'password', 'another_bucket')
+    boto.connect_s3().create_bucket('another_bucket')
+
+    self.engine.copy_to(another_engine, _TEST_PATH)
+
+    # Verify it can be retrieved.
+    self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH))
+
+  def test_stream_read(self):
+    # Read the streaming content.
+    data = ''.join(self.engine.stream_read(_TEST_PATH))
+    self.assertEquals(_TEST_CONTENT, data)
+
+  def test_stream_read_file(self):
+    with self.engine.stream_read_file(_TEST_PATH) as f:
+      self.assertEquals(_TEST_CONTENT, f.read())
+
+  def test_stream_write(self):
+    new_data = os.urandom(4096)
+    self.engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type')
+    self.assertEquals(new_data, self.engine.get_content(_TEST_PATH))
+
+  def test_chunked_upload_single_chunk(self):
+    self._chunk_upload_test(1)
+
+  def test_chunked_upload_multiple_chunks(self):
+    self._chunk_upload_test(50)
+
+  def test_chunked_upload_single_chunk_client_side(self):
+    self._chunk_upload_test(1, force_client_side=True)
+
+  def test_chunked_upload_multiple_chunks_client_side(self):
+    self._chunk_upload_test(50, force_client_side=True)
+
+  def _chunk_upload_test(self, chunk_count, force_client_side=False):
+    upload_id, metadata = self.engine.initiate_chunked_upload()
+    final_data = ''
+
+    for index in range(0, chunk_count):
+      chunk_data = os.urandom(1024)
+      final_data = final_data + chunk_data
+      bytes_written, new_metadata, error = self.engine.stream_upload_chunk(upload_id, 0,
+                                                                           len(chunk_data),
+                                                                           StringIO(chunk_data),
+                                                                           metadata)
+      metadata = new_metadata
+
+      self.assertEquals(len(chunk_data), bytes_written)
+      self.assertIsNone(error)
+      self.assertEquals(index + 1, len(metadata[_CHUNKS_KEY]))
+
+    # Complete the chunked upload.
+    self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata,
+                                        force_client_side=force_client_side)
+
+    # Ensure the file contents are valid.
+    self.assertEquals(final_data, self.engine.get_content('some/chunked/path'))
+
+  def test_cancel_chunked_upload_single_chunk(self):
+    self._cancel_chunked_upload_test(1)
+
+  def test_cancel_chunked_upload_multiple_chunks(self):
+    self._cancel_chunked_upload_test(50)
+
+  def _cancel_chunked_upload_test(self, chunk_count):
+    upload_id, metadata = self.engine.initiate_chunked_upload()
+
+    for _ in range(0, chunk_count):
+      chunk_data = os.urandom(1024)
+      _, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
+                                                           len(chunk_data),
+                                                           StringIO(chunk_data),
+                                                           metadata)
+      metadata = new_metadata
+
+    # Cancel the upload.
+    self.engine.cancel_chunked_upload(upload_id, metadata)
+
+    # Ensure all chunks were deleted.
+    for chunk in metadata[_CHUNKS_KEY]:
+      self.assertFalse(self.engine.exists(chunk.path))
+
+
+if __name__ == '__main__':
+  unittest.main()