Have Swift storage delete segments when deleting dynamic large objects

This ensures that we reclaim the space held by the segments, rather than simply deleting the manifest and leaving the segment objects behind.

Fixes https://jira.coreos.com/browse/QUAY-942
Joseph Schorr 2018-05-16 16:01:49 -04:00
parent 66b4e45929
commit 57523d22de
3 changed files with 105 additions and 36 deletions
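
For context: a Swift Dynamic Large Object (DLO) is a zero-byte manifest object whose X-Object-Manifest header holds a "<container>/<prefix>" value, with the actual data stored as separate segment objects under that prefix. Deleting only the manifest therefore strands the segments. Below is a minimal sketch of the cleanup pattern this commit implements inside SwiftStorage.remove, written against python-swiftclient directly; the connection setup and object names are placeholders, not Quay code.

from swiftclient.client import Connection

def delete_dlo(conn, container, name):
  # swiftclient lowercases response header keys, so a DLO manifest shows up
  # as x-object-manifest: "<container>/<prefix>".
  headers = conn.head_object(container, name)
  manifest = headers.get('x-object-manifest')

  # Delete the manifest object itself.
  conn.delete_object(container, name)

  # Then delete every segment object stored under the manifest's prefix.
  if manifest is not None:
    seg_container, prefix = manifest.split('/', 1)
    _, objects = conn.get_container(seg_container, prefix=prefix, full_listing=True)
    for obj in objects:
      conn.delete_object(seg_container, obj['name'])

# Usage sketch (credentials are placeholders):
#   conn = Connection(authurl='https://auth.example/v1.0', user='user', key='key')
#   delete_dlo(conn, 'mycontainer', 'somepath')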


@@ -9,19 +9,20 @@ import string
 import logging
 import json
 
-from cachetools import lru_cache
 from _pyio import BufferedReader
+from collections import namedtuple
+from hashlib import sha1
+from random import SystemRandom
+from time import time
+from urlparse import urlparse
 from uuid import uuid4
 
+from cachetools import lru_cache
 from swiftclient.client import Connection, ClientException, ReadableToIterable
-from urlparse import urlparse
-from random import SystemRandom
-from hashlib import sha1
-from time import time
-from collections import namedtuple
-from util.registry import filelike
 
 from storage.basestorage import BaseStorage
+from util.registry import filelike
 from util.registry.generatorfile import GeneratorFile
@@ -98,7 +99,7 @@ class SwiftStorage(BaseStorage):
       _, obj = self._get_connection().get_object(self._swift_container, path,
                                                  resp_chunk_size=chunk_size)
       return obj
-    except Exception as ex:
+    except ClientException as ex:
       logger.exception('Could not get object at path %s: %s', path, ex)
       raise IOError('Path %s not found' % path)
@@ -122,9 +123,6 @@ class SwiftStorage(BaseStorage):
       # We re-raise client exception here so that validation of config during setup can see
       # the client exception messages.
       raise
-    except Exception as ex:
-      logger.exception('Could not put object at path %s: %s', path, ex)
-      raise IOError("Could not put content: %s" % path)
 
     # If we wrapped the content in a ReadableToIterable, compare its MD5 to the etag returned. If
     # they don't match, raise an IOError indicating a write failure.
@@ -142,12 +140,9 @@ class SwiftStorage(BaseStorage):
       return self._get_connection().head_object(self._swift_container, path)
     except ClientException as ce:
       if ce.http_status != 404:
-        logger.exception('Could not head object at path %s: %s', path, ex)
+        logger.exception('Could not head object at path %s: %s', path, ce)
       return None
-    except Exception as ex:
-      logger.exception('Could not head object at path %s: %s', path, ex)
-      return None
 
   @lru_cache(maxsize=1)
   def _get_root_storage_url(self):
@@ -157,7 +152,8 @@ class SwiftStorage(BaseStorage):
     storage_url, _ = self._get_connection().get_auth()
     return storage_url
 
-  def get_direct_download_url(self, object_path, request_ip=None, expires_in=60, requires_cors=False, head=False):
+  def get_direct_download_url(self, object_path, request_ip=None, expires_in=60,
+                              requires_cors=False, head=False):
     if requires_cors:
       return None
@@ -230,13 +226,56 @@ class SwiftStorage(BaseStorage):
     return bool(self._head_object(path))
 
   def remove(self, path):
+    # Retrieve the object so we can see if it is segmented. If so, we'll delete its segments after
+    # removing the object.
+    try:
+      headers = self._head_object(path)
+    except ClientException as ex:
+      logger.warning('Could not head for delete of path %s: %s', path, str(ex))
+      raise IOError('Cannot delete path: %s' % path)
+
+    logger.debug('Found headers for path %s to delete: %s', path, headers)
+
+    # Delete the path itself.
     path = self._normalize_path(path)
     try:
       self._get_connection().delete_object(self._swift_container, path)
-    except Exception as ex:
+    except ClientException as ex:
       logger.warning('Could not delete path %s: %s', path, str(ex))
       raise IOError('Cannot delete path: %s' % path)
 
+    # Delete the segments.
+    object_manifest = headers.get('x-object-manifest', headers.get('X-Object-Manifest'))
+    if object_manifest is not None:
+      logger.debug('Found DLO for path %s: %s', path, object_manifest)
+
+      # Remove the container name from the beginning.
+      container_name, prefix_path = object_manifest.split('/', 1)
+      if container_name != self._swift_container:
+        logger.error('Expected container name %s, found path %s', self._swift_container,
+                     prefix_path)
+        raise Exception("How did we end up with an invalid container name?")
+
+      logger.debug('Loading Dynamic Large Object segments for path prefix %s', prefix_path)
+      try:
+        _, container_objects = self._get_connection().get_container(self._swift_container,
+                                                                    full_listing=True,
+                                                                    prefix=prefix_path)
+      except ClientException as ex:
+        logger.warning('Could not load objects with prefix path %s: %s', prefix_path, str(ex))
+        raise IOError('Cannot load path: %s' % prefix_path)
+
+      logger.debug('Found Dynamic Large Object segments for path prefix %s: %s', prefix_path,
+                   len(container_objects))
+      for obj in container_objects:
+        try:
+          logger.debug('Deleting Dynamic Large Object segment %s for path prefix %s', obj['name'],
+                       prefix_path)
+          self._get_connection().delete_object(self._swift_container, obj['name'])
+        except ClientException as ex:
+          logger.warning('Could not delete object with path %s: %s', obj['name'], str(ex))
+          raise IOError('Cannot delete path: %s' % obj['name'])
 
   def _random_checksum(self, count):
     chars = string.ascii_uppercase + string.digits
     return ''.join(SystemRandom().choice(chars) for _ in range(count))
@@ -346,11 +385,13 @@ class SwiftStorage(BaseStorage):
         continue
 
       # Queue the chunk to be deleted, as it is empty and therefore unused.
-      self._context.chunk_cleanup_queue.put(['segment/%s/%s' % (self._context.location, uuid)], json.dumps({
-        'location': self._context.location,
-        'uuid': uuid,
-        'path': segment.path,
-      }), available_after=_CHUNK_CLEANUP_DELAY)
+      self._context.chunk_cleanup_queue.put(
+        ['segment/%s/%s' % (self._context.location, uuid)],
+        json.dumps({
+          'location': self._context.location,
+          'uuid': uuid,
+          'path': segment.path,
+        }), available_after=_CHUNK_CLEANUP_DELAY)
 
     # Finally, we write an empty file at the proper location with a X-Object-Manifest
     # header pointing to the prefix for the segments.
@@ -373,18 +414,19 @@ class SwiftStorage(BaseStorage):
         self._swift_password == destination._swift_password and
         self._auth_url == destination._auth_url and
         self._auth_version == destination._auth_version):
       logger.debug('Copying file from swift %s to swift %s via a Swift copy',
                    self._swift_container, destination)
 
       normalized_path = self._normalize_path(path)
       target = '/%s/%s' % (destination._swift_container, normalized_path)
 
       try:
         self._get_connection().copy_object(self._swift_container, normalized_path, target)
-      except Exception as ex:
+      except ClientException as ex:
         logger.exception('Could not swift copy path %s: %s', path, ex)
         raise IOError('Failed to swift copy path %s' % path)
 
       return
 
     # Fallback to a slower, default copy.
     logger.debug('Copying file from swift %s to %s via a streamed copy', self._swift_container,
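
One detail worth noting in the new remove(): the X-Object-Manifest value is split only on the first slash, since the segment prefix itself normally contains further slashes. A tiny self-contained illustration of that parsing step (the container and prefix values are invented):

def parse_dlo_manifest(value):
  # '<container>/<prefix>' -> (container, prefix); the prefix may contain '/'.
  container, prefix = value.split('/', 1)
  return container, prefix

assert parse_dlo_manifest('mycontainer/segments/uuid123/') == ('mycontainer', 'segments/uuid123/')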


@@ -50,13 +50,24 @@ class FakeSwift(object):
     return 'http://fake/swift', None
 
   def head_object(self, container, path):
-    return self.containers[container].get(path)
+    return self.containers.get(container, {}).get(path, {}).get('headers', None)
 
   def copy_object(self, container, path, target):
     pieces = target.split('/', 2)
     _, content = self.get_object(container, path)
     self.put_object(pieces[1], pieces[2], content)
 
+  def get_container(self, container, prefix=None, full_listing=None):
+    container_entries = self.containers[container]
+    objs = []
+    for path, data in list(container_entries.iteritems()):
+      if not prefix or path.startswith(prefix):
+        objs.append({
+          'name': path,
+          'bytes': len(data['content']),
+        })
+
+    return {}, objs
+
   def put_object(self, container, path, content, chunk_size=None, content_type=None, headers=None):
     if not isinstance(content, str):
       if hasattr(content, 'read'):
@@ -68,7 +79,7 @@ class FakeSwift(object):
       'content': content,
       'chunk_size': chunk_size,
       'content_type': content_type,
-      'headers': headers or {},
+      'headers': headers or {'is': True},
     }
 
     digest = hashlib.md5()
@@ -179,7 +190,7 @@ def test_copy_to():
   assert another_swift.exists('somepath')
 
   assert swift.get_content('somepath') == 'some content here'
   assert another_swift.get_content('somepath') == 'some content here'
 
 
 def test_copy_to_different():
   swift = FakeSwiftStorage(**base_args)
@@ -197,7 +208,7 @@ def test_copy_to_different():
   assert another_swift.exists('somepath')
 
   assert swift.get_content('somepath') == 'some content here'
   assert another_swift.get_content('somepath') == 'some content here'
 
 
 def test_checksum():
   swift = FakeSwiftStorage(**base_args)
@@ -238,6 +249,18 @@ def test_chunked_upload(chunks, max_chunk_size, read_until_end):
   swift.complete_chunked_upload(uuid, 'somepath', metadata)
   assert swift.get_content('somepath') == ''.join(chunks)
 
+  # Ensure each of the segments exist.
+  for segment in metadata['segments']:
+    assert swift.exists(segment.path)
+
+  # Delete the file and ensure all of its segments were removed.
+  swift.remove('somepath')
+  assert not swift.exists('somepath')
+  for segment in metadata['segments']:
+    assert not swift.exists(segment.path)
+
 
 def test_cancel_chunked_upload():
   swift = FakeSwiftStorage(**base_args)
   uuid, metadata = swift.initiate_chunked_upload()
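
The {'is': True} sentinel in FakeSwift.put_object deserves a note: head_object now returns only the stored headers, and SwiftStorage.exists treats a falsy result as not found, so an object stored with an empty headers dict would wrongly appear to be missing. A minimal self-contained illustration of the pitfall (standalone, not Quay code):

containers = {'c': {'manifest': {'headers': {}}}}

def head_object(container, path):
  # Mirrors FakeSwift.head_object after this commit.
  return containers.get(container, {}).get(path, {}).get('headers', None)

# An empty headers dict is falsy, so an existence check would report False:
assert not bool(head_object('c', 'manifest'))

# Hence the fake stores a truthy sentinel when no headers are supplied:
containers['c']['manifest']['headers'] = {'is': True}
assert bool(head_object('c', 'manifest'))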


@@ -20,6 +20,10 @@ class ChunkCleanupWorker(QueueWorker):
     storage_location = job_details['location']
     storage_path = job_details['path']
 
+    if not storage.exists([storage_location], storage_path):
+      logger.debug('Chunk already deleted')
+      return
+
     try:
       storage.remove([storage_location], storage_path)
     except IOError:
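
Finally, the worker change makes segment-cleanup jobs idempotent: a job queued during complete_chunked_upload may now race with remove(), which deletes the same segments inline, so the worker treats an already-missing chunk as success. A self-contained sketch of the same guard pattern (the dict stands in for the real distributed storage API; names are invented):

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('chunkcleanup')

# Stand-in for the storage API: path -> content.
fake_storage = {'segments/u1/000001': 'chunk data'}

def process_cleanup_job(job_details):
  path = job_details['path']

  # Idempotency guard: the segment may already have been deleted inline by
  # SwiftStorage.remove(), so a missing chunk means the job is already done.
  if path not in fake_storage:
    logger.debug('Chunk already deleted')
    return

  del fake_storage[path]

process_cleanup_job({'path': 'segments/u1/000001'})
process_cleanup_job({'path': 'segments/u1/000001'})  # second run is a no-op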