"""Tests for the S3Storage engine, exercised against moto's mocked S3."""

import os
import unittest
from StringIO import StringIO

import boto
import moto

from storage import S3Storage
from storage.cloud import _CHUNKS_KEY

_TEST_CONTENT = os.urandom(1024)
_TEST_BUCKET = 'some_bucket'
_TEST_USER = 'someuser'
_TEST_PASSWORD = 'somepassword'
_TEST_PATH = 'some/cool/path'


class TestCloudStorage(unittest.TestCase):
    """Exercise S3Storage basic, copy, streaming and chunked-upload operations.

    Every test starts with ``_TEST_CONTENT`` already stored at ``_TEST_PATH``
    in ``_TEST_BUCKET`` (see ``setUp``).
    """

    def setUp(self):
        """Start the S3 mock and seed the engine with the test content."""
        self.mock = moto.mock_s3()
        self.mock.start()
        # Register the stop immediately: unittest skips tearDown() when
        # setUp() raises, but cleanups still run, so the mock cannot leak
        # into later tests if bucket creation or the initial put fails.
        self.addCleanup(self.mock.stop)

        # Create a test bucket and put some test content.
        boto.connect_s3().create_bucket(_TEST_BUCKET)
        self.engine = S3Storage(None, 'some/path', _TEST_USER, _TEST_PASSWORD,
                                _TEST_BUCKET)
        self.engine.put_content(_TEST_PATH, _TEST_CONTENT)

    def test_basicop(self):
        """Check exists/get/checksum, then that remove() makes them fail."""
        # Ensure the content exists.
        self.assertTrue(self.engine.exists(_TEST_PATH))

        # Verify it can be retrieved.
        self.assertEqual(_TEST_CONTENT, self.engine.get_content(_TEST_PATH))

        # Retrieve a checksum for the content.
        self.engine.get_checksum(_TEST_PATH)

        # Remove the file.
        self.engine.remove(_TEST_PATH)

        # Ensure it no longer exists.
        with self.assertRaises(IOError):
            self.engine.get_content(_TEST_PATH)

        with self.assertRaises(IOError):
            self.engine.get_checksum(_TEST_PATH)

        self.assertFalse(self.engine.exists(_TEST_PATH))

    def test_copy_samecreds(self):
        """Copy to an engine built with the same credentials and bucket."""
        # Copy the content to another engine.
        another_engine = S3Storage(None, 'another/path', _TEST_USER,
                                   _TEST_PASSWORD, _TEST_BUCKET)
        self.engine.copy_to(another_engine, _TEST_PATH)

        # Verify it can be retrieved.
        self.assertEqual(_TEST_CONTENT,
                         another_engine.get_content(_TEST_PATH))

    def test_copy_differentcreds(self):
        """Copy to an engine built with other credentials and another bucket."""
        # Copy the content to another engine.
        another_engine = S3Storage(None, 'another/path', 'blech', 'password',
                                   'another_bucket')
        boto.connect_s3().create_bucket('another_bucket')
        self.engine.copy_to(another_engine, _TEST_PATH)

        # Verify it can be retrieved.
        self.assertEqual(_TEST_CONTENT,
                         another_engine.get_content(_TEST_PATH))

    def test_stream_read(self):
        """Joining the pieces yielded by stream_read() gives the content."""
        # Read the streaming content.
        data = ''.join(self.engine.stream_read(_TEST_PATH))
        self.assertEqual(_TEST_CONTENT, data)

    def test_stream_read_file(self):
        """stream_read_file() gives a file-like object with the content."""
        with self.engine.stream_read_file(_TEST_PATH) as f:
            self.assertEqual(_TEST_CONTENT, f.read())

    def test_stream_write(self):
        """stream_write() replaces the stored content with the new data."""
        new_data = os.urandom(4096)
        self.engine.stream_write(_TEST_PATH, StringIO(new_data),
                                 content_type='Cool/Type')
        self.assertEqual(new_data, self.engine.get_content(_TEST_PATH))

    def test_chunked_upload_single_chunk(self):
        self._chunk_upload_test(1)

    def test_chunked_upload_multiple_chunks(self):
        self._chunk_upload_test(50)

    def test_chunked_upload_single_chunk_client_side(self):
        self._chunk_upload_test(1, force_client_side=True)

    def test_chunked_upload_multiple_chunks_client_side(self):
        self._chunk_upload_test(50, force_client_side=True)

    def _chunk_upload_test(self, chunk_count, force_client_side=False):
        """Upload ``chunk_count`` random 1 KiB chunks and verify the result.

        Args:
            chunk_count: number of chunks to upload before completing.
            force_client_side: forwarded unchanged to
                ``complete_chunked_upload``.
        """
        upload_id, metadata = self.engine.initiate_chunked_upload()
        chunks = []

        for index in range(chunk_count):
            chunk_data = os.urandom(1024)
            chunks.append(chunk_data)
            # NOTE(review): the second positional argument (presumably the
            # offset) is 0 for every chunk, as in the original test; confirm
            # this is what the engine expects.
            bytes_written, metadata, error = self.engine.stream_upload_chunk(
                upload_id, 0, len(chunk_data), StringIO(chunk_data), metadata)

            self.assertEqual(len(chunk_data), bytes_written)
            self.assertIsNone(error)
            self.assertEqual(index + 1, len(metadata[_CHUNKS_KEY]))

        # Complete the chunked upload. `metadata` always holds the latest
        # value, including when no chunk was uploaded at all.
        self.engine.complete_chunked_upload(
            upload_id, 'some/chunked/path', metadata,
            force_client_side=force_client_side)

        # Ensure the file contents are valid.
        self.assertEqual(''.join(chunks),
                         self.engine.get_content('some/chunked/path'))

    def test_cancel_chunked_upload_single_chunk(self):
        self._cancel_chunked_upload_test(1)

    def test_cancel_chunked_upload_multiple_chunks(self):
        self._cancel_chunked_upload_test(50)

    def _cancel_chunked_upload_test(self, chunk_count):
        """Upload ``chunk_count`` chunks, cancel, and check they are gone."""
        upload_id, metadata = self.engine.initiate_chunked_upload()

        for _ in range(chunk_count):
            chunk_data = os.urandom(1024)
            _, metadata, _ = self.engine.stream_upload_chunk(
                upload_id, 0, len(chunk_data), StringIO(chunk_data), metadata)

        # Cancel the upload.
        self.engine.cancel_chunked_upload(upload_id, metadata)

        # Ensure all chunks were deleted.
        for chunk in metadata[_CHUNKS_KEY]:
            self.assertFalse(self.engine.exists(chunk.path))


if __name__ == '__main__':
    unittest.main()