Add a chunk cleanup queue for async GC of empty chunks

Instead of having the Swift storage engine try to delete the empty chunk(s) synchronously, we simply queue them and have a worker come along after 30s to delete them. This has a few key benefits: it is async (doesn't slow down the push code), it helps deal with Swift's eventual consistency (fewer retries necessary), and it is generic, so other storage engines can use it if/when they need it as well.
Joseph Schorr 2016-11-10 13:54:04 -05:00
parent 59cb6bd216
commit 5f99448adc
12 changed files with 191 additions and 59 deletions
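
The queue consumer added by this commit is not shown in the hunks below. As a rough sketch only (the ChunkCleanupWorker name, the queue's get/complete/incomplete methods, and the storage.remove(location, path) signature are assumptions, not code from this commit), a worker draining the chunk cleanup queue might look like:

import json
import time


class ChunkCleanupWorker(object):
  """ Hypothetical consumer of the chunk cleanup queue: picks up items once their
      30s delay has elapsed and deletes the (empty) chunk path they reference. """
  def __init__(self, storage, chunk_cleanup_queue, poll_seconds=10):
    self._storage = storage
    self._queue = chunk_cleanup_queue
    self._poll_seconds = poll_seconds

  def _process_one(self):
    job = self._queue.get(processing_time=60)  # assumed queue interface
    if job is None:
      return False

    payload = json.loads(job.body)
    try:
      # Remove the empty segment from the storage location that queued it.
      self._storage.remove(payload['location'], payload['path'])
      self._queue.complete(job)
    except IOError:
      # Deletion failed (e.g. Swift eventual consistency); leave it to be retried.
      self._queue.incomplete(job)

    return True

  def run(self):
    while True:
      if not self._process_one():
        time.sleep(self._poll_seconds)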


@@ -15,27 +15,36 @@ STORAGE_DRIVER_CLASSES = {
'SwiftStorage': SwiftStorage,
}
def get_storage_driver(metric_queue, storage_params):
def get_storage_driver(location, metric_queue, chunk_cleanup_queue, storage_params):
""" Returns a storage driver class for the given storage configuration
(a pair of string name and a dict of parameters). """
driver = storage_params[0]
parameters = storage_params[1]
driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
return driver_class(metric_queue, **parameters)
context = StorageContext(location, metric_queue, chunk_cleanup_queue)
return driver_class(context, **parameters)
class StorageContext(object):
def __init__(self, location, metric_queue, chunk_cleanup_queue):
self.location = location
self.metric_queue = metric_queue
self.chunk_cleanup_queue = chunk_cleanup_queue
class Storage(object):
def __init__(self, app=None, metric_queue=None, instance_keys=None):
def __init__(self, app=None, metric_queue=None, chunk_cleanup_queue=None, instance_keys=None):
self.app = app
if app is not None:
self.state = self.init_app(app, metric_queue, instance_keys)
self.state = self.init_app(app, metric_queue, chunk_cleanup_queue, instance_keys)
else:
self.state = None
def init_app(self, app, metric_queue, instance_keys):
def init_app(self, app, metric_queue, chunk_cleanup_queue, instance_keys):
storages = {}
for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
storages[location] = get_storage_driver(metric_queue, storage_params)
storages[location] = get_storage_driver(location, metric_queue, chunk_cleanup_queue,
storage_params)
preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
if not preference:
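
From the caller's side, drivers now receive a per-location StorageContext instead of a bare metric_queue. A minimal illustration using the FakeStorage driver (assuming get_storage_driver is importable from the storage package; both queues may be None for engines that never use them):

from storage import get_storage_driver

# Build a driver with no metric or cleanup queue wired in; the StorageContext is
# constructed internally and handed to FakeStorage.
driver = get_storage_driver('local_us', metric_queue=None, chunk_cleanup_queue=None,
                            storage_params=('FakeStorage', {}))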


@@ -49,7 +49,7 @@ class StreamReadKeyAsFile(BufferedIOBase):
class _CloudStorage(BaseStorageV2):
def __init__(self, metric_queue, connection_class, key_class, connect_kwargs, upload_params,
def __init__(self, context, connection_class, key_class, connect_kwargs, upload_params,
storage_path, bucket_name, access_key=None, secret_key=None):
super(_CloudStorage, self).__init__()
@@ -67,7 +67,7 @@ class _CloudStorage(BaseStorageV2):
self._connect_kwargs = connect_kwargs
self._cloud_conn = None
self._cloud_bucket = None
self._metric_queue = metric_queue
self._context = context
def _initialize_cloud_conn(self):
if not self._initialized:
@@ -166,9 +166,9 @@ class _CloudStorage(BaseStorageV2):
if content_encoding is not None:
metadata['Content-Encoding'] = content_encoding
if self._metric_queue is not None:
self._metric_queue.put_deprecated('MultipartUploadStart', 1)
self._metric_queue.multipart_upload_start.Inc()
if self._context.metric_queue is not None:
self._context.metric_queue.put_deprecated('MultipartUploadStart', 1)
self._context.metric_queue.multipart_upload_start.Inc()
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
**self._upload_params)
@@ -207,9 +207,9 @@ class _CloudStorage(BaseStorageV2):
logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
write_error = e
if self._metric_queue is not None:
self._metric_queue.put_deprecated('MultipartUploadFailure', 1)
self._metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
if self._context.metric_queue is not None:
self._context.metric_queue.put_deprecated('MultipartUploadFailure', 1)
self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
if cancel_on_error:
mp.cancel_upload()
@@ -218,9 +218,9 @@ class _CloudStorage(BaseStorageV2):
break
if total_bytes_written > 0:
if self._metric_queue is not None:
self._metric_queue.put_deprecated('MultipartUploadSuccess', 1)
self._metric_queue.multipart_upload_end.Inc(labelvalues=['success'])
if self._context.metric_queue is not None:
self._context.metric_queue.put_deprecated('MultipartUploadSuccess', 1)
self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['success'])
self._perform_action_with_retry(mp.complete_upload)
@@ -436,8 +436,8 @@ class _CloudStorage(BaseStorageV2):
class S3Storage(_CloudStorage):
def __init__(self, metric_queue, storage_path, s3_bucket, s3_access_key=None, s3_secret_key=None,
host=None):
def __init__(self, context, storage_path, s3_bucket, s3_access_key=None,
s3_secret_key=None, host=None):
upload_params = {
'encrypt_key': True,
}
@@ -447,7 +447,7 @@ class S3Storage(_CloudStorage):
raise ValueError('host name must not start with http:// or https://')
connect_kwargs['host'] = host
super(S3Storage, self).__init__(metric_queue, boto.s3.connection.S3Connection, boto.s3.key.Key,
super(S3Storage, self).__init__(context, boto.s3.connection.S3Connection, boto.s3.key.Key,
connect_kwargs, upload_params, storage_path, s3_bucket,
access_key=s3_access_key or None,
secret_key=s3_secret_key or None)
@@ -474,10 +474,10 @@ class S3Storage(_CloudStorage):
</CORSConfiguration>""")
class GoogleCloudStorage(_CloudStorage):
def __init__(self, metric_queue, storage_path, access_key, secret_key, bucket_name):
def __init__(self, context, storage_path, access_key, secret_key, bucket_name):
upload_params = {}
connect_kwargs = {}
super(GoogleCloudStorage, self).__init__(metric_queue, boto.gs.connection.GSConnection,
super(GoogleCloudStorage, self).__init__(context, boto.gs.connection.GSConnection,
boto.gs.key.Key, connect_kwargs, upload_params,
storage_path, bucket_name, access_key, secret_key)
@@ -534,7 +534,7 @@ class GoogleCloudStorage(_CloudStorage):
class RadosGWStorage(_CloudStorage):
def __init__(self, metric_queue, hostname, is_secure, storage_path, access_key, secret_key,
def __init__(self, context, hostname, is_secure, storage_path, access_key, secret_key,
bucket_name):
upload_params = {}
connect_kwargs = {
@@ -543,7 +543,7 @@ class RadosGWStorage(_CloudStorage):
'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
}
super(RadosGWStorage, self).__init__(metric_queue, boto.s3.connection.S3Connection,
super(RadosGWStorage, self).__init__(context, boto.s3.connection.S3Connection,
boto.s3.key.Key, connect_kwargs, upload_params,
storage_path, bucket_name, access_key, secret_key)


@@ -9,7 +9,7 @@ from storage.basestorage import BaseStorageV2
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
class FakeStorage(BaseStorageV2):
def __init__(self, metric_queue):
def __init__(self, context):
super(FakeStorage, self).__init__()
def _init_path(self, path=None, create=False):


@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class LocalStorage(BaseStorageV2):
def __init__(self, metric_queue, storage_path):
def __init__(self, context, storage_path):
super(LocalStorage, self).__init__()
self._root_path = storage_path


@@ -14,7 +14,7 @@ from swiftclient.client import Connection, ClientException
from urlparse import urlparse
from random import SystemRandom
from hashlib import sha1
from time import time, sleep
from time import time
from collections import namedtuple
from util.registry import filelike
@@ -26,17 +26,20 @@ logger = logging.getLogger(__name__)
_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
_SEGMENTS_KEY = 'segments'
_EMPTY_SEGMENTS_KEY = 'emptysegments'
_SEGMENT_DIRECTORY = 'segments'
_MAXIMUM_SEGMENT_SIZE = 200000000 # ~200 MB
_DEFAULT_SWIFT_CONNECT_TIMEOUT = 5 # seconds
_CHUNK_CLEANUP_DELAY = 30 # seconds
class SwiftStorage(BaseStorage):
def __init__(self, metric_queue, swift_container, storage_path, auth_url, swift_user,
swift_password, auth_version=None, os_options=None, ca_cert_path=None,
temp_url_key=None, simple_path_concat=False, connect_timeout=None,
retry_count=None, retry_on_ratelimit=True):
def __init__(self, context, swift_container, storage_path, auth_url, swift_user, swift_password,
auth_version=None, os_options=None, ca_cert_path=None, temp_url_key=None,
simple_path_concat=False, connect_timeout=None, retry_count=None,
retry_on_ratelimit=True):
super(SwiftStorage, self).__init__()
self._swift_container = swift_container
self._context = context
self._storage_path = storage_path.lstrip('/')
self._simple_path_concat = simple_path_concat
@@ -205,7 +208,8 @@ class SwiftStorage(BaseStorage):
path = self._normalize_path(path)
try:
self._get_connection().delete_object(self._swift_container, path)
except Exception:
except Exception as ex:
logger.warning('Could not delete path %s: %s', path, str(ex))
raise IOError('Cannot delete path: %s' % path)
def _random_checksum(self, count):
@@ -220,14 +224,15 @@ class SwiftStorage(BaseStorage):
return headers.get('etag', '')[1:-1][:7] or self._random_checksum(7)
@staticmethod
def _segment_list_from_metadata(storage_metadata):
return [_PartUploadMetadata(*segment_args) for segment_args in storage_metadata[_SEGMENTS_KEY]]
def _segment_list_from_metadata(storage_metadata, key=_SEGMENTS_KEY):
return [_PartUploadMetadata(*segment_args) for segment_args in storage_metadata[key]]
def initiate_chunked_upload(self):
random_uuid = str(uuid4())
metadata = {
_SEGMENTS_KEY: [],
_EMPTY_SEGMENTS_KEY: [],
}
return random_uuid, metadata
@@ -292,18 +297,8 @@ class SwiftStorage(BaseStorage):
updated_metadata[_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset,
bytes_written))
else:
# Try to delete the empty segment, as it is not needed. This will occasionally fail
# due to Swift's eventual consistency, so we retry a few times and then just leave it be.
for remaining_retries in range(2, -1, -1):
try:
self.remove(segment_path)
except IOError:
if remaining_retries:
sleep(0.25)
continue
# Otherwise, ignore it.
break
updated_metadata[_EMPTY_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset,
bytes_written))
return bytes_written, updated_metadata
@@ -311,6 +306,26 @@ class SwiftStorage(BaseStorage):
""" Complete the chunked upload and store the final results in the path indicated.
Returns nothing.
"""
# Check all potentially empty segments against the segments that were uploaded; if the path
# is still empty, then we queue the segment to be deleted.
if self._context.chunk_cleanup_queue is not None:
nonempty_segments = SwiftStorage._segment_list_from_metadata(storage_metadata,
key=_SEGMENTS_KEY)
potentially_empty_segments = SwiftStorage._segment_list_from_metadata(storage_metadata,
key=_EMPTY_SEGMENTS_KEY)
nonempty_paths = set([segment.path for segment in nonempty_segments])
for segment in potentially_empty_segments:
if segment.path in nonempty_paths:
continue
# Queue the chunk to be deleted, as it is empty and therefore unused.
self._context.chunk_cleanup_queue.put(['segment/%s/%s' % (self._context.location, uuid)], {
'location': self._context.location,
'uuid': uuid,
'path': segment.path,
}, available_after=_CHUNK_CLEANUP_DELAY)
# Finally, we write an empty file at the proper location with a X-Object-Manifest
# header pointing to the prefix for the segments.
segments_prefix_path = self._normalize_path('%s/%s' % (_SEGMENT_DIRECTORY, uuid))
@@ -323,5 +338,5 @@ class SwiftStorage(BaseStorage):
Returns nothing.
"""
# Delete all the uploaded segments.
for segment in SwiftStorage._segment_list_from_metadata(storage_metadata):
for segment in SwiftStorage._segment_list_from_metadata(storage_metadata, key=_SEGMENTS_KEY):
self.remove(segment.path)