Merge pull request #3152 from quay/move-cloud-storage-test
Move cloud storage tests to pytest
This commit is contained in:
commit
7cb0e0dc80
2 changed files with 246 additions and 234 deletions
246
storage/test/test_cloud_storage.py
Normal file
246
storage/test/test_cloud_storage.py
Normal file
|
@ -0,0 +1,246 @@
|
|||
import os
|
||||
|
||||
from StringIO import StringIO
|
||||
|
||||
import pytest
|
||||
|
||||
import moto
|
||||
import boto
|
||||
|
||||
from storage import S3Storage, StorageContext
|
||||
from storage.cloud import _CloudStorage, _PartUploadMetadata
|
||||
from storage.cloud import _CHUNKS_KEY
|
||||
|
||||
_TEST_CONTENT = os.urandom(1024)
|
||||
_TEST_BUCKET = 'some_bucket'
|
||||
_TEST_USER = 'someuser'
|
||||
_TEST_PASSWORD = 'somepassword'
|
||||
_TEST_PATH = 'some/cool/path'
|
||||
_TEST_CONTEXT = StorageContext('nyc', None, None, None, None)
|
||||
|
||||
@pytest.fixture()
|
||||
def storage_engine():
|
||||
mock = moto.mock_s3()
|
||||
mock.start()
|
||||
|
||||
# Create a test bucket and put some test content.
|
||||
boto.connect_s3().create_bucket(_TEST_BUCKET)
|
||||
engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
|
||||
engine.put_content(_TEST_PATH, _TEST_CONTENT)
|
||||
|
||||
yield engine
|
||||
|
||||
mock.stop()
|
||||
|
||||
|
||||
def test_basicop(storage_engine):
|
||||
# Ensure the content exists.
|
||||
assert storage_engine.exists(_TEST_PATH)
|
||||
|
||||
# Verify it can be retrieved.
|
||||
assert storage_engine.get_content(_TEST_PATH) == _TEST_CONTENT
|
||||
|
||||
# Retrieve a checksum for the content.
|
||||
storage_engine.get_checksum(_TEST_PATH)
|
||||
|
||||
# Remove the file.
|
||||
storage_engine.remove(_TEST_PATH)
|
||||
|
||||
# Ensure it no longer exists.
|
||||
with pytest.raises(IOError):
|
||||
storage_engine.get_content(_TEST_PATH)
|
||||
|
||||
with pytest.raises(IOError):
|
||||
storage_engine.get_checksum(_TEST_PATH)
|
||||
|
||||
assert not storage_engine.exists(_TEST_PATH)
|
||||
|
||||
|
||||
def test_copy_samecreds(storage_engine):
|
||||
# Copy the content to another engine.
|
||||
another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER,
|
||||
_TEST_PASSWORD)
|
||||
storage_engine.copy_to(another_engine, _TEST_PATH)
|
||||
|
||||
# Verify it can be retrieved.
|
||||
assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT
|
||||
|
||||
|
||||
def test_copy_differentcreds(storage_engine):
|
||||
# Copy the content to another engine.
|
||||
another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech',
|
||||
'password')
|
||||
boto.connect_s3().create_bucket('another_bucket')
|
||||
|
||||
storage_engine.copy_to(another_engine, _TEST_PATH)
|
||||
|
||||
# Verify it can be retrieved.
|
||||
assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT
|
||||
|
||||
|
||||
def test_stream_read(storage_engine):
|
||||
# Read the streaming content.
|
||||
data = ''.join(storage_engine.stream_read(_TEST_PATH))
|
||||
assert data == _TEST_CONTENT
|
||||
|
||||
|
||||
def test_stream_read_file(storage_engine):
|
||||
with storage_engine.stream_read_file(_TEST_PATH) as f:
|
||||
assert f.read() == _TEST_CONTENT
|
||||
|
||||
|
||||
def test_stream_write(storage_engine):
|
||||
new_data = os.urandom(4096)
|
||||
storage_engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type')
|
||||
assert storage_engine.get_content(_TEST_PATH) == new_data
|
||||
|
||||
|
||||
@pytest.mark.parametrize('chunk_count', [
|
||||
0,
|
||||
1,
|
||||
50,
|
||||
])
|
||||
@pytest.mark.parametrize('force_client_side', [
|
||||
False,
|
||||
True
|
||||
])
|
||||
def test_chunk_upload(storage_engine, chunk_count, force_client_side):
|
||||
if chunk_count == 0 and force_client_side:
|
||||
return
|
||||
|
||||
upload_id, metadata = storage_engine.initiate_chunked_upload()
|
||||
final_data = ''
|
||||
|
||||
for index in range(0, chunk_count):
|
||||
chunk_data = os.urandom(1024)
|
||||
final_data = final_data + chunk_data
|
||||
bytes_written, new_metadata, error = storage_engine.stream_upload_chunk(upload_id, 0,
|
||||
len(chunk_data),
|
||||
StringIO(chunk_data),
|
||||
metadata)
|
||||
metadata = new_metadata
|
||||
|
||||
assert bytes_written == len(chunk_data)
|
||||
assert error is None
|
||||
assert len(metadata[_CHUNKS_KEY]) == index + 1
|
||||
|
||||
# Complete the chunked upload.
|
||||
storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', metadata,
|
||||
force_client_side=force_client_side)
|
||||
|
||||
# Ensure the file contents are valid.
|
||||
assert storage_engine.get_content('some/chunked/path') == final_data
|
||||
|
||||
|
||||
@pytest.mark.parametrize('chunk_count', [
|
||||
0,
|
||||
1,
|
||||
50,
|
||||
])
|
||||
def test_cancel_chunked_upload(storage_engine, chunk_count):
|
||||
upload_id, metadata = storage_engine.initiate_chunked_upload()
|
||||
|
||||
for _ in range(0, chunk_count):
|
||||
chunk_data = os.urandom(1024)
|
||||
_, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
|
||||
len(chunk_data),
|
||||
StringIO(chunk_data),
|
||||
metadata)
|
||||
metadata = new_metadata
|
||||
|
||||
# Cancel the upload.
|
||||
storage_engine.cancel_chunked_upload(upload_id, metadata)
|
||||
|
||||
# Ensure all chunks were deleted.
|
||||
for chunk in metadata[_CHUNKS_KEY]:
|
||||
assert not storage_engine.exists(chunk.path)
|
||||
|
||||
|
||||
def test_large_chunks_upload(storage_engine):
|
||||
# Make the max chunk size much smaller for testing.
|
||||
storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2
|
||||
|
||||
upload_id, metadata = storage_engine.initiate_chunked_upload()
|
||||
|
||||
# Write a "super large" chunk, to ensure that it is broken into smaller chunks.
|
||||
chunk_data = os.urandom(int(storage_engine.maximum_chunk_size * 2.5))
|
||||
bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
|
||||
-1,
|
||||
StringIO(chunk_data),
|
||||
metadata)
|
||||
assert len(chunk_data) == bytes_written
|
||||
|
||||
# Complete the chunked upload.
|
||||
storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
|
||||
|
||||
# Ensure the file contents are valid.
|
||||
assert len(chunk_data) == len(storage_engine.get_content('some/chunked/path'))
|
||||
assert storage_engine.get_content('some/chunked/path') == chunk_data
|
||||
|
||||
|
||||
def test_large_chunks_with_ragged_edge(storage_engine):
|
||||
# Make the max chunk size much smaller for testing and force it to have a ragged edge.
|
||||
storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2 + 10
|
||||
|
||||
upload_id, metadata = storage_engine.initiate_chunked_upload()
|
||||
|
||||
# Write a few "super large" chunks, to ensure that it is broken into smaller chunks.
|
||||
all_data = ''
|
||||
for _ in range(0, 2):
|
||||
chunk_data = os.urandom(int(storage_engine.maximum_chunk_size) + 20)
|
||||
bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(upload_id, 0,
|
||||
-1,
|
||||
StringIO(chunk_data),
|
||||
metadata)
|
||||
assert len(chunk_data) == bytes_written
|
||||
all_data = all_data + chunk_data
|
||||
metadata = new_metadata
|
||||
|
||||
# Complete the chunked upload.
|
||||
storage_engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
|
||||
|
||||
# Ensure the file contents are valid.
|
||||
assert len(all_data) == len(storage_engine.get_content('some/chunked/path'))
|
||||
assert storage_engine.get_content('some/chunked/path') == all_data
|
||||
|
||||
|
||||
@pytest.mark.parametrize('max_size, parts', [
|
||||
(50, [
|
||||
_PartUploadMetadata('foo', 0, 50),
|
||||
_PartUploadMetadata('foo', 50, 50),
|
||||
]),
|
||||
|
||||
(40, [
|
||||
_PartUploadMetadata('foo', 0, 25),
|
||||
_PartUploadMetadata('foo', 25, 25),
|
||||
_PartUploadMetadata('foo', 50, 25),
|
||||
_PartUploadMetadata('foo', 75, 25)
|
||||
]),
|
||||
|
||||
(51, [
|
||||
_PartUploadMetadata('foo', 0, 50),
|
||||
_PartUploadMetadata('foo', 50, 50),
|
||||
]),
|
||||
|
||||
(49, [
|
||||
_PartUploadMetadata('foo', 0, 25),
|
||||
_PartUploadMetadata('foo', 25, 25),
|
||||
_PartUploadMetadata('foo', 50, 25),
|
||||
_PartUploadMetadata('foo', 75, 25),
|
||||
]),
|
||||
|
||||
(99, [
|
||||
_PartUploadMetadata('foo', 0, 50),
|
||||
_PartUploadMetadata('foo', 50, 50),
|
||||
]),
|
||||
|
||||
(100, [
|
||||
_PartUploadMetadata('foo', 0, 100),
|
||||
]),
|
||||
])
|
||||
def test_rechunked(max_size, parts):
|
||||
chunk = _PartUploadMetadata('foo', 0, 100)
|
||||
rechunked = list(_CloudStorage._rechunk(chunk, max_size))
|
||||
assert len(rechunked) == len(parts)
|
||||
for index, chunk in enumerate(rechunked):
|
||||
assert chunk == parts[index]
|
|
@ -1,234 +0,0 @@
|
|||
import unittest
|
||||
import moto
|
||||
import boto
|
||||
import os
|
||||
|
||||
from storage import S3Storage, StorageContext
|
||||
from storage.cloud import _CloudStorage, _PartUploadMetadata
|
||||
from storage.cloud import _CHUNKS_KEY
|
||||
from StringIO import StringIO
|
||||
|
||||
_TEST_CONTENT = os.urandom(1024)
|
||||
_TEST_BUCKET = 'some_bucket'
|
||||
_TEST_USER = 'someuser'
|
||||
_TEST_PASSWORD = 'somepassword'
|
||||
_TEST_PATH = 'some/cool/path'
|
||||
_TEST_CONTEXT = StorageContext('nyc', None, None, None, None)
|
||||
|
||||
class TestCloudStorage(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.mock = moto.mock_s3()
|
||||
self.mock.start()
|
||||
|
||||
# Create a test bucket and put some test content.
|
||||
boto.connect_s3().create_bucket(_TEST_BUCKET)
|
||||
self.engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
|
||||
self.engine.put_content(_TEST_PATH, _TEST_CONTENT)
|
||||
|
||||
def tearDown(self):
|
||||
self.mock.stop()
|
||||
|
||||
def test_basicop(self):
|
||||
# Ensure the content exists.
|
||||
self.assertTrue(self.engine.exists(_TEST_PATH))
|
||||
|
||||
# Verify it can be retrieved.
|
||||
self.assertEquals(_TEST_CONTENT, self.engine.get_content(_TEST_PATH))
|
||||
|
||||
# Retrieve a checksum for the content.
|
||||
self.engine.get_checksum(_TEST_PATH)
|
||||
|
||||
# Remove the file.
|
||||
self.engine.remove(_TEST_PATH)
|
||||
|
||||
# Ensure it no longer exists.
|
||||
with self.assertRaises(IOError):
|
||||
self.engine.get_content(_TEST_PATH)
|
||||
|
||||
with self.assertRaises(IOError):
|
||||
self.engine.get_checksum(_TEST_PATH)
|
||||
|
||||
self.assertFalse(self.engine.exists(_TEST_PATH))
|
||||
|
||||
def test_copy_samecreds(self):
|
||||
# Copy the content to another engine.
|
||||
another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER,
|
||||
_TEST_PASSWORD)
|
||||
self.engine.copy_to(another_engine, _TEST_PATH)
|
||||
|
||||
# Verify it can be retrieved.
|
||||
self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH))
|
||||
|
||||
def test_copy_differentcreds(self):
|
||||
# Copy the content to another engine.
|
||||
another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech',
|
||||
'password')
|
||||
boto.connect_s3().create_bucket('another_bucket')
|
||||
|
||||
self.engine.copy_to(another_engine, _TEST_PATH)
|
||||
|
||||
# Verify it can be retrieved.
|
||||
self.assertEquals(_TEST_CONTENT, another_engine.get_content(_TEST_PATH))
|
||||
|
||||
def test_stream_read(self):
|
||||
# Read the streaming content.
|
||||
data = ''.join(self.engine.stream_read(_TEST_PATH))
|
||||
self.assertEquals(_TEST_CONTENT, data)
|
||||
|
||||
def test_stream_read_file(self):
|
||||
with self.engine.stream_read_file(_TEST_PATH) as f:
|
||||
self.assertEquals(_TEST_CONTENT, f.read())
|
||||
|
||||
def test_stream_write(self):
|
||||
new_data = os.urandom(4096)
|
||||
self.engine.stream_write(_TEST_PATH, StringIO(new_data), content_type='Cool/Type')
|
||||
self.assertEquals(new_data, self.engine.get_content(_TEST_PATH))
|
||||
|
||||
def test_chunked_upload_no_chunks(self):
|
||||
self._chunk_upload_test(0)
|
||||
|
||||
def test_chunked_upload_single_chunk(self):
|
||||
self._chunk_upload_test(1)
|
||||
|
||||
def test_chunked_upload_multiple_chunks(self):
|
||||
self._chunk_upload_test(50)
|
||||
|
||||
def test_chunked_upload_single_chunk_client_side(self):
|
||||
self._chunk_upload_test(1, force_client_side=True)
|
||||
|
||||
def test_chunked_upload_multiple_chunks_client_side(self):
|
||||
self._chunk_upload_test(50, force_client_side=True)
|
||||
|
||||
def _chunk_upload_test(self, chunk_count, force_client_side=False):
|
||||
upload_id, metadata = self.engine.initiate_chunked_upload()
|
||||
final_data = ''
|
||||
|
||||
for index in range(0, chunk_count):
|
||||
chunk_data = os.urandom(1024)
|
||||
final_data = final_data + chunk_data
|
||||
bytes_written, new_metadata, error = self.engine.stream_upload_chunk(upload_id, 0,
|
||||
len(chunk_data),
|
||||
StringIO(chunk_data),
|
||||
metadata)
|
||||
metadata = new_metadata
|
||||
|
||||
self.assertEquals(len(chunk_data), bytes_written)
|
||||
self.assertIsNone(error)
|
||||
self.assertEquals(index + 1, len(metadata[_CHUNKS_KEY]))
|
||||
|
||||
# Complete the chunked upload.
|
||||
self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', metadata,
|
||||
force_client_side=force_client_side)
|
||||
|
||||
# Ensure the file contents are valid.
|
||||
self.assertEquals(final_data, self.engine.get_content('some/chunked/path'))
|
||||
|
||||
def test_cancel_chunked_upload_single_chunk(self):
|
||||
self._cancel_chunked_upload_test(1)
|
||||
|
||||
def test_cancel_chunked_upload_multiple_chunks(self):
|
||||
self._cancel_chunked_upload_test(50)
|
||||
|
||||
def _cancel_chunked_upload_test(self, chunk_count):
|
||||
upload_id, metadata = self.engine.initiate_chunked_upload()
|
||||
|
||||
for _ in range(0, chunk_count):
|
||||
chunk_data = os.urandom(1024)
|
||||
_, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
|
||||
len(chunk_data),
|
||||
StringIO(chunk_data),
|
||||
metadata)
|
||||
metadata = new_metadata
|
||||
|
||||
# Cancel the upload.
|
||||
self.engine.cancel_chunked_upload(upload_id, metadata)
|
||||
|
||||
# Ensure all chunks were deleted.
|
||||
for chunk in metadata[_CHUNKS_KEY]:
|
||||
self.assertFalse(self.engine.exists(chunk.path))
|
||||
|
||||
def test_large_chunks_upload(self):
|
||||
# Make the max chunk size much smaller for testing.
|
||||
self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2
|
||||
|
||||
upload_id, metadata = self.engine.initiate_chunked_upload()
|
||||
|
||||
# Write a "super large" chunk, to ensure that it is broken into smaller chunks.
|
||||
chunk_data = os.urandom(int(self.engine.maximum_chunk_size * 2.5))
|
||||
bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
|
||||
-1,
|
||||
StringIO(chunk_data),
|
||||
metadata)
|
||||
self.assertEquals(bytes_written, len(chunk_data))
|
||||
|
||||
# Complete the chunked upload.
|
||||
self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
|
||||
|
||||
# Ensure the file contents are valid.
|
||||
self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(chunk_data))
|
||||
self.assertEquals(chunk_data, self.engine.get_content('some/chunked/path'))
|
||||
|
||||
def test_large_chunks_with_ragged_edge(self):
|
||||
# Make the max chunk size much smaller for testing and force it to have a ragged edge.
|
||||
self.engine.maximum_chunk_size = self.engine.minimum_chunk_size * 2 + 10
|
||||
|
||||
upload_id, metadata = self.engine.initiate_chunked_upload()
|
||||
|
||||
# Write a few "super large" chunks, to ensure that it is broken into smaller chunks.
|
||||
all_data = ''
|
||||
for _ in range(0, 2):
|
||||
chunk_data = os.urandom(int(self.engine.maximum_chunk_size) + 20)
|
||||
bytes_written, new_metadata, _ = self.engine.stream_upload_chunk(upload_id, 0,
|
||||
-1,
|
||||
StringIO(chunk_data),
|
||||
metadata)
|
||||
self.assertEquals(bytes_written, len(chunk_data))
|
||||
all_data = all_data + chunk_data
|
||||
metadata = new_metadata
|
||||
|
||||
# Complete the chunked upload.
|
||||
self.engine.complete_chunked_upload(upload_id, 'some/chunked/path', new_metadata)
|
||||
|
||||
# Ensure the file contents are valid.
|
||||
self.assertEquals(len(self.engine.get_content('some/chunked/path')), len(all_data))
|
||||
self.assertEquals(all_data, self.engine.get_content('some/chunked/path'))
|
||||
|
||||
def assertRechunked(self, chunk, max_size, *args):
|
||||
rechunked = list(_CloudStorage._rechunk(chunk, max_size))
|
||||
self.assertEquals(len(rechunked), len(args), rechunked)
|
||||
for index, chunk in enumerate(rechunked):
|
||||
self.assertEquals(args[index], chunk)
|
||||
|
||||
def test_rechunking(self):
|
||||
chunk = _PartUploadMetadata('foo', 0, 100)
|
||||
|
||||
self.assertRechunked(chunk, 50,
|
||||
_PartUploadMetadata('foo', 0, 50),
|
||||
_PartUploadMetadata('foo', 50, 50))
|
||||
|
||||
self.assertRechunked(chunk, 40,
|
||||
_PartUploadMetadata('foo', 0, 25),
|
||||
_PartUploadMetadata('foo', 25, 25),
|
||||
_PartUploadMetadata('foo', 50, 25),
|
||||
_PartUploadMetadata('foo', 75, 25))
|
||||
|
||||
self.assertRechunked(chunk, 51,
|
||||
_PartUploadMetadata('foo', 0, 50),
|
||||
_PartUploadMetadata('foo', 50, 50))
|
||||
|
||||
self.assertRechunked(chunk, 49,
|
||||
_PartUploadMetadata('foo', 0, 25),
|
||||
_PartUploadMetadata('foo', 25, 25),
|
||||
_PartUploadMetadata('foo', 50, 25),
|
||||
_PartUploadMetadata('foo', 75, 25))
|
||||
|
||||
self.assertRechunked(chunk, 99,
|
||||
_PartUploadMetadata('foo', 0, 50),
|
||||
_PartUploadMetadata('foo', 50, 50))
|
||||
|
||||
self.assertRechunked(chunk, 100,
|
||||
_PartUploadMetadata('foo', 0, 100))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Reference in a new issue