From 4aee8116114d5be6b77ec6730021391bf6905b46 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 17 Jul 2018 16:14:23 -0400 Subject: [PATCH] Move cloud storage tests to pytest --- storage/test/test_cloud_storage.py | 246 +++++++++++++++++++++++++++++ test/test_cloud_storage.py | 234 --------------------------- 2 files changed, 246 insertions(+), 234 deletions(-) create mode 100644 storage/test/test_cloud_storage.py delete mode 100644 test/test_cloud_storage.py diff --git a/storage/test/test_cloud_storage.py b/storage/test/test_cloud_storage.py new file mode 100644 index 000000000..a57677c89 --- /dev/null +++ b/storage/test/test_cloud_storage.py @@ -0,0 +1,246 @@ +import os + +from StringIO import StringIO + +import pytest + +import moto +import boto + +from storage import S3Storage, StorageContext +from storage.cloud import _CloudStorage, _PartUploadMetadata +from storage.cloud import _CHUNKS_KEY + +_TEST_CONTENT = os.urandom(1024) +_TEST_BUCKET = 'some_bucket' +_TEST_USER = 'someuser' +_TEST_PASSWORD = 'somepassword' +_TEST_PATH = 'some/cool/path' +_TEST_CONTEXT = StorageContext('nyc', None, None, None, None) + +@pytest.fixture() +def storage_engine(): + mock = moto.mock_s3() + mock.start() + + # Create a test bucket and put some test content. + boto.connect_s3().create_bucket(_TEST_BUCKET) + engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD) + engine.put_content(_TEST_PATH, _TEST_CONTENT) + + yield engine + + mock.stop() + + +def test_basicop(storage_engine): + # Ensure the content exists. + assert storage_engine.exists(_TEST_PATH) + + # Verify it can be retrieved. + assert storage_engine.get_content(_TEST_PATH) == _TEST_CONTENT + + # Retrieve a checksum for the content. + storage_engine.get_checksum(_TEST_PATH) + + # Remove the file. + storage_engine.remove(_TEST_PATH) + + # Ensure it no longer exists. + with pytest.raises(IOError): + storage_engine.get_content(_TEST_PATH) + + with pytest.raises(IOError): + storage_engine.get_checksum(_TEST_PATH) + + assert not storage_engine.exists(_TEST_PATH) + + +def test_copy_samecreds(storage_engine): + # Copy the content to another engine. + another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER, + _TEST_PASSWORD) + storage_engine.copy_to(another_engine, _TEST_PATH) + + # Verify it can be retrieved. + assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT + + +def test_copy_differentcreds(storage_engine): + # Copy the content to another engine. + another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech', + 'password') + boto.connect_s3().create_bucket('another_bucket') + + storage_engine.copy_to(another_engine, _TEST_PATH) + + # Verify it can be retrieved. + assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT + + +def test_stream_read(storage_engine): + # Read the streaming content. 
+ data = ''.join(storage_engine.stream_read(_TEST_PATH)) + assert data == _TEST_CONTENT + + +def test_stream_read_file(storage_engine): + with storage_engine.stream_read_file(_TEST_PATH) as f: + assert f.read() == _TEST_CONTENT + + +def test_stream_write(storage_engine): + new_data = os.urandom(4096) + storage_engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type') + assert storage_engine.get_content(_TEST_PATH) == new_data + + +@pytest.mark.parametrize('chunk_count', [ + 0, + 1, + 50, +]) +@pytest.mark.parametrize('force_client_side', [ + False, + True +]) +def test_chunk_upload(storage_engine, chunk_count, force_client_side): + if chunk_count == 0 and force_client_side: + return + + upload_id, metadata = storage_engine.initiate_chunked_upload() + final_data = '' + + for index in range(0, chunk_count): + chunk_data = os.urandom(1024) + final_data = final_data + chunk_data + bytes_written, new_metadata, error = storage_engine.stream_upload_chunk(upload_id, 0, + len(chunk_data), + StringIO(chunk_data), + metadata) + metadata = new_metadata + + assert bytes_written == len(chunk_data) + assert error is None + assert len(metadata[_CHUNKS_KEY]) == index + 1 + + # Complete the chunked upload. + storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', metadata, + force_client_side=force_client_side) + + # Ensure the file contents are valid. + assert storage_engine.get_content('some/chunked/path') == final_data + + +@pytest.mark.parametrize('chunk_count', [ + 0, + 1, + 50, +]) +def test_cancel_chunked_upload(storage_engine, chunk_count): + upload_id, metadata = storage_engine.initiate_chunked_upload() + + for _ in range(0, chunk_count): + chunk_data = os.urandom(1024) + _, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0, + len(chunk_data), + StringIO(chunk_data), + metadata) + metadata = new_metadata + + # Cancel the upload. + storage_engine.cancel_chunked_upload(upload_id, metadata) + + # Ensure all chunks were deleted. + for chunk in metadata[_CHUNKS_KEY]: + assert not storage_engine.exists(chunk.path) + + +def test_large_chunks_upload(storage_engine): + # Make the max chunk size much smaller for testing. + storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2 + + upload_id, metadata = storage_engine.initiate_chunked_upload() + + # Write a "super large" chunk, to ensure that it is broken into smaller chunks. + chunk_data = os.urandom(int(storage_engine.maximum_chunk_size * 2.5)) + bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0, + -1, + StringIO(chunk_data), + metadata) + assert len(chunk_data) == bytes_written + + # Complete the chunked upload. + storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata) + + # Ensure the file contents are valid. + assert len(chunk_data) == len(storage_engine.get_content('some/chunked/path')) + assert storage_engine.get_content('some/chunked/path') == chunk_data + + +def test_large_chunks_with_ragged_edge(storage_engine): + # Make the max chunk size much smaller for testing and force it to have a ragged edge. + storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2 + 10 + + upload_id, metadata = storage_engine.initiate_chunked_upload() + + # Write a few "super large" chunks, to ensure that it is broken into smaller chunks. 
+ all_data = '' + for _ in range(0, 2): + chunk_data = os.urandom(int(storage_engine.maximum_chunk_size) + 20) + bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0, + -1, + StringIO(chunk_data), + metadata) + assert len(chunk_data) == bytes_written + all_data = all_data + chunk_data + metadata = new_metadata + + # Complete the chunked upload. + storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata) + + # Ensure the file contents are valid. + assert len(all_data) == len(storage_engine.get_content('some/chunked/path')) + assert storage_engine.get_content('some/chunked/path') == all_data + + +@pytest.mark.parametrize('max_size, parts', [ + (50, [ + _PartUploadMetadata('foo', 0, 50), + _PartUploadMetadata('foo', 50, 50), + ]), + + (40, [ + _PartUploadMetadata('foo', 0, 25), + _PartUploadMetadata('foo', 25, 25), + _PartUploadMetadata('foo', 50, 25), + _PartUploadMetadata('foo', 75, 25) + ]), + + (51, [ + _PartUploadMetadata('foo', 0, 50), + _PartUploadMetadata('foo', 50, 50), + ]), + + (49, [ + _PartUploadMetadata('foo', 0, 25), + _PartUploadMetadata('foo', 25, 25), + _PartUploadMetadata('foo', 50, 25), + _PartUploadMetadata('foo', 75, 25), + ]), + + (99, [ + _PartUploadMetadata('foo', 0, 50), + _PartUploadMetadata('foo', 50, 50), + ]), + + (100, [ + _PartUploadMetadata('foo', 0, 100), + ]), +]) +def test_rechunked(max_size, parts): + chunk = _PartUploadMetadata('foo', 0, 100) + rechunked = list(_CloudStorage._rechunk(chunk, max_size)) + assert len(rechunked) == len(parts) + for index, chunk in enumerate(rechunked): + assert chunk == parts[index] diff --git a/test/test_cloud_storage.py b/test/test_cloud_storage.py deleted file mode 100644 index 66025af76..000000000 --- a/test/test_cloud_storage.py +++ /dev/null @@ -1,234 +0,0 @@ -import unittest -import moto -import boto -import os - -from storage import S3Storage, StorageContext -from storage.cloud import _CloudStorage, _PartUploadMetadata -from storage.cloud import _CHUNKS_KEY -from StringIO import StringIO - -_TEST_CONTENT = os.urandom(1024) -_TEST_BUCKET = 'some_bucket' -_TEST_USER = 'someuser' -_TEST_PASSWORD = 'somepassword' -_TEST_PATH = 'some/cool/path' -_TEST_CONTEXT = StorageContext('nyc', None, None, None, None) - -class TestCloudStorage(unittest.TestCase): - def setUp(self): - self.mock = moto.mock_s3() - self.mock.start() - - # Create a test bucket and put some test content. - boto.connect_s3().create_bucket(_TEST_BUCKET) - self.engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD) - self.engine.put_content(_TEST_PATH, _TEST_CONTENT) - - def tearDown(self): - self.mock.stop() - - def test_basicop(self): - # Ensure the content exists. - self.assertTrue(self.engine.exists(_TEST_PATH)) - - # Verify it can be retrieved. - self.assertEquals(_TEST_CONTENT, self.engine.get_content(_TEST_PATH)) - - # Retrieve a checksum for the content. - self.engine.get_checksum(_TEST_PATH) - - # Remove the file. - self.engine.remove(_TEST_PATH) - - # Ensure it no longer exists. - with self.assertRaises(IOError): - self.engine.get_content(_TEST_PATH) - - with self.assertRaises(IOError): - self.engine.get_checksum(_TEST_PATH) - - self.assertFalse(self.engine.exists(_TEST_PATH)) - - def test_copy_samecreds(self): - # Copy the content to another engine. - another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER, - _TEST_PASSWORD) - self.engine.copy_to(another_engine, _TEST_PATH) - - # Verify it can be retrieved. 
- self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH)) - - def test_copy_differentcreds(self): - # Copy the content to another engine. - another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech', - 'password') - boto.connect_s3().create_bucket('another_bucket') - - self.engine.copy_to(another_engine, _TEST_PATH) - - # Verify it can be retrieved. - self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH)) - - def test_stream_read(self): - # Read the streaming content. - data = ''.join(self.engine.stream_read(_TEST_PATH)) - self.assertEquals(_TEST_CONTENT, data) - - def test_stream_read_file(self): - with self.engine.stream_read_file(_TEST_PATH) as f: - self.assertEquals(_TEST_CONTENT, f.read()) - - def test_stream_write(self): - new_data = os.urandom(4096) - self.engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type') - self.assertEquals(new_data, self.engine.get_content(_TEST_PATH)) - - def test_chunked_upload_no_chunks(self): - self._chunk_upload_test(0) - - def test_chunked_upload_single_chunk(self): - self._chunk_upload_test(1) - - def test_chunked_upload_multiple_chunks(self): - self._chunk_upload_test(50) - - def test_chunked_upload_single_chunk_client_side(self): - self._chunk_upload_test(1, force_client_side=True) - - def test_chunked_upload_multiple_chunks_client_side(self): - self._chunk_upload_test(50, force_client_side=True) - - def _chunk_upload_test(self, chunk_count, force_client_side=False): - upload_id, metadata = self.engine.initiate_chunked_upload() - final_data = '' - - for index in range(0, chunk_count): - chunk_data = os.urandom(1024) - final_data = final_data + chunk_data - bytes_written, new_metadata, error = self.engine.stream_upload_chunk(upload_id, 0, - len(chunk_data), - StringIO(chunk_data), - metadata) - metadata = new_metadata - - self.assertEquals(len(chunk_data), bytes_written) - self.assertIsNone(error) - self.assertEquals(index + 1, len(metadata[_CHUNKS_KEY])) - - # Complete the chunked upload. - self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', metadata, - force_client_side=force_client_side) - - # Ensure the file contents are valid. - self.assertEquals(final_data, self.engine.get_content('some/chunked/path')) - - def test_cancel_chunked_upload_single_chunk(self): - self._cancel_chunked_upload_test(1) - - def test_cancel_chunked_upload_multiple_chunks(self): - self._cancel_chunked_upload_test(50) - - def _cancel_chunked_upload_test(self, chunk_count): - upload_id, metadata = self.engine.initiate_chunked_upload() - - for _ in range(0, chunk_count): - chunk_data = os.urandom(1024) - _, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0, - len(chunk_data), - StringIO(chunk_data), - metadata) - metadata = new_metadata - - # Cancel the upload. - self.engine.cancel_chunked_upload(upload_id, metadata) - - # Ensure all chunks were deleted. - for chunk in metadata[_CHUNKS_KEY]: - self.assertFalse(self.engine.exists(chunk.path)) - - def test_large_chunks_upload(self): - # Make the max chunk size much smaller for testing. - self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2 - - upload_id, metadata = self.engine.initiate_chunked_upload() - - # Write a "super large" chunk, to ensure that it is broken into smaller chunks. 
- chunk_data = os.urandom(int(self.engine.maximum_chunk_size * 2.5)) - bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0, - -1, - StringIO(chunk_data), - metadata) - self.assertEquals(bytes_written, len(chunk_data)) - - # Complete the chunked upload. - self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata) - - # Ensure the file contents are valid. - self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(chunk_data)) - self.assertEquals(chunk_data, self.engine.get_content('some/chunked/path')) - - def test_large_chunks_with_ragged_edge(self): - # Make the max chunk size much smaller for testing and force it to have a ragged edge. - self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2 + 10 - - upload_id, metadata = self.engine.initiate_chunked_upload() - - # Write a few "super large" chunks, to ensure that it is broken into smaller chunks. - all_data = '' - for _ in range(0, 2): - chunk_data = os.urandom(int(self.engine.maximum_chunk_size) + 20) - bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0, - -1, - StringIO(chunk_data), - metadata) - self.assertEquals(bytes_written, len(chunk_data)) - all_data = all_data + chunk_data - metadata = new_metadata - - # Complete the chunked upload. - self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata) - - # Ensure the file contents are valid. - self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(all_data)) - self.assertEquals(all_data, self.engine.get_content('some/chunked/path')) - - def assertRechunked(self, chunk, max_size, *args): - rechunked = list(_CloudStorage._rechunk(chunk, max_size)) - self.assertEquals(len(rechunked), len(args), rechunked) - for index, chunk in enumerate(rechunked): - self.assertEquals(args[index], chunk) - - def test_rechunking(self): - chunk = _PartUploadMetadata('foo', 0, 100) - - self.assertRechunked(chunk, 50, - _PartUploadMetadata('foo', 0, 50), - _PartUploadMetadata('foo', 50, 50)) - - self.assertRechunked(chunk, 40, - _PartUploadMetadata('foo', 0, 25), - _PartUploadMetadata('foo', 25, 25), - _PartUploadMetadata('foo', 50, 25), - _PartUploadMetadata('foo', 75, 25)) - - self.assertRechunked(chunk, 51, - _PartUploadMetadata('foo', 0, 50), - _PartUploadMetadata('foo', 50, 50)) - - self.assertRechunked(chunk, 49, - _PartUploadMetadata('foo', 0, 25), - _PartUploadMetadata('foo', 25, 25), - _PartUploadMetadata('foo', 50, 25), - _PartUploadMetadata('foo', 75, 25)) - - self.assertRechunked(chunk, 99, - _PartUploadMetadata('foo', 0, 50), - _PartUploadMetadata('foo', 50, 50)) - - self.assertRechunked(chunk, 100, - _PartUploadMetadata('foo', 0, 100)) - - -if __name__ == '__main__': - unittest.main()
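
Note (not part of the patch): the `test_rechunked` parametrization above encodes the splitting behaviour expected of `_CloudStorage._rechunk` (for example, a 100-byte part with `max_size=40` becomes four 25-byte parts). The sketch below is only an illustration of one splitting strategy, recursive halving, that is consistent with those expected outputs; it is not the actual implementation in `storage/cloud.py`, and `PartUploadMetadata` here is a local stand-in for `_PartUploadMetadata`.

```python
from collections import namedtuple

# Stand-in for storage.cloud._PartUploadMetadata (path, offset, length); used
# only so this sketch is self-contained.
PartUploadMetadata = namedtuple('PartUploadMetadata', ['path', 'offset', 'length'])


def rechunk(chunk, max_chunk_size):
    """Yield sub-chunks no larger than max_chunk_size by recursively halving.

    This is a sketch matching the expectations in test_rechunked, not the
    real _CloudStorage._rechunk.
    """
    if chunk.length <= max_chunk_size:
        yield chunk
        return

    half = chunk.length // 2
    first = PartUploadMetadata(chunk.path, chunk.offset, half)
    second = PartUploadMetadata(chunk.path, chunk.offset + half, chunk.length - half)
    for subchunk in rechunk(first, max_chunk_size):
        yield subchunk
    for subchunk in rechunk(second, max_chunk_size):
        yield subchunk


# Example: reproduces the (40, four parts of 25) case from the parametrization.
assert list(rechunk(PartUploadMetadata('foo', 0, 100), 40)) == [
    PartUploadMetadata('foo', 0, 25),
    PartUploadMetadata('foo', 25, 25),
    PartUploadMetadata('foo', 50, 25),
    PartUploadMetadata('foo', 75, 25),
]
```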