Merge pull request #3086 from quay/joseph.schorr/QUAY-942/swift-delete-fix
Have Swift storage delete segments when deleting dynamic large objects
Commit 2e1a3c7184. 3 changed files with 105 additions and 36 deletions.
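Background for the change: Quay's chunked uploads to Swift are stored as Dynamic Large Objects (DLOs). The chunks are uploaded as separate segment objects under a shared prefix, and the final path is a zero-byte manifest whose X-Object-Manifest header points at that prefix, so deleting only the manifest leaves every segment behind. A minimal sketch of how such a DLO is assembled with python-swiftclient; the helper name, prefix layout, and chunk naming here are illustrative, not Quay's exact code:

    from swiftclient.client import Connection

    def put_dlo(conn, container, path, chunks):
      # Each chunk becomes its own object under a shared segment prefix.
      # (Prefix layout is an assumption for illustration.)
      prefix = '%s/segments' % path
      for index, chunk in enumerate(chunks):
        conn.put_object(container, '%s/%08d' % (prefix, index), chunk)

      # The object at `path` is just a zero-byte manifest; reads concatenate the
      # segments named by X-Object-Manifest ("<container>/<prefix>"). Deleting
      # `path` alone leaves all segment objects behind, which is the leak this
      # PR addresses.
      conn.put_object(container, path, '',
                      headers={'X-Object-Manifest': '%s/%s' % (container, prefix)})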
@@ -9,19 +9,20 @@ import string
 import logging
 import json
 
-from cachetools import lru_cache
 from _pyio import BufferedReader
+
+from collections import namedtuple
+from hashlib import sha1
+from random import SystemRandom
+from time import time
+from urlparse import urlparse
 from uuid import uuid4
 
+from cachetools import lru_cache
 from swiftclient.client import Connection, ClientException, ReadableToIterable
-from urlparse import urlparse
-from random import SystemRandom
-from hashlib import sha1
-from time import time
-from collections import namedtuple
 
-from util.registry import filelike
 from storage.basestorage import BaseStorage
+from util.registry import filelike
 from util.registry.generatorfile import GeneratorFile
 
 
@@ -98,7 +99,7 @@ class SwiftStorage(BaseStorage):
       _, obj = self._get_connection().get_object(self._swift_container, path,
                                                  resp_chunk_size=chunk_size)
       return obj
-    except Exception as ex:
+    except ClientException as ex:
       logger.exception('Could not get object at path %s: %s', path, ex)
       raise IOError('Path %s not found' % path)
 
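The only behavioral change in this hunk is narrowing "except Exception" to "except ClientException": Swift API failures are still translated into IOError for callers, while programming errors (a bad argument, an attribute typo) now propagate instead of being masked. A tiny illustration of the pattern, using a hypothetical helper name that is not part of the PR:

    from swiftclient.client import ClientException

    def translate_swift_errors(fn, *args, **kwargs):
      # Only errors raised by the Swift client become IOError; anything else
      # (e.g. a TypeError from a bad call site) surfaces to the caller unchanged.
      try:
        return fn(*args, **kwargs)
      except ClientException as ex:
        raise IOError('Swift operation failed: %s' % ex)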
@@ -122,9 +123,6 @@ class SwiftStorage(BaseStorage):
       # We re-raise client exception here so that validation of config during setup can see
       # the client exception messages.
       raise
-    except Exception as ex:
-      logger.exception('Could not put object at path %s: %s', path, ex)
-      raise IOError("Could not put content: %s" % path)
 
     # If we wrapped the content in a ReadableToIterable, compare its MD5 to the etag returned. If
     # they don't match, raise an IOError indicating a write failure.
@@ -142,12 +140,9 @@ class SwiftStorage(BaseStorage):
       return self._get_connection().head_object(self._swift_container, path)
     except ClientException as ce:
       if ce.http_status != 404:
-        logger.exception('Could not head object at path %s: %s', path, ex)
+        logger.exception('Could not head object at path %s: %s', path, ce)
 
       return None
-    except Exception as ex:
-      logger.exception('Could not head object at path %s: %s', path, ex)
-      return None
 
   @lru_cache(maxsize=1)
   def _get_root_storage_url(self):
@@ -157,7 +152,8 @@ class SwiftStorage(BaseStorage):
     storage_url, _ = self._get_connection().get_auth()
     return storage_url
 
-  def get_direct_download_url(self, object_path, request_ip=None, expires_in=60, requires_cors=False, head=False):
+  def get_direct_download_url(self, object_path, request_ip=None, expires_in=60,
+                              requires_cors=False, head=False):
     if requires_cors:
       return None
 
@@ -230,13 +226,56 @@ class SwiftStorage(BaseStorage):
     return bool(self._head_object(path))
 
   def remove(self, path):
+    # Retrieve the object so we can see if it is segmented. If so, we'll delete its segments after
+    # removing the object.
+    try:
+      headers = self._head_object(path)
+    except ClientException as ex:
+      logger.warning('Could not head for delete of path %s: %s', path, str(ex))
+      raise IOError('Cannot delete path: %s' % path)
+
+    logger.debug('Found headers for path %s to delete: %s', path, headers)
+
+    # Delete the path itself.
     path = self._normalize_path(path)
     try:
       self._get_connection().delete_object(self._swift_container, path)
-    except Exception as ex:
+    except ClientException as ex:
       logger.warning('Could not delete path %s: %s', path, str(ex))
       raise IOError('Cannot delete path: %s' % path)
 
+    # Delete the segments.
+    object_manifest = headers.get('x-object-manifest', headers.get('X-Object-Manifest'))
+    if object_manifest is not None:
+      logger.debug('Found DLO for path %s: %s', path, object_manifest)
+
+      # Remove the container name from the beginning.
+      container_name, prefix_path = object_manifest.split('/', 1)
+      if container_name != self._swift_container:
+        logger.error('Expected container name %s, found path %s', self._swift_container,
+                     prefix_path)
+        raise Exception("How did we end up with an invalid container name?")
+
+      logger.debug('Loading Dynamic Large Object segments for path prefix %s', prefix_path)
+      try:
+        _, container_objects = self._get_connection().get_container(self._swift_container,
+                                                                    full_listing=True,
+                                                                    prefix=prefix_path)
+      except ClientException as ex:
+        logger.warning('Could not load objects with prefix path %s: %s', prefix_path, str(ex))
+        raise IOError('Cannot load path: %s' % prefix_path)
+
+      logger.debug('Found Dynamic Large Object segments for path prefix %s: %s', prefix_path,
+                   len(container_objects))
+      for obj in container_objects:
+        try:
+          logger.debug('Deleting Dynamic Large Object segment %s for path prefix %s', obj['name'],
+                       prefix_path)
+          self._get_connection().delete_object(self._swift_container, obj['name'])
+        except ClientException as ex:
+          logger.warning('Could not delete object with path %s: %s', obj['name'], str(ex))
+          raise IOError('Cannot delete path: %s' % obj['name'])
+
   def _random_checksum(self, count):
     chars = string.ascii_uppercase + string.digits
     return ''.join(SystemRandom().choice(chars) for _ in range(count))
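The new remove() logic above follows the standard DLO contract: head the object, delete it, and if its X-Object-Manifest header is present (a value of the form "<container>/<segment prefix>"), list and delete every object under that prefix. Condensed into a standalone sketch against python-swiftclient; the function and variable names are illustrative, not the PR's code:

    from swiftclient.client import Connection, ClientException

    def delete_with_segments(conn, container, path):
      # Response headers; the diff above checks both 'x-object-manifest' and
      # 'X-Object-Manifest', so do the same here.
      headers = conn.head_object(container, path)
      conn.delete_object(container, path)

      manifest = headers.get('x-object-manifest', headers.get('X-Object-Manifest'))
      if manifest is None:
        return  # Not a DLO; nothing else to clean up.

      seg_container, prefix = manifest.split('/', 1)
      _, objects = conn.get_container(seg_container, prefix=prefix, full_listing=True)
      for obj in objects:
        try:
          conn.delete_object(seg_container, obj['name'])
        except ClientException as ex:
          if ex.http_status != 404:  # a segment already gone is not an error
            raise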
@@ -346,7 +385,9 @@ class SwiftStorage(BaseStorage):
         continue
 
       # Queue the chunk to be deleted, as it is empty and therefore unused.
-      self._context.chunk_cleanup_queue.put(['segment/%s/%s' % (self._context.location, uuid)], json.dumps({
+      self._context.chunk_cleanup_queue.put(
+        ['segment/%s/%s' % (self._context.location, uuid)],
+        json.dumps({
           'location': self._context.location,
           'uuid': uuid,
           'path': segment.path,
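The enqueue call above is only reformatted across lines; its JSON body is unchanged and is what ChunkCleanupWorker (last hunk below) reads back out. A small round-trip illustration with made-up values, field names taken from the diff:

    import json

    # Producer side: keys mirror the hunk above; the values are illustrative.
    item_body = json.dumps({
      'location': 'swift_eu',
      'uuid': 'some-upload-uuid',
      'path': 'segments/some-upload-uuid/00000003',
    })

    # Consumer side: the worker pulls the same keys back out of the job body.
    job_details = json.loads(item_body)
    assert job_details['location'] == 'swift_eu'
    assert job_details['path'] == 'segments/some-upload-uuid/00000003'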
@@ -381,9 +422,10 @@ class SwiftStorage(BaseStorage):
 
       try:
         self._get_connection().copy_object(self._swift_container, normalized_path, target)
-      except Exception as ex:
+      except ClientException as ex:
         logger.exception('Could not swift copy path %s: %s', path, ex)
         raise IOError('Failed to swift copy path %s' % path)
+
       return
 
     # Fallback to a slower, default copy.
@@ -50,13 +50,24 @@ class FakeSwift(object):
     return 'http://fake/swift', None
 
   def head_object(self, container, path):
-    return self.containers[container].get(path)
+    return self.containers.get(container, {}).get(path, {}).get('headers', None)
 
   def copy_object(self, container, path, target):
     pieces = target.split('/', 2)
     _, content = self.get_object(container, path)
     self.put_object(pieces[1], pieces[2], content)
 
+  def get_container(self, container, prefix=None, full_listing=None):
+    container_entries = self.containers[container]
+    objs = []
+    for path, data in list(container_entries.iteritems()):
+      if not prefix or path.startswith(prefix):
+        objs.append({
+          'name': path,
+          'bytes': len(data['content']),
+        })
+    return {}, objs
+
   def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
     if not isinstance(content, str):
       if hasattr(content, 'read'):
@@ -68,7 +79,7 @@ class FakeSwift(object):
       'content': content,
       'chunk_size': chunk_size,
       'content_type': content_type,
-      'headers': headers or {},
+      'headers': headers or {'is': True},
     }
 
     digest = hashlib.md5()
@@ -238,6 +249,18 @@ def test_chunked_upload(chunks, max_chunk_size, read_until_end):
   swift.complete_chunked_upload(uuid, 'somepath', metadata)
   assert swift.get_content('somepath') == ''.join(chunks)
+
+  # Ensure each of the segments exist.
+  for segment in metadata['segments']:
+    assert swift.exists(segment.path)
+
+  # Delete the file and ensure all of its segments were removed.
+  swift.remove('somepath')
+  assert not swift.exists('somepath')
+
+  for segment in metadata['segments']:
+    assert not swift.exists(segment.path)
+
 
 def test_cancel_chunked_upload():
   swift = FakeSwiftStorage(**base_args)
   uuid, metadata = swift.initiate_chunked_upload()
@@ -20,6 +20,10 @@ class ChunkCleanupWorker(QueueWorker):
     storage_location = job_details['location']
     storage_path = job_details['path']
 
+    if not storage.exists([storage_location], storage_path):
+      logger.debug('Chunk already deleted')
+      return
+
     try:
       storage.remove([storage_location], storage_path)
     except IOError:
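The added existence check makes chunk cleanup idempotent: a segment that is already gone (for example, presumably because the new segment deletion in SwiftStorage.remove got to it first) is skipped instead of failing the job. A self-contained sketch of that guard, assuming a storage facade whose exists()/remove() take a list of locations plus a path, as in the hunk above:

    import logging

    logger = logging.getLogger(__name__)

    def cleanup_chunk(storage, storage_location, storage_path):
      # `storage` stands in for the worker's storage facade (an assumption here);
      # the call shapes mirror the hunk above.
      if not storage.exists([storage_location], storage_path):
        logger.debug('Chunk already deleted')
        return

      try:
        storage.remove([storage_location], storage_path)
      except IOError:
        # The hunk ends before the handler body; typically the job would be
        # retried or marked failed at this point.
        logger.exception('Could not remove chunk %s', storage_path)
        raise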