Merge pull request #2732 from coreos-inc/swift-etag
Make sure to etag check Swift uploads
This commit is contained in:
commit
460a9b7fe8
3 changed files with 238 additions and 227 deletions
|
@ -11,7 +11,7 @@ import json
|
||||||
from _pyio import BufferedReader
|
from _pyio import BufferedReader
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from swiftclient.client import Connection, ClientException
|
from swiftclient.client import Connection, ClientException, ReadableToIterable
|
||||||
from urlparse import urlparse
|
from urlparse import urlparse
|
||||||
from random import SystemRandom
|
from random import SystemRandom
|
||||||
from hashlib import sha1
|
from hashlib import sha1
|
||||||
|
@ -108,10 +108,14 @@ class SwiftStorage(BaseStorage):
|
||||||
if content_encoding is not None:
|
if content_encoding is not None:
|
||||||
headers['Content-Encoding'] = content_encoding
|
headers['Content-Encoding'] = content_encoding
|
||||||
|
|
||||||
|
is_filelike = hasattr(content, 'read')
|
||||||
|
if is_filelike:
|
||||||
|
content = ReadableToIterable(content, md5=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._get_connection().put_object(self._swift_container, path, content,
|
etag = self._get_connection().put_object(self._swift_container, path, content,
|
||||||
chunk_size=chunk, content_type=content_type,
|
chunk_size=chunk, content_type=content_type,
|
||||||
headers=headers)
|
headers=headers)
|
||||||
except ClientException:
|
except ClientException:
|
||||||
# We re-raise client exception here so that validation of config during setup can see
|
# We re-raise client exception here so that validation of config during setup can see
|
||||||
# the client exception messages.
|
# the client exception messages.
|
||||||
|
@ -120,6 +124,16 @@ class SwiftStorage(BaseStorage):
|
||||||
logger.exception('Could not put object at path %s: %s', path, ex)
|
logger.exception('Could not put object at path %s: %s', path, ex)
|
||||||
raise IOError("Could not put content: %s" % path)
|
raise IOError("Could not put content: %s" % path)
|
||||||
|
|
||||||
|
# If we wrapped the content in a ReadableToIterable, compare its MD5 to the etag returned. If
|
||||||
|
# they don't match, raise an IOError indicating a write failure.
|
||||||
|
if is_filelike:
|
||||||
|
if etag != content.get_md5sum():
|
||||||
|
logger.error('Got mismatch in md5 etag for path %s: Expected %s, but server has %s', path,
|
||||||
|
content.get_md5sum(), etag)
|
||||||
|
raise IOError('upload verification failed for path {0}:'
|
||||||
|
'md5 mismatch, local {1} != remote {2}'
|
||||||
|
.format(path, content.get_md5sum(), etag))
|
||||||
|
|
||||||
def _head_object(self, path):
|
def _head_object(self, path):
|
||||||
path = self._normalize_path(path)
|
path = self._normalize_path(path)
|
||||||
try:
|
try:
|
||||||
|
|
220
storage/test/test_swift.py
Normal file
220
storage/test/test_swift.py
Normal file
|
@ -0,0 +1,220 @@
|
||||||
|
import io
|
||||||
|
import pytest
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
from collections import defaultdict
|
||||||
|
from mock import MagicMock
|
||||||
|
|
||||||
|
from storage import StorageContext
|
||||||
|
from storage.swift import SwiftStorage
|
||||||
|
|
||||||
|
base_args = {
|
||||||
|
'context': StorageContext('nyc', None, None),
|
||||||
|
'swift_container': 'container-name',
|
||||||
|
'storage_path': '/basepath',
|
||||||
|
'auth_url': 'https://auth.com',
|
||||||
|
'swift_user': 'root',
|
||||||
|
'swift_password': 'password',
|
||||||
|
}
|
||||||
|
|
||||||
|
class MockSwiftStorage(SwiftStorage):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(MockSwiftStorage, self).__init__(*args, **kwargs)
|
||||||
|
self._connection = MagicMock()
|
||||||
|
|
||||||
|
def _get_connection(self):
|
||||||
|
return self._connection
|
||||||
|
|
||||||
|
class FakeSwiftStorage(SwiftStorage):
|
||||||
|
def __init__(self, fail_checksum=False, *args, **kwargs):
|
||||||
|
super(FakeSwiftStorage, self).__init__(*args, **kwargs)
|
||||||
|
self._connection = FakeSwift(fail_checksum=fail_checksum)
|
||||||
|
|
||||||
|
def _get_connection(self):
|
||||||
|
return self._connection
|
||||||
|
|
||||||
|
|
||||||
|
class FakeSwift(object):
|
||||||
|
def __init__(self, fail_checksum=False):
|
||||||
|
self.containers = defaultdict(dict)
|
||||||
|
self.fail_checksum = fail_checksum
|
||||||
|
|
||||||
|
def head_object(self, container, path):
|
||||||
|
return self.containers[container].get(path)
|
||||||
|
|
||||||
|
def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
|
||||||
|
if not isinstance(content, str):
|
||||||
|
if hasattr(content, 'read'):
|
||||||
|
content = content.read()
|
||||||
|
else:
|
||||||
|
content = ''.join(content)
|
||||||
|
|
||||||
|
self.containers[container][path] = {
|
||||||
|
'content': content,
|
||||||
|
'chunk_size': chunk_size,
|
||||||
|
'content_type': content_type,
|
||||||
|
'headers': headers,
|
||||||
|
}
|
||||||
|
|
||||||
|
digest = hashlib.md5()
|
||||||
|
digest.update(content)
|
||||||
|
return digest.hexdigest() if not self.fail_checksum else 'invalid'
|
||||||
|
|
||||||
|
def get_object(self, container, path, resp_chunk_size=None):
|
||||||
|
data = self.containers[container].get(path, {})
|
||||||
|
if 'X-Object-Manifest' in data['headers']:
|
||||||
|
new_contents = []
|
||||||
|
prefix = data['headers']['X-Object-Manifest']
|
||||||
|
for key, value in self.containers[container].iteritems():
|
||||||
|
if ('container-name/' + key).startswith(prefix):
|
||||||
|
new_contents.append((key, value['content']))
|
||||||
|
|
||||||
|
new_contents.sort(key=lambda value: value[0])
|
||||||
|
|
||||||
|
data = dict(data)
|
||||||
|
data['content'] = ''.join([nc[1] for nc in new_contents])
|
||||||
|
return bool(data), data.get('content')
|
||||||
|
|
||||||
|
return bool(data), data.get('content')
|
||||||
|
|
||||||
|
def delete_object(self, container, path):
|
||||||
|
self.containers[container].pop(path, None)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeQueue(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.items = []
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
if not self.items:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return self.items.pop()
|
||||||
|
|
||||||
|
def put(self, names, item, available_after=0):
|
||||||
|
self.items.append({
|
||||||
|
'names': names,
|
||||||
|
'item': item,
|
||||||
|
'available_after': available_after,
|
||||||
|
})
|
||||||
|
|
||||||
|
def test_fixed_path_concat():
|
||||||
|
swift = MockSwiftStorage(**base_args)
|
||||||
|
swift.exists('object/path')
|
||||||
|
swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path')
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_path_concat():
|
||||||
|
simple_concat_args = dict(base_args)
|
||||||
|
simple_concat_args['simple_path_concat'] = True
|
||||||
|
swift = MockSwiftStorage(**simple_concat_args)
|
||||||
|
swift.exists('object/path')
|
||||||
|
swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
|
||||||
|
|
||||||
|
def test_delete_unknown_path():
|
||||||
|
swift = SwiftStorage(**base_args)
|
||||||
|
with pytest.raises(IOError):
|
||||||
|
swift.remove('someunknownpath')
|
||||||
|
|
||||||
|
def test_simple_put_get():
|
||||||
|
swift = FakeSwiftStorage(**base_args)
|
||||||
|
assert not swift.exists('somepath')
|
||||||
|
|
||||||
|
swift.put_content('somepath', 'hello world!')
|
||||||
|
assert swift.exists('somepath')
|
||||||
|
assert swift.get_content('somepath') == 'hello world!'
|
||||||
|
|
||||||
|
def test_stream_read_write():
|
||||||
|
swift = FakeSwiftStorage(**base_args)
|
||||||
|
assert not swift.exists('somepath')
|
||||||
|
|
||||||
|
swift.stream_write('somepath', io.BytesIO('some content here'))
|
||||||
|
assert swift.exists('somepath')
|
||||||
|
assert swift.get_content('somepath') == 'some content here'
|
||||||
|
assert ''.join(list(swift.stream_read('somepath'))) == 'some content here'
|
||||||
|
|
||||||
|
def test_stream_read_write_invalid_checksum():
|
||||||
|
swift = FakeSwiftStorage(fail_checksum=True, **base_args)
|
||||||
|
assert not swift.exists('somepath')
|
||||||
|
|
||||||
|
with pytest.raises(IOError):
|
||||||
|
swift.stream_write('somepath', io.BytesIO('some content here'))
|
||||||
|
|
||||||
|
def test_remove():
|
||||||
|
swift = FakeSwiftStorage(**base_args)
|
||||||
|
assert not swift.exists('somepath')
|
||||||
|
|
||||||
|
swift.put_content('somepath', 'hello world!')
|
||||||
|
assert swift.exists('somepath')
|
||||||
|
|
||||||
|
swift.remove('somepath')
|
||||||
|
assert not swift.exists('somepath')
|
||||||
|
|
||||||
|
def test_checksum():
|
||||||
|
swift = FakeSwiftStorage(**base_args)
|
||||||
|
swift.put_content('somepath', 'hello world!')
|
||||||
|
assert swift.get_checksum('somepath') is not None
|
||||||
|
|
||||||
|
def test_chunked_upload():
|
||||||
|
swift = FakeSwiftStorage(**base_args)
|
||||||
|
uuid, metadata = swift.initiate_chunked_upload()
|
||||||
|
|
||||||
|
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
|
||||||
|
offset = 0
|
||||||
|
for chunk in chunks:
|
||||||
|
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
|
||||||
|
io.BytesIO(chunk), metadata)
|
||||||
|
assert error is None
|
||||||
|
assert len(chunk) == bytes_written
|
||||||
|
offset += len(chunk)
|
||||||
|
|
||||||
|
swift.complete_chunked_upload(uuid, 'somepath', metadata)
|
||||||
|
assert swift.get_content('somepath') == ''.join(chunks)
|
||||||
|
|
||||||
|
def test_cancel_chunked_upload():
|
||||||
|
swift = FakeSwiftStorage(**base_args)
|
||||||
|
uuid, metadata = swift.initiate_chunked_upload()
|
||||||
|
|
||||||
|
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
|
||||||
|
offset = 0
|
||||||
|
for chunk in chunks:
|
||||||
|
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
|
||||||
|
io.BytesIO(chunk), metadata)
|
||||||
|
assert error is None
|
||||||
|
assert len(chunk) == bytes_written
|
||||||
|
offset += len(chunk)
|
||||||
|
|
||||||
|
swift.cancel_chunked_upload(uuid, metadata)
|
||||||
|
for segment in SwiftStorage._segment_list_from_metadata(metadata):
|
||||||
|
assert not swift.exists(segment.path)
|
||||||
|
|
||||||
|
def test_empty_chunks_queued_for_deletion():
|
||||||
|
chunk_cleanup_queue = FakeQueue()
|
||||||
|
args = dict(base_args)
|
||||||
|
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue)
|
||||||
|
|
||||||
|
swift = FakeSwiftStorage(**args)
|
||||||
|
uuid, metadata = swift.initiate_chunked_upload()
|
||||||
|
|
||||||
|
chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', '']
|
||||||
|
offset = 0
|
||||||
|
for chunk in chunks:
|
||||||
|
length = len(chunk)
|
||||||
|
if length == 0:
|
||||||
|
length = 1
|
||||||
|
|
||||||
|
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length,
|
||||||
|
io.BytesIO(chunk), metadata)
|
||||||
|
assert error is None
|
||||||
|
assert len(chunk) == bytes_written
|
||||||
|
offset += len(chunk)
|
||||||
|
|
||||||
|
swift.complete_chunked_upload(uuid, 'somepath', metadata)
|
||||||
|
assert ''.join(chunks) == swift.get_content('somepath')
|
||||||
|
|
||||||
|
# Check the chunk deletion queue and ensure we have the last chunk queued.
|
||||||
|
found = chunk_cleanup_queue.get()
|
||||||
|
assert found is not None
|
||||||
|
|
||||||
|
found2 = chunk_cleanup_queue.get()
|
||||||
|
assert found2 is None
|
|
@ -1,223 +0,0 @@
|
||||||
import io
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
from collections import defaultdict
|
|
||||||
from mock import MagicMock
|
|
||||||
|
|
||||||
from storage import StorageContext
|
|
||||||
from storage.swift import SwiftStorage
|
|
||||||
|
|
||||||
|
|
||||||
class MockSwiftStorage(SwiftStorage):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(MockSwiftStorage, self).__init__(*args, **kwargs)
|
|
||||||
self._connection = MagicMock()
|
|
||||||
|
|
||||||
def _get_connection(self):
|
|
||||||
return self._connection
|
|
||||||
|
|
||||||
|
|
||||||
class MockSwiftTests(unittest.TestCase):
|
|
||||||
base_args = {
|
|
||||||
'context': StorageContext('nyc', None, None),
|
|
||||||
'swift_container': 'container-name',
|
|
||||||
'storage_path': '/basepath',
|
|
||||||
'auth_url': 'https://auth.com',
|
|
||||||
'swift_user': 'root',
|
|
||||||
'swift_password': 'password',
|
|
||||||
}
|
|
||||||
|
|
||||||
def test_fixed_path_concat(self):
|
|
||||||
swift = MockSwiftStorage(**self.base_args)
|
|
||||||
swift.exists('object/path')
|
|
||||||
swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path')
|
|
||||||
|
|
||||||
|
|
||||||
def test_simple_path_concat(self):
|
|
||||||
simple_concat_args = dict(self.base_args)
|
|
||||||
simple_concat_args['simple_path_concat'] = True
|
|
||||||
swift = MockSwiftStorage(**simple_concat_args)
|
|
||||||
swift.exists('object/path')
|
|
||||||
swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
|
|
||||||
|
|
||||||
def test_delete_unknown_path(self):
|
|
||||||
swift = SwiftStorage(**self.base_args)
|
|
||||||
with self.assertRaises(IOError):
|
|
||||||
swift.remove('someunknownpath')
|
|
||||||
|
|
||||||
|
|
||||||
class FakeSwiftStorage(SwiftStorage):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(FakeSwiftStorage, self).__init__(*args, **kwargs)
|
|
||||||
self._connection = FakeSwift()
|
|
||||||
|
|
||||||
def _get_connection(self):
|
|
||||||
return self._connection
|
|
||||||
|
|
||||||
|
|
||||||
class FakeSwift(object):
|
|
||||||
def __init__(self):
|
|
||||||
self.containers = defaultdict(dict)
|
|
||||||
|
|
||||||
def head_object(self, container, path):
|
|
||||||
return self.containers[container].get(path)
|
|
||||||
|
|
||||||
def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
|
|
||||||
if not isinstance(content, str):
|
|
||||||
content = content.read()
|
|
||||||
|
|
||||||
self.containers[container][path] = {
|
|
||||||
'content': content,
|
|
||||||
'chunk_size': chunk_size,
|
|
||||||
'content_type': content_type,
|
|
||||||
'headers': headers,
|
|
||||||
}
|
|
||||||
|
|
||||||
def get_object(self, container, path, resp_chunk_size=None):
|
|
||||||
data = self.containers[container].get(path, {})
|
|
||||||
if 'X-Object-Manifest' in data['headers']:
|
|
||||||
new_contents = []
|
|
||||||
prefix = data['headers']['X-Object-Manifest']
|
|
||||||
for key, value in self.containers[container].iteritems():
|
|
||||||
if ('container-name/' + key).startswith(prefix):
|
|
||||||
new_contents.append((key, value['content']))
|
|
||||||
|
|
||||||
new_contents.sort(key=lambda value: value[0])
|
|
||||||
|
|
||||||
data = dict(data)
|
|
||||||
data['content'] = ''.join([nc[1] for nc in new_contents])
|
|
||||||
return bool(data), data.get('content')
|
|
||||||
|
|
||||||
return bool(data), data.get('content')
|
|
||||||
|
|
||||||
def delete_object(self, container, path):
|
|
||||||
self.containers[container].pop(path, None)
|
|
||||||
|
|
||||||
|
|
||||||
class FakeQueue(object):
|
|
||||||
def __init__(self):
|
|
||||||
self.items = []
|
|
||||||
|
|
||||||
def get(self):
|
|
||||||
if not self.items:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return self.items.pop()
|
|
||||||
|
|
||||||
def put(self, names, item, available_after=0):
|
|
||||||
self.items.append({
|
|
||||||
'names': names,
|
|
||||||
'item': item,
|
|
||||||
'available_after': available_after,
|
|
||||||
})
|
|
||||||
|
|
||||||
class FakeSwiftTests(unittest.TestCase):
|
|
||||||
base_args = {
|
|
||||||
'context': StorageContext('nyc', None, None),
|
|
||||||
'swift_container': 'container-name',
|
|
||||||
'storage_path': '/basepath',
|
|
||||||
'auth_url': 'https://auth.com',
|
|
||||||
'swift_user': 'root',
|
|
||||||
'swift_password': 'password',
|
|
||||||
}
|
|
||||||
|
|
||||||
def test_simple_put_get(self):
|
|
||||||
swift = FakeSwiftStorage(**self.base_args)
|
|
||||||
self.assertFalse(swift.exists('somepath'))
|
|
||||||
|
|
||||||
swift.put_content('somepath', 'hello world!')
|
|
||||||
self.assertTrue(swift.exists('somepath'))
|
|
||||||
|
|
||||||
self.assertEquals('hello world!', swift.get_content('somepath'))
|
|
||||||
|
|
||||||
def test_stream_read_write(self):
|
|
||||||
swift = FakeSwiftStorage(**self.base_args)
|
|
||||||
self.assertFalse(swift.exists('somepath'))
|
|
||||||
|
|
||||||
swift.stream_write('somepath', io.BytesIO('some content here'))
|
|
||||||
self.assertTrue(swift.exists('somepath'))
|
|
||||||
|
|
||||||
self.assertEquals('some content here', swift.get_content('somepath'))
|
|
||||||
self.assertEquals('some content here', ''.join(list(swift.stream_read('somepath'))))
|
|
||||||
|
|
||||||
def test_remove(self):
|
|
||||||
swift = FakeSwiftStorage(**self.base_args)
|
|
||||||
self.assertFalse(swift.exists('somepath'))
|
|
||||||
|
|
||||||
swift.put_content('somepath', 'hello world!')
|
|
||||||
self.assertTrue(swift.exists('somepath'))
|
|
||||||
|
|
||||||
swift.remove('somepath')
|
|
||||||
self.assertFalse(swift.exists('somepath'))
|
|
||||||
|
|
||||||
def test_checksum(self):
|
|
||||||
swift = FakeSwiftStorage(**self.base_args)
|
|
||||||
swift.put_content('somepath', 'hello world!')
|
|
||||||
self.assertIsNotNone(swift.get_checksum('somepath'))
|
|
||||||
|
|
||||||
def test_chunked_upload(self):
|
|
||||||
swift = FakeSwiftStorage(**self.base_args)
|
|
||||||
uuid, metadata = swift.initiate_chunked_upload()
|
|
||||||
|
|
||||||
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
|
|
||||||
offset = 0
|
|
||||||
for chunk in chunks:
|
|
||||||
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
|
|
||||||
io.BytesIO(chunk), metadata)
|
|
||||||
self.assertIsNone(error)
|
|
||||||
self.assertEquals(bytes_written, len(chunk))
|
|
||||||
offset += len(chunk)
|
|
||||||
|
|
||||||
swift.complete_chunked_upload(uuid, 'somepath', metadata)
|
|
||||||
self.assertEquals(''.join(chunks), swift.get_content('somepath'))
|
|
||||||
|
|
||||||
def test_cancel_chunked_upload(self):
|
|
||||||
swift = FakeSwiftStorage(**self.base_args)
|
|
||||||
uuid, metadata = swift.initiate_chunked_upload()
|
|
||||||
|
|
||||||
chunks = ['this', 'is', 'some', 'chunked', 'data', '']
|
|
||||||
offset = 0
|
|
||||||
for chunk in chunks:
|
|
||||||
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, len(chunk),
|
|
||||||
io.BytesIO(chunk), metadata)
|
|
||||||
self.assertIsNone(error)
|
|
||||||
self.assertEquals(bytes_written, len(chunk))
|
|
||||||
offset += len(chunk)
|
|
||||||
|
|
||||||
swift.cancel_chunked_upload(uuid, metadata)
|
|
||||||
for segment in SwiftStorage._segment_list_from_metadata(metadata):
|
|
||||||
self.assertFalse(swift.exists(segment.path))
|
|
||||||
|
|
||||||
def test_empty_chunks_queued_for_deletion(self):
|
|
||||||
chunk_cleanup_queue = FakeQueue()
|
|
||||||
args = dict(self.base_args)
|
|
||||||
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue)
|
|
||||||
|
|
||||||
swift = FakeSwiftStorage(**args)
|
|
||||||
uuid, metadata = swift.initiate_chunked_upload()
|
|
||||||
|
|
||||||
chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', '']
|
|
||||||
offset = 0
|
|
||||||
for chunk in chunks:
|
|
||||||
length = len(chunk)
|
|
||||||
if length == 0:
|
|
||||||
length = 1
|
|
||||||
|
|
||||||
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length,
|
|
||||||
io.BytesIO(chunk), metadata)
|
|
||||||
self.assertIsNone(error)
|
|
||||||
self.assertEquals(bytes_written, len(chunk))
|
|
||||||
offset += len(chunk)
|
|
||||||
|
|
||||||
swift.complete_chunked_upload(uuid, 'somepath', metadata)
|
|
||||||
self.assertEquals(''.join(chunks), swift.get_content('somepath'))
|
|
||||||
|
|
||||||
# Check the chunk deletion queue and ensure we have the last chunk queued.
|
|
||||||
found = chunk_cleanup_queue.get()
|
|
||||||
self.assertIsNotNone(found)
|
|
||||||
|
|
||||||
found2 = chunk_cleanup_queue.get()
|
|
||||||
self.assertIsNone(found2)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
Reference in a new issue