import unittest import moto import boto import os from storage import S3Storage, StorageContext from storage.cloud import _CloudStorage, _PartUploadMetadata from storage.cloud import _CHUNKS_KEY from StringIO import StringIO _TEST_CONTENT = os.urandom(1024) _TEST_BUCKET = 'some_bucket' _TEST_USER = 'someuser' _TEST_PASSWORD = 'somepassword' _TEST_PATH = 'some/cool/path' _TEST_CONTEXT = StorageContext('nyc', None, None) class TestCloudStorage(unittest.TestCase): def setUp(self): self.mock = moto.mock_s3() self.mock.start() # Create a test bucket and put some test content. boto.connect_s3().create_bucket(_TEST_BUCKET) self.engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD) self.engine.put_content(_TEST_PATH, _TEST_CONTENT) def tearDown(self): self.mock.stop() def test_basicop(self): # Ensure the content exists. self.assertTrue(self.engine.exists(_TEST_PATH)) # Verify it can be retrieved. self.assertEquals(_TEST_CONTENT, self.engine.get_content(_TEST_PATH)) # Retrieve a checksum for the content. self.engine.get_checksum(_TEST_PATH) # Remove the file. self.engine.remove(_TEST_PATH) # Ensure it no longer exists. with self.assertRaises(IOError): self.engine.get_content(_TEST_PATH) with self.assertRaises(IOError): self.engine.get_checksum(_TEST_PATH) self.assertFalse(self.engine.exists(_TEST_PATH)) def test_copy_samecreds(self): # Copy the content to another engine. another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD) self.engine.copy_to(another_engine, _TEST_PATH) # Verify it can be retrieved. self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH)) def test_copy_differentcreds(self): # Copy the content to another engine. another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech', 'password') boto.connect_s3().create_bucket('another_bucket') self.engine.copy_to(another_engine, _TEST_PATH) # Verify it can be retrieved. self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH)) def test_stream_read(self): # Read the streaming content. data = ''.join(self.engine.stream_read(_TEST_PATH)) self.assertEquals(_TEST_CONTENT, data) def test_stream_read_file(self): with self.engine.stream_read_file(_TEST_PATH) as f: self.assertEquals(_TEST_CONTENT, f.read()) def test_stream_write(self): new_data = os.urandom(4096) self.engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type') self.assertEquals(new_data, self.engine.get_content(_TEST_PATH)) def test_chunked_upload_single_chunk(self): self._chunk_upload_test(1) def test_chunked_upload_multiple_chunks(self): self._chunk_upload_test(50) def test_chunked_upload_single_chunk_client_side(self): self._chunk_upload_test(1, force_client_side=True) def test_chunked_upload_multiple_chunks_client_side(self): self._chunk_upload_test(50, force_client_side=True) def _chunk_upload_test(self, chunk_count, force_client_side=False): upload_id, metadata = self.engine.initiate_chunked_upload() final_data = '' for index in range(0, chunk_count): chunk_data = os.urandom(1024) final_data = final_data + chunk_data bytes_written, new_metadata, error = self.engine.stream_upload_chunk(upload_id, 0, len(chunk_data), StringIO(chunk_data), metadata) metadata = new_metadata self.assertEquals(len(chunk_data), bytes_written) self.assertIsNone(error) self.assertEquals(index + 1, len(metadata[_CHUNKS_KEY])) # Complete the chunked upload. self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata, force_client_side=force_client_side) # Ensure the file contents are valid. self.assertEquals(final_data, self.engine.get_content('some/chunked/path')) def test_cancel_chunked_upload_single_chunk(self): self._cancel_chunked_upload_test(1) def test_cancel_chunked_upload_multiple_chunks(self): self._cancel_chunked_upload_test(50) def _cancel_chunked_upload_test(self, chunk_count): upload_id, metadata = self.engine.initiate_chunked_upload() for _ in range(0, chunk_count): chunk_data = os.urandom(1024) _, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0, len(chunk_data), StringIO(chunk_data), metadata) metadata = new_metadata # Cancel the upload. self.engine.cancel_chunked_upload(upload_id, metadata) # Ensure all chunks were deleted. for chunk in metadata[_CHUNKS_KEY]: self.assertFalse(self.engine.exists(chunk.path)) def test_large_chunks_upload(self): # Make the max chunk size much smaller for testing. self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2 upload_id, metadata = self.engine.initiate_chunked_upload() # Write a "super large" chunk, to ensure that it is broken into smaller chunks. chunk_data = os.urandom(int(self.engine.maximum_chunk_size * 2.5)) bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0, -1, StringIO(chunk_data), metadata) self.assertEquals(bytes_written, len(chunk_data)) # Complete the chunked upload. self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata) # Ensure the file contents are valid. self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(chunk_data)) self.assertEquals(chunk_data, self.engine.get_content('some/chunked/path')) def test_large_chunks_with_ragged_edge(self): # Make the max chunk size much smaller for testing and force it to have a ragged edge. self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2 + 10 upload_id, metadata = self.engine.initiate_chunked_upload() # Write a few "super large" chunks, to ensure that it is broken into smaller chunks. all_data = '' for _ in range(0, 2): chunk_data = os.urandom(int(self.engine.maximum_chunk_size) + 20) bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0, -1, StringIO(chunk_data), metadata) self.assertEquals(bytes_written, len(chunk_data)) all_data = all_data + chunk_data metadata = new_metadata # Complete the chunked upload. self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata) # Ensure the file contents are valid. self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(all_data)) self.assertEquals(all_data, self.engine.get_content('some/chunked/path')) def assertRechunked(self, chunk, max_size, *args): rechunked = list(_CloudStorage._rechunk(chunk, max_size)) self.assertEquals(len(rechunked), len(args), rechunked) for index, chunk in enumerate(rechunked): self.assertEquals(args[index], chunk) def test_rechunking(self): chunk = _PartUploadMetadata('foo', 0, 100) self.assertRechunked(chunk, 50, _PartUploadMetadata('foo', 0, 50), _PartUploadMetadata('foo', 50, 50)) self.assertRechunked(chunk, 40, _PartUploadMetadata('foo', 0, 25), _PartUploadMetadata('foo', 25, 25), _PartUploadMetadata('foo', 50, 25), _PartUploadMetadata('foo', 75, 25)) self.assertRechunked(chunk, 51, _PartUploadMetadata('foo', 0, 50), _PartUploadMetadata('foo', 50, 50)) self.assertRechunked(chunk, 49, _PartUploadMetadata('foo', 0, 25), _PartUploadMetadata('foo', 25, 25), _PartUploadMetadata('foo', 50, 25), _PartUploadMetadata('foo', 75, 25)) self.assertRechunked(chunk, 99, _PartUploadMetadata('foo', 0, 50), _PartUploadMetadata('foo', 50, 50)) self.assertRechunked(chunk, 100, _PartUploadMetadata('foo', 0, 100)) if __name__ == '__main__': unittest.main()