diff --git a/app.py b/app.py index 160a10301..cf4a055cb 100644 --- a/app.py +++ b/app.py @@ -172,13 +172,17 @@ app.url_map.converters['apirepopath'] = APIRepositoryPathConverter Principal(app, use_sessions=False) +tf = app.config['DB_TRANSACTION_FACTORY'] + +chunk_cleanup_queue = WorkQueue(app.config['CHUNK_CLEANUP_QUEUE_NAME'], tf) + avatar = Avatar(app) login_manager = LoginManager(app) mail = Mail(app) prometheus = PrometheusPlugin(app) metric_queue = MetricQueue(prometheus) instance_keys = InstanceKeys(app) -storage = Storage(app, metric_queue, instance_keys) +storage = Storage(app, metric_queue, chunk_cleanup_queue, instance_keys) userfiles = Userfiles(app, storage) log_archive = LogArchive(app, storage) analytics = Analytics(app) @@ -198,8 +202,6 @@ license_validator.start() start_cloudwatch_sender(metric_queue, app) -tf = app.config['DB_TRANSACTION_FACTORY'] - github_login = GithubOAuthConfig(app.config, 'GITHUB_LOGIN_CONFIG') github_trigger = GithubOAuthConfig(app.config, 'GITHUB_TRIGGER_CONFIG') gitlab_trigger = GitLabOAuthConfig(app.config, 'GITLAB_TRIGGER_CONFIG') @@ -217,7 +219,7 @@ secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NA has_namespace=False) all_queues = [image_replication_queue, dockerfile_build_queue, notification_queue, - secscan_notification_queue] + secscan_notification_queue, chunk_cleanup_queue] secscan_api = SecurityScannerAPI(app, app.config, storage) diff --git a/conf/init/service/chunkcleanupworker/log/run b/conf/init/service/chunkcleanupworker/log/run new file mode 100755 index 000000000..0f1d084cc --- /dev/null +++ b/conf/init/service/chunkcleanupworker/log/run @@ -0,0 +1,7 @@ +#!/bin/sh + +# Ensure dependencies start before the logger +sv check syslog-ng > /dev/null || exit 1 + +# Start the logger +exec logger -i -t chunkcleanupworker diff --git a/conf/init/service/chunkcleanupworker/run b/conf/init/service/chunkcleanupworker/run new file mode 100755 index 000000000..57c9c5aa8 --- /dev/null +++ b/conf/init/service/chunkcleanupworker/run @@ -0,0 +1,8 @@ +#! /bin/bash + +echo 'Starting chunk cleanup worker' + +cd / +venv/bin/python -m workers.chunkcleanupworker 2>&1 + +echo 'Chunk cleanup worker exited' \ No newline at end of file diff --git a/config.py b/config.py index 351bec63d..42b6aa491 100644 --- a/config.py +++ b/config.py @@ -137,6 +137,7 @@ class DefaultConfig(object): DOCKERFILE_BUILD_QUEUE_NAME = 'dockerfilebuild' REPLICATION_QUEUE_NAME = 'imagestoragereplication' SECSCAN_NOTIFICATION_QUEUE_NAME = 'security_notification' + CHUNK_CLEANUP_QUEUE_NAME = 'chunk_cleanup' # Super user config. Note: This MUST BE an empty list for the default config. SUPER_USERS = [] diff --git a/storage/__init__.py b/storage/__init__.py index fd29b2ac6..09497df99 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -15,27 +15,36 @@ STORAGE_DRIVER_CLASSES = { 'SwiftStorage': SwiftStorage, } -def get_storage_driver(metric_queue, storage_params): +def get_storage_driver(location, metric_queue, chunk_cleanup_queue, storage_params): """ Returns a storage driver class for the given storage configuration (a pair of string name and a dict of parameters). """ driver = storage_params[0] parameters = storage_params[1] driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage) - return driver_class(metric_queue, **parameters) + context = StorageContext(location, metric_queue, chunk_cleanup_queue) + return driver_class(context, **parameters) + + +class StorageContext(object): + def __init__(self, location, metric_queue, chunk_cleanup_queue): + self.location = location + self.metric_queue = metric_queue + self.chunk_cleanup_queue = chunk_cleanup_queue class Storage(object): - def __init__(self, app=None, metric_queue=None, instance_keys=None): + def __init__(self, app=None, metric_queue=None, chunk_cleanup_queue=None, instance_keys=None): self.app = app if app is not None: - self.state = self.init_app(app, metric_queue, instance_keys) + self.state = self.init_app(app, metric_queue, chunk_cleanup_queue, instance_keys) else: self.state = None - def init_app(self, app, metric_queue, instance_keys): + def init_app(self, app, metric_queue, chunk_cleanup_queue, instance_keys): storages = {} for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items(): - storages[location] = get_storage_driver(metric_queue, storage_params) + storages[location] = get_storage_driver(location, metric_queue, chunk_cleanup_queue, + storage_params) preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None) if not preference: diff --git a/storage/cloud.py b/storage/cloud.py index e4646ca5f..c1ca25eb5 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -49,7 +49,7 @@ class StreamReadKeyAsFile(BufferedIOBase): class _CloudStorage(BaseStorageV2): - def __init__(self, metric_queue, connection_class, key_class, connect_kwargs, upload_params, + def __init__(self, context, connection_class, key_class, connect_kwargs, upload_params, storage_path, bucket_name, access_key=None, secret_key=None): super(_CloudStorage, self).__init__() @@ -67,7 +67,7 @@ class _CloudStorage(BaseStorageV2): self._connect_kwargs = connect_kwargs self._cloud_conn = None self._cloud_bucket = None - self._metric_queue = metric_queue + self._context = context def _initialize_cloud_conn(self): if not self._initialized: @@ -166,9 +166,9 @@ class _CloudStorage(BaseStorageV2): if content_encoding is not None: metadata['Content-Encoding'] = content_encoding - if self._metric_queue is not None: - self._metric_queue.put_deprecated('MultipartUploadStart', 1) - self._metric_queue.multipart_upload_start.Inc() + if self._context.metric_queue is not None: + self._context.metric_queue.put_deprecated('MultipartUploadStart', 1) + self._context.metric_queue.multipart_upload_start.Inc() return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, **self._upload_params) @@ -207,9 +207,9 @@ class _CloudStorage(BaseStorageV2): logger.warn('Error when writing to stream in stream_write_internal at path %s: %s', path, e) write_error = e - if self._metric_queue is not None: - self._metric_queue.put_deprecated('MultipartUploadFailure', 1) - self._metric_queue.multipart_upload_end.Inc(labelvalues=['failure']) + if self._context.metric_queue is not None: + self._context.metric_queue.put_deprecated('MultipartUploadFailure', 1) + self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['failure']) if cancel_on_error: mp.cancel_upload() @@ -218,9 +218,9 @@ class _CloudStorage(BaseStorageV2): break if total_bytes_written > 0: - if self._metric_queue is not None: - self._metric_queue.put_deprecated('MultipartUploadSuccess', 1) - self._metric_queue.multipart_upload_end.Inc(labelvalues=['success']) + if self._context.metric_queue is not None: + self._context.metric_queue.put_deprecated('MultipartUploadSuccess', 1) + self._context.metric_queue.multipart_upload_end.Inc(labelvalues=['success']) self._perform_action_with_retry(mp.complete_upload) @@ -436,8 +436,8 @@ class _CloudStorage(BaseStorageV2): class S3Storage(_CloudStorage): - def __init__(self, metric_queue, storage_path, s3_bucket, s3_access_key=None, s3_secret_key=None, - host=None): + def __init__(self, context, storage_path, s3_bucket, s3_access_key=None, + s3_secret_key=None, host=None): upload_params = { 'encrypt_key': True, } @@ -447,7 +447,7 @@ class S3Storage(_CloudStorage): raise ValueError('host name must not start with http:// or https://') connect_kwargs['host'] = host - super(S3Storage, self).__init__(metric_queue, boto.s3.connection.S3Connection, boto.s3.key.Key, + super(S3Storage, self).__init__(context, boto.s3.connection.S3Connection, boto.s3.key.Key, connect_kwargs, upload_params, storage_path, s3_bucket, access_key=s3_access_key or None, secret_key=s3_secret_key or None) @@ -474,10 +474,10 @@ class S3Storage(_CloudStorage): """) class GoogleCloudStorage(_CloudStorage): - def __init__(self, metric_queue, storage_path, access_key, secret_key, bucket_name): + def __init__(self, context, storage_path, access_key, secret_key, bucket_name): upload_params = {} connect_kwargs = {} - super(GoogleCloudStorage, self).__init__(metric_queue, boto.gs.connection.GSConnection, + super(GoogleCloudStorage, self).__init__(context, boto.gs.connection.GSConnection, boto.gs.key.Key, connect_kwargs, upload_params, storage_path, bucket_name, access_key, secret_key) @@ -534,7 +534,7 @@ class GoogleCloudStorage(_CloudStorage): class RadosGWStorage(_CloudStorage): - def __init__(self, metric_queue, hostname, is_secure, storage_path, access_key, secret_key, + def __init__(self, context, hostname, is_secure, storage_path, access_key, secret_key, bucket_name): upload_params = {} connect_kwargs = { @@ -543,7 +543,7 @@ class RadosGWStorage(_CloudStorage): 'calling_format': boto.s3.connection.OrdinaryCallingFormat(), } - super(RadosGWStorage, self).__init__(metric_queue, boto.s3.connection.S3Connection, + super(RadosGWStorage, self).__init__(context, boto.s3.connection.S3Connection, boto.s3.key.Key, connect_kwargs, upload_params, storage_path, bucket_name, access_key, secret_key) diff --git a/storage/fakestorage.py b/storage/fakestorage.py index 894b6d261..649d4ccab 100644 --- a/storage/fakestorage.py +++ b/storage/fakestorage.py @@ -9,7 +9,7 @@ from storage.basestorage import BaseStorageV2 _FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO) class FakeStorage(BaseStorageV2): - def __init__(self, metric_queue): + def __init__(self, context): super(FakeStorage, self).__init__() def _init_path(self, path=None, create=False): diff --git a/storage/local.py b/storage/local.py index 62f36adc3..d6e2eb5da 100644 --- a/storage/local.py +++ b/storage/local.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) class LocalStorage(BaseStorageV2): - def __init__(self, metric_queue, storage_path): + def __init__(self, context, storage_path): super(LocalStorage, self).__init__() self._root_path = storage_path diff --git a/storage/swift.py b/storage/swift.py index d8264b936..b98b3db89 100644 --- a/storage/swift.py +++ b/storage/swift.py @@ -14,7 +14,7 @@ from swiftclient.client import Connection, ClientException from urlparse import urlparse from random import SystemRandom from hashlib import sha1 -from time import time, sleep +from time import time from collections import namedtuple from util.registry import filelike @@ -26,17 +26,20 @@ logger = logging.getLogger(__name__) _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length']) _SEGMENTS_KEY = 'segments' +_EMPTY_SEGMENTS_KEY = 'emptysegments' _SEGMENT_DIRECTORY = 'segments' _MAXIMUM_SEGMENT_SIZE = 200000000 # ~200 MB _DEFAULT_SWIFT_CONNECT_TIMEOUT = 5 # seconds +_CHUNK_CLEANUP_DELAY = 30 # seconds class SwiftStorage(BaseStorage): - def __init__(self, metric_queue, swift_container, storage_path, auth_url, swift_user, - swift_password, auth_version=None, os_options=None, ca_cert_path=None, - temp_url_key=None, simple_path_concat=False, connect_timeout=None, - retry_count=None, retry_on_ratelimit=True): + def __init__(self, context, swift_container, storage_path, auth_url, swift_user, swift_password, + auth_version=None, os_options=None, ca_cert_path=None, temp_url_key=None, + simple_path_concat=False, connect_timeout=None, retry_count=None, + retry_on_ratelimit=True): super(SwiftStorage, self).__init__() self._swift_container = swift_container + self._context = context self._storage_path = storage_path.lstrip('/') self._simple_path_concat = simple_path_concat @@ -205,7 +208,8 @@ class SwiftStorage(BaseStorage): path = self._normalize_path(path) try: self._get_connection().delete_object(self._swift_container, path) - except Exception: + except Exception as ex: + logger.warning('Could not delete path %s: %s', path, str(ex)) raise IOError('Cannot delete path: %s' % path) def _random_checksum(self, count): @@ -220,14 +224,15 @@ class SwiftStorage(BaseStorage): return headers.get('etag', '')[1:-1][:7] or self._random_checksum(7) @staticmethod - def _segment_list_from_metadata(storage_metadata): - return [_PartUploadMetadata(*segment_args) for segment_args in storage_metadata[_SEGMENTS_KEY]] + def _segment_list_from_metadata(storage_metadata, key=_SEGMENTS_KEY): + return [_PartUploadMetadata(*segment_args) for segment_args in storage_metadata[key]] def initiate_chunked_upload(self): random_uuid = str(uuid4()) metadata = { _SEGMENTS_KEY: [], + _EMPTY_SEGMENTS_KEY: [], } return random_uuid, metadata @@ -292,18 +297,8 @@ class SwiftStorage(BaseStorage): updated_metadata[_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset, bytes_written)) else: - # Try to delete the empty segment, as it is not needed. This will occasionally fail - # due to Swift's eventual consistency, so we retry a few times and then just leave it be. - for remaining_retries in range(2, -1, -1): - try: - self.remove(segment_path) - except IOError: - if remaining_retries: - sleep(0.25) - continue - - # Otherwise, ignore it. - break + updated_metadata[_EMPTY_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset, + bytes_written)) return bytes_written, updated_metadata @@ -311,6 +306,26 @@ class SwiftStorage(BaseStorage): """ Complete the chunked upload and store the final results in the path indicated. Returns nothing. """ + # Check all potentially empty segments against the segments that were uploaded; if the path + # is still empty, then we queue the segment to be deleted. + if self._context.chunk_cleanup_queue is not None: + nonempty_segments = SwiftStorage._segment_list_from_metadata(storage_metadata, + key=_SEGMENTS_KEY) + potentially_empty_segments = SwiftStorage._segment_list_from_metadata(storage_metadata, + key=_EMPTY_SEGMENTS_KEY) + + nonempty_paths = set([segment.path for segment in nonempty_segments]) + for segment in potentially_empty_segments: + if segment.path in nonempty_paths: + continue + + # Queue the chunk to be deleted, as it is empty and therefore unused. + self._context.chunk_cleanup_queue.put(['segment/%s/%s' % (self._context.location, uuid)], { + 'location': self._context.location, + 'uuid': uuid, + 'path': segment.path, + }, available_after=_CHUNK_CLEANUP_DELAY) + # Finally, we write an empty file at the proper location with a X-Object-Manifest # header pointing to the prefix for the segments. segments_prefix_path = self._normalize_path('%s/%s' % (_SEGMENT_DIRECTORY, uuid)) @@ -323,5 +338,5 @@ class SwiftStorage(BaseStorage): Returns nothing. """ # Delete all the uploaded segments. - for segment in SwiftStorage._segment_list_from_metadata(storage_metadata): + for segment in SwiftStorage._segment_list_from_metadata(storage_metadata, key=_SEGMENTS_KEY): self.remove(segment.path) diff --git a/test/test_cloud_storage.py b/test/test_cloud_storage.py index 549207027..50cfa6c1c 100644 --- a/test/test_cloud_storage.py +++ b/test/test_cloud_storage.py @@ -3,7 +3,7 @@ import moto import boto import os -from storage import S3Storage +from storage import S3Storage, StorageContext from storage.cloud import _CloudStorage, _PartUploadMetadata from storage.cloud import _CHUNKS_KEY from StringIO import StringIO @@ -13,6 +13,7 @@ _TEST_BUCKET = 'some_bucket' _TEST_USER = 'someuser' _TEST_PASSWORD = 'somepassword' _TEST_PATH = 'some/cool/path' +_TEST_CONTEXT = StorageContext('nyc', None, None) class TestCloudStorage(unittest.TestCase): def setUp(self): @@ -21,7 +22,7 @@ class TestCloudStorage(unittest.TestCase): # Create a test bucket and put some test content. boto.connect_s3().create_bucket(_TEST_BUCKET) - self.engine = S3Storage(None, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD) + self.engine = S3Storage(_TEST_CONTEXT, 'some/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD) self.engine.put_content(_TEST_PATH, _TEST_CONTENT) def tearDown(self): @@ -51,7 +52,8 @@ class TestCloudStorage(unittest.TestCase): def test_copy_samecreds(self): # Copy the content to another engine. - another_engine = S3Storage(None, 'another/path', _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD) + another_engine = S3Storage(_TEST_CONTEXT, 'another/path', _TEST_BUCKET, _TEST_USER, + _TEST_PASSWORD) self.engine.copy_to(another_engine, _TEST_PATH) # Verify it can be retrieved. @@ -59,7 +61,8 @@ class TestCloudStorage(unittest.TestCase): def test_copy_differentcreds(self): # Copy the content to another engine. - another_engine = S3Storage(None, 'another/path', 'another_bucket', 'blech', 'password') + another_engine = S3Storage(_TEST_CONTEXT, 'another/path', 'another_bucket', 'blech', + 'password') boto.connect_s3().create_bucket('another_bucket') self.engine.copy_to(another_engine, _TEST_PATH) diff --git a/test/test_swift.py b/test/test_swift.py index 9953a70c7..318f0a1fc 100644 --- a/test/test_swift.py +++ b/test/test_swift.py @@ -2,10 +2,11 @@ import io import unittest from collections import defaultdict - -from storage.swift import SwiftStorage from mock import MagicMock +from storage import StorageContext +from storage.swift import SwiftStorage + class MockSwiftStorage(SwiftStorage): def __init__(self, *args, **kwargs): @@ -18,7 +19,7 @@ class MockSwiftStorage(SwiftStorage): class MockSwiftTests(unittest.TestCase): base_args = { - 'metric_queue': None, + 'context': StorageContext('nyc', None, None), 'swift_container': 'container-name', 'storage_path': '/basepath', 'auth_url': 'https://auth.com', @@ -93,9 +94,26 @@ class FakeSwift(object): self.containers[container].pop(path, None) +class FakeQueue(object): + def __init__(self): + self.items = [] + + def get(self): + if not self.items: + return None + + return self.items.pop() + + def put(self, names, item, available_after=0): + self.items.append({ + 'names': names, + 'item': item, + 'available_after': available_after, + }) + class FakeSwiftTests(unittest.TestCase): base_args = { - 'metric_queue': None, + 'context': StorageContext('nyc', None, None), 'swift_container': 'container-name', 'storage_path': '/basepath', 'auth_url': 'https://auth.com', @@ -170,6 +188,36 @@ class FakeSwiftTests(unittest.TestCase): for segment in SwiftStorage._segment_list_from_metadata(metadata): self.assertFalse(swift.exists(segment.path)) + def test_empty_chunks_queued_for_deletion(self): + chunk_cleanup_queue = FakeQueue() + args = dict(self.base_args) + args['context'] = StorageContext('nyc', None, chunk_cleanup_queue) + + swift = FakeSwiftStorage(**args) + uuid, metadata = swift.initiate_chunked_upload() + + chunks = ['this', '', 'is', 'some', '', 'chunked', 'data', ''] + offset = 0 + for chunk in chunks: + length = len(chunk) + if length == 0: + length = 1 + + bytes_written, metadata, error = swift.stream_upload_chunk(uuid, offset, length, + io.BytesIO(chunk), metadata) + self.assertIsNone(error) + self.assertEquals(bytes_written, len(chunk)) + offset += len(chunk) + + swift.complete_chunked_upload(uuid, 'somepath', metadata) + self.assertEquals(''.join(chunks), swift.get_content('somepath')) + + # Check the chunk deletion queue and ensure we have the last chunk queued. + found = chunk_cleanup_queue.get() + self.assertIsNotNone(found) + + found2 = chunk_cleanup_queue.get() + self.assertIsNone(found2) if __name__ == '__main__': unittest.main() diff --git a/workers/chunkcleanupworker.py b/workers/chunkcleanupworker.py new file mode 100644 index 000000000..d8f8ed384 --- /dev/null +++ b/workers/chunkcleanupworker.py @@ -0,0 +1,39 @@ +import logging +import time + +from app import app, storage, chunk_cleanup_queue +from workers.queueworker import QueueWorker, JobException + +logger = logging.getLogger(__name__) + + +POLL_PERIOD_SECONDS = 10 + + +class ChunkCleanupWorker(QueueWorker): + """ Worker which cleans up chunks enqueued by the storage engine(s). This is typically used to + cleanup empty chunks which are no longer needed. + """ + def process_queue_item(self, job_details): + logger.debug('Got chunk cleanup queue item: %s', job_details) + storage_location = job_details['location'] + storage_path = job_details['path'] + + try: + storage.remove([storage_location], storage_path) + except IOError: + raise JobException() + + +if __name__ == "__main__": + logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) + + engines = set([config[0] for config in app.config.get('DISTRIBUTED_STORAGE_CONFIG', {}).values()]) + if 'SwiftStorage' not in engines: + logger.debug('Swift storage not detected; sleeping') + while True: + time.sleep(10000) + + logger.debug('Starting chunk cleanup worker') + worker = ChunkCleanupWorker(chunk_cleanup_queue, poll_period_seconds=POLL_PERIOD_SECONDS) + worker.start()