Merge pull request #2105 from coreos-inc/frack-swift

Fix swift exception reporting on deletion and add async chunk cleanup
This commit is contained in:
josephschorr 2016-11-15 17:59:48 -05:00 committed by GitHub
commit 1346b7fb63
12 changed files with 196 additions and 60 deletions

10
app.py
View file

@ -172,13 +172,17 @@ app.url_map.converters['apirepopath'] = APIRepositoryPathConverter
Principal(app, use_sessions=False)
tf = app.config['DB_TRANSACTION_FACTORY']
chunk_cleanup_queue = WorkQueue(app.config['CHUNK_CLEANUP_QUEUE_NAME'], tf)
avatar = Avatar(app)
login_manager = LoginManager(app)
mail = Mail(app)
prometheus = PrometheusPlugin(app)
metric_queue = MetricQueue(prometheus)
instance_keys = InstanceKeys(app)
storage = Storage(app, metric_queue, instance_keys)
storage = Storage(app, metric_queue, chunk_cleanup_queue, instance_keys)
userfiles = Userfiles(app, storage)
log_archive = LogArchive(app, storage)
analytics = Analytics(app)
@ -198,8 +202,6 @@ license_validator.start()
start_cloudwatch_sender(metric_queue, app)
tf = app.config['DB_TRANSACTION_FACTORY']
github_login = GithubOAuthConfig(app.config, 'GITHUB_LOGIN_CONFIG')
github_trigger = GithubOAuthConfig(app.config, 'GITHUB_TRIGGER_CONFIG')
gitlab_trigger = GitLabOAuthConfig(app.config, 'GITLAB_TRIGGER_CONFIG')
@ -217,7 +219,7 @@ secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NA
has_namespace=False)
all_queues = [image_replication_queue, dockerfile_build_queue, notification_queue,
secscan_notification_queue]
secscan_notification_queue, chunk_cleanup_queue]
secscan_api = SecurityScannerAPI(app, app.config, storage)

View file

@ -0,0 +1,7 @@
#!/bin/sh
# Ensure dependencies start before the logger
sv check syslog-ng > /dev/null || exit 1
# Start the logger
exec logger -i -t chunkcleanupworker

View file

@ -0,0 +1,8 @@
#! /bin/bash
echo 'Starting chunk cleanup worker'
cd /
venv/bin/python -m workers.chunkcleanupworker 2>&1
echo 'Chunk cleanup worker exited'

View file

@ -137,6 +137,7 @@ class DefaultConfig(object):
DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild'
REPLICATION_QUEUE_NAME = 'imagestoragereplication'
SECSCAN_NOTIFICATION_QUEUE_NAME = 'security_notification'
CHUNK_CLEANUP_QUEUE_NAME = 'chunk_cleanup'
# Super user config. Note: This MUST BE an empty list for the default config.
SUPER_USERS = []

View file

@ -15,27 +15,36 @@ STORAGE_DRIVER_CLASSES = {
'SwiftStorage': SwiftStorage,
}
def get_storage_driver(metric_queue, storage_params):
def get_storage_driver(location, metric_queue, chunk_cleanup_queue, storage_params):
""" Returns a storage driver class for the given storage configuration
(a pair of string name and a dict of parameters). """
driver = storage_params[0]
parameters = storage_params[1]
driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
return driver_class(metric_queue, **parameters)
context = StorageContext(location, metric_queue, chunk_cleanup_queue)
return driver_class(context, **parameters)
class StorageContext(object):
def __init__(self, location, metric_queue, chunk_cleanup_queue):
self.location = location
self.metric_queue = metric_queue
self.chunk_cleanup_queue = chunk_cleanup_queue
class Storage(object):
def __init__(self, app=None, metric_queue=None, instance_keys=None):
def __init__(self, app=None, metric_queue=None, chunk_cleanup_queue=None, instance_keys=None):
self.app = app
if app is not None:
self.state = self.init_app(app, metric_queue, instance_keys)
self.state = self.init_app(app, metric_queue, chunk_cleanup_queue, instance_keys)
else:
self.state = None
def init_app(self, app, metric_queue, instance_keys):
def init_app(self, app, metric_queue, chunk_cleanup_queue, instance_keys):
storages = {}
for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
storages[location] = get_storage_driver(metric_queue, storage_params)
storages[location] = get_storage_driver(location, metric_queue, chunk_cleanup_queue,
storage_params)
preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
if not preference:

View file

@ -49,7 +49,7 @@ class StreamReadKeyAsFile(BufferedIOBase):
class _CloudStorage(BaseStorageV2):
def __init__(self, metric_queue, connection_class, key_class, connect_kwargs, upload_params,
def __init__(self, context, connection_class, key_class, connect_kwargs, upload_params,
storage_path, bucket_name, access_key=None, secret_key=None):
super(_CloudStorage, self).__init__()
@ -67,7 +67,7 @@ class _CloudStorage(BaseStorageV2):
self._connect_kwargs = connect_kwargs
self._cloud_conn = None
self._cloud_bucket = None
self._metric_queue = metric_queue
self._context = context
def _initialize_cloud_conn(self):
if not self._initialized:
@ -166,9 +166,9 @@ class _CloudStorage(BaseStorageV2):
if content_encoding is not None:
metadata['Content-Encoding'] = content_encoding
if self._metric_queue is not None:
self._metric_queue.put_deprecated('MultipartUploadStart', 1)
self._metric_queue.multipart_upload_start.Inc()
if self._context.metric_queue is not None:
self._context.metric_queue.put_deprecated('MultipartUploadStart', 1)
self._context.metric_queue.multipart_upload_start.Inc()
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
**self._upload_params)
@ -207,9 +207,9 @@ class _CloudStorage(BaseStorageV2):
logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e)
write_error = e
if self._metric_queue is not None:
self._metric_queue.put_deprecated('MultipartUploadFailure', 1)
self._metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
if self._context.metric_queue is not None:
self._context.metric_queue.put_deprecated('MultipartUploadFailure', 1)
self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
if cancel_on_error:
mp.cancel_upload()
@ -218,9 +218,9 @@ class _CloudStorage(BaseStorageV2):
break
if total_bytes_written > 0:
if self._metric_queue is not None:
self._metric_queue.put_deprecated('MultipartUploadSuccess', 1)
self._metric_queue.multipart_upload_end.Inc(labelvalues=['success'])
if self._context.metric_queue is not None:
self._context.metric_queue.put_deprecated('MultipartUploadSuccess', 1)
self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['success'])
self._perform_action_with_retry(mp.complete_upload)
@ -436,8 +436,8 @@ class _CloudStorage(BaseStorageV2):
class S3Storage(_CloudStorage):
def __init__(self, metric_queue, storage_path, s3_bucket, s3_access_key=None, s3_secret_key=None,
host=None):
def __init__(self, context, storage_path, s3_bucket, s3_access_key=None,
s3_secret_key=None, host=None):
upload_params = {
'encrypt_key': True,
}
@ -447,7 +447,7 @@ class S3Storage(_CloudStorage):
raise ValueError('host name must not start with http:// or https://')
connect_kwargs['host'] = host
super(S3Storage, self).__init__(metric_queue, boto.s3.connection.S3Connection, boto.s3.key.Key,
super(S3Storage, self).__init__(context, boto.s3.connection.S3Connection, boto.s3.key.Key,
connect_kwargs, upload_params, storage_path, s3_bucket,
access_key=s3_access_key or None,
secret_key=s3_secret_key or None)
@ -474,10 +474,10 @@ class S3Storage(_CloudStorage):
</CORSConfiguration>""")
class GoogleCloudStorage(_CloudStorage):
def __init__(self, metric_queue, storage_path, access_key, secret_key, bucket_name):
def __init__(self, context, storage_path, access_key, secret_key, bucket_name):
upload_params = {}
connect_kwargs = {}
super(GoogleCloudStorage, self).__init__(metric_queue, boto.gs.connection.GSConnection,
super(GoogleCloudStorage, self).__init__(context, boto.gs.connection.GSConnection,
boto.gs.key.Key, connect_kwargs, upload_params,
storage_path, bucket_name, access_key, secret_key)
@ -534,7 +534,7 @@ class GoogleCloudStorage(_CloudStorage):
class RadosGWStorage(_CloudStorage):
def __init__(self, metric_queue, hostname, is_secure, storage_path, access_key, secret_key,
def __init__(self, context, hostname, is_secure, storage_path, access_key, secret_key,
bucket_name):
upload_params = {}
connect_kwargs = {
@ -543,7 +543,7 @@ class RadosGWStorage(_CloudStorage):
'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
}
super(RadosGWStorage, self).__init__(metric_queue, boto.s3.connection.S3Connection,
super(RadosGWStorage, self).__init__(context, boto.s3.connection.S3Connection,
boto.s3.key.Key, connect_kwargs, upload_params,
storage_path, bucket_name, access_key, secret_key)

View file

@ -9,7 +9,7 @@ from storage.basestorage import BaseStorageV2
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
class FakeStorage(BaseStorageV2):
def __init__(self, metric_queue):
def __init__(self, context):
super(FakeStorage, self).__init__()
def _init_path(self, path=None, create=False):

View file

@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
class LocalStorage(BaseStorageV2):
def __init__(self, metric_queue, storage_path):
def __init__(self, context, storage_path):
super(LocalStorage, self).__init__()
self._root_path = storage_path

View file

@ -14,7 +14,7 @@ from swiftclient.client import Connection, ClientException
from urlparse import urlparse
from random import SystemRandom
from hashlib import sha1
from time import time, sleep
from time import time
from collections import namedtuple
from util.registry import filelike
@ -26,17 +26,20 @@ logger = logging.getLogger(__name__)
_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
_SEGMENTS_KEY = 'segments'
_EMPTY_SEGMENTS_KEY = 'emptysegments'
_SEGMENT_DIRECTORY = 'segments'
_MAXIMUM_SEGMENT_SIZE = 200000000 # ~200 MB
_DEFAULT_SWIFT_CONNECT_TIMEOUT = 5 # seconds
_CHUNK_CLEANUP_DELAY = 30 # seconds
class SwiftStorage(BaseStorage):
def __init__(self, metric_queue, swift_container, storage_path, auth_url, swift_user,
swift_password, auth_version=None, os_options=None, ca_cert_path=None,
temp_url_key=None, simple_path_concat=False, connect_timeout=None,
retry_count=None, retry_on_ratelimit=True):
def __init__(self, context, swift_container, storage_path, auth_url, swift_user, swift_password,
auth_version=None, os_options=None, ca_cert_path=None, temp_url_key=None,
simple_path_concat=False, connect_timeout=None, retry_count=None,
retry_on_ratelimit=True):
super(SwiftStorage, self).__init__()
self._swift_container = swift_container
self._context = context
self._storage_path = storage_path.lstrip('/')
self._simple_path_concat = simple_path_concat
@ -205,8 +208,8 @@ class SwiftStorage(BaseStorage):
path = self._normalize_path(path)
try:
self._get_connection().delete_object(self._swift_container, path)
except Exception:
logger.exception('Could not delete path %s', path)
except Exception as ex:
logger.warning('Could not delete path %s: %s', path, str(ex))
raise IOError('Cannot delete path: %s' % path)
def _random_checksum(self, count):
@ -221,14 +224,15 @@ class SwiftStorage(BaseStorage):
return headers.get('etag', '')[1:-1][:7] or self._random_checksum(7)
@staticmethod
def _segment_list_from_metadata(storage_metadata):
return [_PartUploadMetadata(*segment_args) for segment_args in storage_metadata[_SEGMENTS_KEY]]
def _segment_list_from_metadata(storage_metadata, key=_SEGMENTS_KEY):
return [_PartUploadMetadata(*segment_args) for segment_args in storage_metadata[key]]
def initiate_chunked_upload(self):
random_uuid = str(uuid4())
metadata = {
_SEGMENTS_KEY: [],
_EMPTY_SEGMENTS_KEY: [],
}
return random_uuid, metadata
@ -293,18 +297,8 @@ class SwiftStorage(BaseStorage):
updated_metadata[_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset,
bytes_written))
else:
# Try to delete the empty segment, as it is not needed. This will occasionally fail
# due to Swift's eventual consistency, so we retry a few times and then just leave it be.
for remaining_retries in range(2, -1, -1):
try:
self.remove(segment_path)
except IOError:
if remaining_retries:
sleep(0.25)
continue
# Otherwise, ignore it.
break
updated_metadata[_EMPTY_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset,
bytes_written))
return bytes_written, updated_metadata
@ -312,6 +306,26 @@ class SwiftStorage(BaseStorage):
""" Complete the chunked upload and store the final results in the path indicated.
Returns nothing.
"""
# Check all potentially empty segments against the segments that were uploaded; if the path
# is still empty, then we queue the segment to be deleted.
if self._context.chunk_cleanup_queue is not None:
nonempty_segments = SwiftStorage._segment_list_from_metadata(storage_metadata,
key=_SEGMENTS_KEY)
potentially_empty_segments = SwiftStorage._segment_list_from_metadata(storage_metadata,
key=_EMPTY_SEGMENTS_KEY)
nonempty_paths = set([segment.path for segment in nonempty_segments])
for segment in potentially_empty_segments:
if segment.path in nonempty_paths:
continue
# Queue the chunk to be deleted, as it is empty and therefore unused.
self._context.chunk_cleanup_queue.put(['segment/%s/%s' % (self._context.location, uuid)], {
'location': self._context.location,
'uuid': uuid,
'path': segment.path,
}, available_after=_CHUNK_CLEANUP_DELAY)
# Finally, we write an empty file at the proper location with a X-Object-Manifest
# header pointing to the prefix for the segments.
segments_prefix_path = self._normalize_path('%s/%s' % (_SEGMENT_DIRECTORY, uuid))
@ -324,5 +338,5 @@ class SwiftStorage(BaseStorage):
Returns nothing.
"""
# Delete all the uploaded segments.
for segment in SwiftStorage._segment_list_from_metadata(storage_metadata):
for segment in SwiftStorage._segment_list_from_metadata(storage_metadata, key=_SEGMENTS_KEY):
self.remove(segment.path)

View file

@ -3,7 +3,7 @@ import moto
import boto
import os
from storage import S3Storage
from storage import S3Storage, StorageContext
from storage.cloud import _CloudStorage, _PartUploadMetadata
from storage.cloud import _CHUNKS_KEY
from StringIO import StringIO
@ -13,6 +13,7 @@ _TEST_BUCKET = 'some_bucket'
_TEST_USER = 'someuser'
_TEST_PASSWORD = 'somepassword'
_TEST_PATH = 'some/cool/path'
_TEST_CONTEXT = StorageContext('nyc', None, None)
class TestCloudStorage(unittest.TestCase):
def setUp(self):
@ -21,7 +22,7 @@ class TestCloudStorage(unittest.TestCase):
# Create a test bucket and put some test content.
boto.connect_s3().create_bucket(_TEST_BUCKET)
self.engine = S3Storage(None, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
self.engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
self.engine.put_content(_TEST_PATH, _TEST_CONTENT)
def tearDown(self):
@ -51,7 +52,8 @@ class TestCloudStorage(unittest.TestCase):
def test_copy_samecreds(self):
# Copy the content to another engine.
another_engine = S3Storage(None, 'another/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)
another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER,
_TEST_PASSWORD)
self.engine.copy_to(another_engine, _TEST_PATH)
# Verify it can be retrieved.
@ -59,7 +61,8 @@ class TestCloudStorage(unittest.TestCase):
def test_copy_differentcreds(self):
# Copy the content to another engine.
another_engine = S3Storage(None, 'another/path', 'another_bucket', 'blech', 'password')
another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech',
'password')
boto.connect_s3().create_bucket('another_bucket')
self.engine.copy_to(another_engine, _TEST_PATH)

View file

@ -2,10 +2,11 @@ import io
import unittest
from collections import defaultdict
from storage.swift import SwiftStorage
from mock import MagicMock
from storage import StorageContext
from storage.swift import SwiftStorage
class MockSwiftStorage(SwiftStorage):
def __init__(self, *args, **kwargs):
@ -18,7 +19,7 @@ class MockSwiftStorage(SwiftStorage):
class MockSwiftTests(unittest.TestCase):
base_args = {
'metric_queue': None,
'context': StorageContext('nyc', None, None),
'swift_container': 'container-name',
'storage_path': '/basepath',
'auth_url': 'https://auth.com',
@ -39,6 +40,11 @@ class MockSwiftTests(unittest.TestCase):
swift.exists('object/path')
swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
def test_delete_unknown_path(self):
swift = SwiftStorage(**self.base_args)
with self.assertRaises(IOError):
swift.remove('someunknownpath')
class FakeSwiftStorage(SwiftStorage):
def __init__(self, *args, **kwargs):
@ -88,9 +94,26 @@ class FakeSwift(object):
self.containers[container].pop(path, None)
class FakeQueue(object):
def __init__(self):
self.items = []
def get(self):
if not self.items:
return None
return self.items.pop()
def put(self, names, item, available_after=0):
self.items.append({
'names': names,
'item': item,
'available_after': available_after,
})
class FakeSwiftTests(unittest.TestCase):
base_args = {
'metric_queue': None,
'context': StorageContext('nyc', None, None),
'swift_container': 'container-name',
'storage_path': '/basepath',
'auth_url': 'https://auth.com',
@ -165,6 +188,36 @@ class FakeSwiftTests(unittest.TestCase):
for segment in SwiftStorage._segment_list_from_metadata(metadata):
self.assertFalse(swift.exists(segment.path))
def test_empty_chunks_queued_for_deletion(self):
chunk_cleanup_queue = FakeQueue()
args = dict(self.base_args)
args['context'] = StorageContext('nyc', None, chunk_cleanup_queue)
swift = FakeSwiftStorage(**args)
uuid, metadata = swift.initiate_chunked_upload()
chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', '']
offset = 0
for chunk in chunks:
length = len(chunk)
if length == 0:
length = 1
bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length,
io.BytesIO(chunk), metadata)
self.assertIsNone(error)
self.assertEquals(bytes_written, len(chunk))
offset += len(chunk)
swift.complete_chunked_upload(uuid, 'somepath', metadata)
self.assertEquals(''.join(chunks), swift.get_content('somepath'))
# Check the chunk deletion queue and ensure we have the last chunk queued.
found = chunk_cleanup_queue.get()
self.assertIsNotNone(found)
found2 = chunk_cleanup_queue.get()
self.assertIsNone(found2)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,39 @@
import logging
import time
from app import app, storage, chunk_cleanup_queue
from workers.queueworker import QueueWorker, JobException
logger = logging.getLogger(__name__)
POLL_PERIOD_SECONDS = 10
class ChunkCleanupWorker(QueueWorker):
""" Worker which cleans up chunks enqueued by the storage engine(s). This is typically used to
cleanup empty chunks which are no longer needed.
"""
def process_queue_item(self, job_details):
logger.debug('Got chunk cleanup queue item: %s', job_details)
storage_location = job_details['location']
storage_path = job_details['path']
try:
storage.remove([storage_location], storage_path)
except IOError:
raise JobException()
if __name__ == "__main__":
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
engines = set([config[0] for config in app.config.get('DISTRIBUTED_STORAGE_CONFIG', {}).values()])
if 'SwiftStorage' not in engines:
logger.debug('Swift storage not detected; sleeping')
while True:
time.sleep(10000)
logger.debug('Starting chunk cleanup worker')
worker = ChunkCleanupWorker(chunk_cleanup_queue, poll_period_seconds=POLL_PERIOD_SECONDS)
worker.start()