Check the returned ETag in Swift when streaming data
This ensures that if Swift mis-writes the data, we know immediately and can fail
This commit is contained in:
parent
6b272cf7e6
commit
688312bb29
2 changed files with 38 additions and 8 deletions
|
@ -11,7 +11,7 @@ import json
|
|||
from _pyio import BufferedReader
|
||||
from uuid import uuid4
|
||||
|
||||
from swiftclient.client import Connection, ClientException
|
||||
from swiftclient.client import Connection, ClientException, ReadableToIterable
|
||||
from urlparse import urlparse
|
||||
from random import SystemRandom
|
||||
from hashlib import sha1
|
||||
|
@ -108,10 +108,14 @@ class SwiftStorage(BaseStorage):
|
|||
if content_encoding is not None:
|
||||
headers['Content-Encoding'] = content_encoding
|
||||
|
||||
is_filelike = hasattr(content, 'read')
|
||||
if is_filelike:
|
||||
content = ReadableToIterable(content, md5=True)
|
||||
|
||||
try:
|
||||
self._get_connection().put_object(self._swift_container, path, content,
|
||||
chunk_size=chunk, content_type=content_type,
|
||||
headers=headers)
|
||||
etag = self._get_connection().put_object(self._swift_container, path, content,
|
||||
chunk_size=chunk, content_type=content_type,
|
||||
headers=headers)
|
||||
except ClientException:
|
||||
# We re-raise client exception here so that validation of config during setup can see
|
||||
# the client exception messages.
|
||||
|
@ -120,6 +124,16 @@ class SwiftStorage(BaseStorage):
|
|||
logger.exception('Could not put object at path %s: %s', path, ex)
|
||||
raise IOError("Could not put content: %s" % path)
|
||||
|
||||
# If we wrapped the content in a ReadableToIterable, compare its MD5 to the etag returned. If
|
||||
# they don't match, raise an IOError indicating a write failure.
|
||||
if is_filelike:
|
||||
if etag != content.get_md5sum():
|
||||
logger.error('Got mismatch in md5 etag for path %s: Expected %s, but server has %s', path,
|
||||
content.get_md5sum(), etag)
|
||||
raise IOError('upload verification failed for path {0}:'
|
||||
'md5 mismatch, local {1} != remote {2}'
|
||||
.format(path, content.get_md5sum(), etag))
|
||||
|
||||
def _head_object(self, path):
|
||||
path = self._normalize_path(path)
|
||||
try:
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import io
|
||||
import pytest
|
||||
import hashlib
|
||||
|
||||
from collections import defaultdict
|
||||
from mock import MagicMock
|
||||
|
@ -25,24 +26,28 @@ class MockSwiftStorage(SwiftStorage):
|
|||
return self._connection
|
||||
|
||||
class FakeSwiftStorage(SwiftStorage):
|
||||
def __init__(self, *args, **kwargs):
|
||||
def __init__(self, fail_checksum=False, *args, **kwargs):
|
||||
super(FakeSwiftStorage, self).__init__(*args, **kwargs)
|
||||
self._connection = FakeSwift()
|
||||
self._connection = FakeSwift(fail_checksum=fail_checksum)
|
||||
|
||||
def _get_connection(self):
|
||||
return self._connection
|
||||
|
||||
|
||||
class FakeSwift(object):
|
||||
def __init__(self):
|
||||
def __init__(self, fail_checksum=False):
|
||||
self.containers = defaultdict(dict)
|
||||
self.fail_checksum = fail_checksum
|
||||
|
||||
def head_object(self, container, path):
|
||||
return self.containers[container].get(path)
|
||||
|
||||
def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
|
||||
if not isinstance(content, str):
|
||||
content = content.read()
|
||||
if hasattr(content, 'read'):
|
||||
content = content.read()
|
||||
else:
|
||||
content = ''.join(content)
|
||||
|
||||
self.containers[container][path] = {
|
||||
'content': content,
|
||||
|
@ -51,6 +56,10 @@ class FakeSwift(object):
|
|||
'headers': headers,
|
||||
}
|
||||
|
||||
digest = hashlib.md5()
|
||||
digest.update(content)
|
||||
return digest.hexdigest() if not self.fail_checksum else 'invalid'
|
||||
|
||||
def get_object(self, container, path, resp_chunk_size=None):
|
||||
data = self.containers[container].get(path, {})
|
||||
if 'X-Object-Manifest' in data['headers']:
|
||||
|
@ -124,6 +133,13 @@ def test_stream_read_write():
|
|||
assert swift.get_content('somepath') == 'some content here'
|
||||
assert ''.join(list(swift.stream_read('somepath'))) == 'some content here'
|
||||
|
||||
def test_stream_read_write_invalid_checksum():
|
||||
swift = FakeSwiftStorage(fail_checksum=True, **base_args)
|
||||
assert not swift.exists('somepath')
|
||||
|
||||
with pytest.raises(IOError):
|
||||
swift.stream_write('somepath', io.BytesIO('some content here'))
|
||||
|
||||
def test_remove():
|
||||
swift = FakeSwiftStorage(**base_args)
|
||||
assert not swift.exists('somepath')
|
||||
|
|
Reference in a new issue