Merge pull request #1149 from jakedt/notsoswift

Improve swift path computations
Jake Moshenko 2016-01-15 15:54:35 -05:00
commit 5f10c3f7ed
20 changed files with 133 additions and 18553 deletions
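The substance of the change: Swift object paths are now computed with explicit normalization instead of ad-hoc slash bookkeeping, and a new simple_path_concat flag preserves the old bare concatenation for objects already written without a separator. A minimal sketch of the two path styles (illustrative only; normalize_swift_path is a hypothetical free-function version of the _normalize_path method added in the Swift driver below):

import os.path

def normalize_swift_path(storage_path, object_path, simple_path_concat=False):
    # Swift object names should carry no leading or trailing '/'.
    storage_path = storage_path.lstrip('/')
    if simple_path_concat:
        # Legacy behavior: plain concatenation, no separator inserted.
        return (storage_path + object_path).rstrip('/')
    return os.path.join(storage_path, object_path).rstrip('/')

print(normalize_swift_path('/basepath', 'object/path'))        # basepath/object/path
print(normalize_swift_path('/basepath', 'object/path', True))  # basepathobject/path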

app.py

@@ -131,7 +131,8 @@ Principal(app, use_sessions=False)
 avatar = Avatar(app)
 login_manager = LoginManager(app)
 mail = Mail(app)
-storage = Storage(app)
+metric_queue = MetricQueue()
+storage = Storage(app, metric_queue)
 userfiles = Userfiles(app, storage)
 log_archive = LogArchive(app, storage)
 analytics = Analytics(app)
@@ -142,7 +143,6 @@ authentication = UserAuthentication(app, OVERRIDE_CONFIG_DIRECTORY)
 userevents = UserEventsBuilderModule(app)
 superusers = SuperUserManager(app)
 signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY)
-metric_queue = MetricQueue()
 start_cloudwatch_sender(metric_queue, app)
 tf = app.config['DB_TRANSACTION_FACTORY']
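Note the ordering: metric_queue is now constructed before Storage, since the storage drivers receive the queue at construction time; its old position further down (removed in the second hunk) would have been too late to inject it.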

endpoints/verbs.py

@@ -4,28 +4,26 @@ import hashlib
 from flask import redirect, Blueprint, abort, send_file, make_response
 
-from app import app, signer, storage
+import features
+
+from app import app, signer, storage, metric_queue
 from auth.auth import process_auth
 from auth.auth_context import get_authenticated_user
 from auth.permissions import ReadRepositoryPermission
 from data import model, database
 from endpoints.trackhelper import track_and_log
 from endpoints.decorators import anon_protect
-from storage import Storage
 from util.registry.queuefile import QueueFile
 from util.registry.queueprocess import QueueProcess
 from util.registry.torrent import (make_torrent, per_user_torrent_filename, public_torrent_filename,
                                    PieceHasher)
 from util.registry.filelike import wrap_with_handler
-from util.registry.gzipstream import SizeInfo
 from formats.squashed import SquashedDockerImage
 from formats.aci import ACIImage
+from storage import Storage
 from endpoints.v2.blob import BLOB_DIGEST_ROUTE
 from endpoints.common import route_show_if
 
-import features
-
 verbs = Blueprint('verbs', __name__)
 logger = logging.getLogger(__name__)
@@ -33,8 +31,9 @@ logger = logging.getLogger(__name__)
 def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json, repo_image,
                  handlers):
-  store = Storage(app)
+  """ This method generates a stream of data which will be replicated and read from the queue files.
+      This method runs in a separate process.
+  """
   # For performance reasons, we load the full image list here, cache it, then disconnect from
   # the database.
   with database.UseThenDisconnect(app.config):
@@ -49,6 +48,8 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
       yield current_image
 
   def get_next_layer():
+    # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
+    store = Storage(app, metric_queue)
     for current_image_entry in image_list:
       current_image_path = model.storage.get_layer_path(current_image_entry.storage)
       current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
@@ -69,6 +70,8 @@ def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, imag
 def _sign_synthetic_image(verb, linked_storage_uuid, queue_file):
+  """ Read from the queue file and sign the contents which are generated. This method runs in a
+      separate process. """
   signature = None
   try:
     signature = signer.detached_sign(queue_file)
@@ -92,8 +95,9 @@ def _sign_synthetic_image(verb, linked_storage_uuid, queue_file):
 def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations, queue_file):
-  store = Storage(app)
+  """ Read from the generated stream and write it back to the storage engine. This method runs in a
+      separate process.
+  """
   def handle_exception(ex):
     logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex)
@@ -102,6 +106,8 @@ def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_location
   queue_file.add_exception_handler(handle_exception)
 
+  # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
+  store = Storage(app, metric_queue)
   image_path = store.v1_image_layer_path(linked_storage_uuid)
   store.stream_write(linked_locations, image_path, queue_file)
   queue_file.close()
@@ -147,8 +153,7 @@ def _verify_repo_verb(store, namespace, repository, tag, verb, checker=None):
 def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwargs):
   # Verify that the image exists and that we have access to it.
-  store = Storage(app)
-  result = _verify_repo_verb(store, namespace, repository, tag, verb, checker)
+  result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
   (repo_image, _, _) = result
 
   # Lookup the derived image storage for the verb.
@@ -171,8 +176,7 @@ def _repo_verb_signature(namespace, repository, tag, verb, checker=None, **kwarg
 def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=None, **kwargs):
   # Verify that the image exists and that we have access to it.
-  store = Storage(app)
-  result = _verify_repo_verb(store, namespace, repository, tag, verb, checker)
+  result = _verify_repo_verb(storage, namespace, repository, tag, verb, checker)
   (repo_image, tag_image, image_json) = result
 
   # Log the action.
@@ -180,12 +184,12 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
   # Lookup/create the derived image storage for the verb and repo image.
   derived = model.image.find_or_create_derived_storage(repo_image, verb,
-                                                       store.preferred_locations[0])
+                                                       storage.preferred_locations[0])
 
   if not derived.uploading:
     logger.debug('Derived %s image %s exists in storage', verb, derived.uuid)
     derived_layer_path = model.storage.get_layer_path(derived)
-    download_url = store.get_direct_download_url(derived.locations, derived_layer_path)
+    download_url = storage.get_direct_download_url(derived.locations, derived_layer_path)
     if download_url:
       logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid)
       return redirect(download_url)
@@ -194,7 +198,7 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
     database.close_db_filter(None)
 
     logger.debug('Sending cached derived %s image %s', verb, derived.uuid)
-    return send_file(store.stream_read_file(derived.locations, derived_layer_path))
+    return send_file(storage.stream_read_file(derived.locations, derived_layer_path))
 
   logger.debug('Building and returning derived %s image %s', verb, derived.uuid)
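The two "Re-Initialize the storage engine" comments above are the reason metric_queue is now importable from app: _open_stream and _write_synthetic_image_to_storage run in worker processes spawned via QueueProcess, and drivers holding live connections (S3 is the cited example) may misbehave after a fork, so each child constructs its own Storage. A self-contained sketch of the pattern, assuming nothing from this repo:

import multiprocessing

class FragileDriver(object):
    # Stand-in for a storage driver whose connection must not cross a fork.
    def __init__(self):
        self.conn = object()  # imagine a live HTTP session or socket here

def worker():
    # Re-initialize inside the child: a connection inherited across fork()
    # is shared with the parent and can corrupt either side's traffic.
    driver = FragileDriver()
    print('child built its own connection:', id(driver.conn))

if __name__ == '__main__':
    parent = FragileDriver()  # parent-side instance, never handed to the child
    child = multiprocessing.Process(target=worker)
    child.start()
    child.join()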

storage/__init__.py

@@ -13,27 +13,27 @@ STORAGE_DRIVER_CLASSES = {
   'SwiftStorage': SwiftStorage,
 }
 
-def get_storage_driver(storage_params):
+def get_storage_driver(metric_queue, storage_params):
   """ Returns a storage driver class for the given storage configuration
       (a pair of string name and a dict of parameters). """
   driver = storage_params[0]
   parameters = storage_params[1]
   driver_class = STORAGE_DRIVER_CLASSES.get(driver, FakeStorage)
-  return driver_class(**parameters)
+  return driver_class(metric_queue, **parameters)
 
 class Storage(object):
-  def __init__(self, app=None):
+  def __init__(self, app=None, metric_queue=None):
     self.app = app
-    if app is not None:
-      self.state = self.init_app(app)
+    if app is not None and metric_queue is not None:
+      self.state = self.init_app(app, metric_queue)
     else:
       self.state = None
 
-  def init_app(self, app):
+  def init_app(self, app, metric_queue):
     storages = {}
     for location, storage_params in app.config.get('DISTRIBUTED_STORAGE_CONFIG').items():
-      storages[location] = get_storage_driver(storage_params)
+      storages[location] = get_storage_driver(metric_queue, storage_params)
 
     preference = app.config.get('DISTRIBUTED_STORAGE_PREFERENCE', None)
     if not preference:
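For context, each entry of DISTRIBUTED_STORAGE_CONFIG is the (driver name, parameters dict) pair the docstring describes; the metric queue is deliberately not part of that config and is injected as the new first positional argument instead. A hypothetical config matching the driver signatures in this commit (all values invented):

DISTRIBUTED_STORAGE_CONFIG = {
    'local_us': ('LocalStorage', {'storage_path': '/datastorage/registry'}),
    'swift_eu': ('SwiftStorage', {
        'swift_container': 'container-name',
        'storage_path': '/basepath',
        'auth_url': 'https://auth.example.com',
        'swift_user': 'user',
        'swift_password': 'password',
        'simple_path_concat': False,  # set True only for legacy unseparated layouts
    }),
}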

storage/basestorage.py

@@ -74,9 +74,6 @@ class BaseStorage(StoragePaths):
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
     raise NotImplementedError
 
-  def list_directory(self, path=None):
-    raise NotImplementedError
-
   def exists(self, path):
     raise NotImplementedError

storage/cloud.py

@@ -15,7 +15,6 @@ from collections import namedtuple
 from util.registry import filelike
 from storage.basestorage import BaseStorageV2, InvalidChunkException
 
-import app
 
 logger = logging.getLogger(__name__)
@@ -48,8 +47,8 @@ class StreamReadKeyAsFile(BufferedIOBase):
 class _CloudStorage(BaseStorageV2):
-  def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
-               access_key, secret_key, bucket_name):
+  def __init__(self, metric_queue, connection_class, key_class, connect_kwargs, upload_params,
+               storage_path, access_key, secret_key, bucket_name):
     super(_CloudStorage, self).__init__()
 
     self.automatic_chunk_size = 5 * 1024 * 1024
@@ -65,6 +64,7 @@ class _CloudStorage(BaseStorageV2):
     self._connect_kwargs = connect_kwargs
     self._cloud_conn = None
     self._cloud_bucket = None
+    self._metric_queue = metric_queue
 
   def _initialize_cloud_conn(self):
     if not self._initialized:
@@ -161,7 +161,7 @@
     if content_encoding is not None:
       metadata['Content-Encoding'] = content_encoding
 
-    app.metric_queue.put('MultipartUploadStart', 1)
+    self._metric_queue.put('MultipartUploadStart', 1)
     return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                         **self._upload_params)
@@ -198,7 +198,7 @@
       except IOError as ex:
         logger.warn('stream write error: %s', ex)
         error = ex
-        app.metric_queue.put('MultipartUploadFailure', 1)
+        self._metric_queue.put('MultipartUploadFailure', 1)
         if cancel_on_error:
           mp.cancel_upload()
           return 0, error
@@ -206,31 +206,10 @@
         break
 
     if total_bytes_written > 0:
-      app.metric_queue.put('MultipartUploadSuccess', 1)
+      self._metric_queue.put('MultipartUploadSuccess', 1)
       mp.complete_upload()
     return total_bytes_written, error
 
-  def list_directory(self, path=None):
-    self._initialize_cloud_conn()
-    path = self._init_path(path)
-    if not path.endswith('/'):
-      path += '/'
-    ln = 0
-    if self._root_path != '/':
-      ln = len(self._root_path)
-    exists = False
-    for key in self._cloud_bucket.list(prefix=path, delimiter='/'):
-      exists = True
-      name = key.name
-      if name.endswith('/'):
-        yield name[ln:-1]
-      else:
-        yield name[ln:]
-    if exists is False:
-      # In order to be compliant with the LocalStorage API. Even though
-      # S3 does not have a concept of folders.
-      raise OSError('No such directory: \'{0}\''.format(path))
-
   def exists(self, path):
     self._initialize_cloud_conn()
     path = self._init_path(path)
@@ -401,7 +380,8 @@
 class S3Storage(_CloudStorage):
-  def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket, host=None):
+  def __init__(self, metric_queue, storage_path, s3_access_key, s3_secret_key, s3_bucket,
+               host=None):
     upload_params = {
       'encrypt_key': True,
     }
@@ -411,7 +391,7 @@
         raise ValueError('host name must not start with http:// or https://')
       connect_kwargs['host'] = host
 
-    super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
+    super(S3Storage, self).__init__(metric_queue, boto.s3.connection.S3Connection, boto.s3.key.Key,
                                     connect_kwargs, upload_params, storage_path, s3_access_key,
                                     s3_secret_key, s3_bucket)
@@ -435,12 +415,12 @@
 </CORSConfiguration>""")
 
 class GoogleCloudStorage(_CloudStorage):
-  def __init__(self, storage_path, access_key, secret_key, bucket_name):
+  def __init__(self, metric_queue, storage_path, access_key, secret_key, bucket_name):
     upload_params = {}
     connect_kwargs = {}
-    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key,
-                                             connect_kwargs, upload_params, storage_path,
-                                             access_key, secret_key, bucket_name)
+    super(GoogleCloudStorage, self).__init__(metric_queue, boto.gs.connection.GSConnection,
+                                             boto.gs.key.Key, connect_kwargs, upload_params,
+                                             storage_path, access_key, secret_key, bucket_name)
 
   def setup(self):
     self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
@@ -495,16 +475,17 @@ class GoogleCloudStorage(_CloudStorage):
 class RadosGWStorage(_CloudStorage):
-  def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name):
+  def __init__(self, metric_queue, hostname, is_secure, storage_path, access_key, secret_key,
+               bucket_name):
     upload_params = {}
     connect_kwargs = {
       'host': hostname,
       'is_secure': is_secure,
       'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
     }
-    super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
-                                         connect_kwargs, upload_params, storage_path, access_key,
-                                         secret_key, bucket_name)
+    super(RadosGWStorage, self).__init__(metric_queue, boto.s3.connection.S3Connection,
+                                         boto.s3.key.Key, connect_kwargs, upload_params,
+                                         storage_path, access_key, secret_key, bucket_name)
 
   # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
   def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
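Replacing app.metric_queue with an injected self._metric_queue removes the module-level "import app" (dropped in the first hunk), which was both a circular-import hazard and unusable in forked workers. The counting pattern itself is simple; a runnable sketch with a stand-in queue (the real MetricQueue lives elsewhere in this repo and ships data to CloudWatch):

class InMemoryMetricQueue(object):
    # Stand-in for the injected metric queue, for illustration only.
    def __init__(self):
        self.events = []

    def put(self, name, value):
        self.events.append((name, value))

def multipart_upload(metric_queue, chunks):
    metric_queue.put('MultipartUploadStart', 1)
    try:
        for chunk in chunks:
            pass  # write each chunk to the backing store
    except IOError:
        metric_queue.put('MultipartUploadFailure', 1)
        raise
    metric_queue.put('MultipartUploadSuccess', 1)

queue = InMemoryMetricQueue()
multipart_upload(queue, [b'chunk'])
assert queue.events == [('MultipartUploadStart', 1), ('MultipartUploadSuccess', 1)]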

storage/distributedstorage.py

@@ -44,7 +44,6 @@ class DistributedStorage(StoragePaths):
   stream_read = _location_aware(BaseStorage.stream_read)
   stream_read_file = _location_aware(BaseStorage.stream_read_file)
   stream_write = _location_aware(BaseStorage.stream_write)
-  list_directory = _location_aware(BaseStorage.list_directory)
   exists = _location_aware(BaseStorage.exists)
   remove = _location_aware(BaseStorage.remove)
   get_checksum = _location_aware(BaseStorage.get_checksum)

storage/fakestorage.py

@@ -9,6 +9,9 @@ from storage.basestorage import BaseStorageV2
 _FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
 
 class FakeStorage(BaseStorageV2):
+  def __init__(self, metric_queue):
+    super(FakeStorage, self).__init__()
+
   def _init_path(self, path=None, create=False):
     return path

storage/local.py

@@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
 
 class LocalStorage(BaseStorageV2):
-  def __init__(self, storage_path):
+  def __init__(self, metric_queue, storage_path):
     super(LocalStorage, self).__init__()
     self._root_path = storage_path
@@ -56,18 +56,6 @@ class LocalStorage(BaseStorageV2):
       with open(path, mode='wb') as out_fp:
         self.stream_write_to_fp(fp, out_fp)
 
-  def list_directory(self, path=None):
-    path = self._init_path(path)
-    prefix = path[len(self._root_path) + 1:] + '/'
-    exists = False
-    for d in os.listdir(path):
-      exists = True
-      yield prefix + d
-    if exists is False:
-      # Raises OSError even when the directory is empty
-      # (to be consistent with S3)
-      raise OSError('No such directory: \'{0}\''.format(path))
-
   def exists(self, path):
     path = self._init_path(path)
     return os.path.exists(path)

storage/swift.py

@@ -2,22 +2,25 @@
   Uses: http://docs.openstack.org/developer/swift/overview_large_objects.html
 """
-from swiftclient.client import Connection, ClientException
-from storage.basestorage import BaseStorage
-from util.registry.generatorfile import GeneratorFile
-from urlparse import urlparse
-from random import SystemRandom
-from hashlib import sha1
-from time import time
-from collections import namedtuple
-from util.registry import filelike
+import os.path
 import copy
 import hmac
 import string
 import logging
+
 from uuid import uuid4
+from swiftclient.client import Connection, ClientException
+from urlparse import urlparse
+from random import SystemRandom
+from hashlib import sha1
+from time import time
+from collections import namedtuple
+from util.registry import filelike
+from storage.basestorage import BaseStorage
+from util.registry.generatorfile import GeneratorFile
 
 logger = logging.getLogger(__name__)
 
 _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
@@ -26,12 +29,14 @@ _SEGMENT_DIRECTORY = 'segments'
 _MAXIMUM_SEGMENT_SIZE = 5000000000 # 5 GB
 
 class SwiftStorage(BaseStorage):
-  def __init__(self, swift_container, storage_path, auth_url, swift_user,
+  def __init__(self, metric_queue, swift_container, storage_path, auth_url, swift_user,
                swift_password, auth_version=None, os_options=None, ca_cert_path=None,
-               temp_url_key=None):
+               temp_url_key=None, simple_path_concat=False):
     super(SwiftStorage, self).__init__()
     self._swift_container = swift_container
-    self._storage_path = storage_path
+
+    self._storage_path = storage_path.lstrip('/')
+    self._simple_path_concat = simple_path_concat
+
     self._auth_url = auth_url
     self._ca_cert_path = ca_cert_path
@@ -52,61 +57,30 @@ class SwiftStorage(BaseStorage):
   def _get_connection(self):
     return Connection(
       authurl=self._auth_url,
       cacert=self._ca_cert_path,
       user=self._swift_user,
       key=self._swift_password,
       auth_version=self._auth_version,
-      os_options=self._os_options)
+      os_options=self._os_options
+    )
 
-  def _get_relative_path(self, path):
-    if path.startswith(self._storage_path):
-      path = path[len(self._storage_path)]
-
-    if path.endswith('/'):
-      path = path[:-1]
-
-    return path
-
-  def _normalize_path(self, object_path=None):
-    path = self._storage_path
-    if not path.endswith('/'):
-      path = path + '/'
-
-    path = path + (object_path or '')
-
-    # Openstack does not like paths starting with '/' and we always normalize
-    # to remove trailing '/'
-    if path.startswith('/'):
-      path = path[1:]
-
-    if path.endswith('/'):
-      path = path[:-1]
-
-    return path
-
-  def _get_container(self, path):
-    path = self._normalize_path(path)
-    if path and not path.endswith('/'):
-      path += '/'
-
-    try:
-      _, container = self._get_connection().get_container(
-        container=self._swift_container,
-        prefix=path, delimiter='/')
-      return container
-    except:
-      logger.exception('Could not get container: %s', path)
-      raise IOError('Unknown path: %s' % path)
+  def _normalize_path(self, object_path):
+    """ No matter what inputs we get, we are going to return a path without a leading or trailing
+        '/'
+    """
+    if self._simple_path_concat:
+      return (self._storage_path + object_path).rstrip('/')
+    else:
+      return os.path.join(self._storage_path, object_path).rstrip('/')
 
   def _get_object(self, path, chunk_size=None):
     path = self._normalize_path(path)
     try:
       _, obj = self._get_connection().get_object(self._swift_container, path,
                                                  resp_chunk_size=chunk_size)
       return obj
     except Exception:
       logger.exception('Could not get object: %s', path)
@@ -157,18 +131,15 @@ class SwiftStorage(BaseStorage):
     object_url = urlparse(object_url_value)
     scheme = object_url.scheme
-    path = object_url.path
+    path = object_url.path.rstrip('/')
     hostname = object_url.netloc
 
-    if not path.endswith('/'):
-      path = path + '/'
-
     object_path = self._normalize_path(object_path)
 
     # Generate the signed HMAC body.
     method = 'GET'
    expires = int(time() + expires_in)
-    full_path = '%s%s/%s' % (path, self._swift_container, object_path)
+    full_path = '%s/%s/%s' % (path, self._swift_container, object_path)
     hmac_body = '%s\n%s\n%s' % (method, expires, full_path)
     sig = hmac.new(self._temp_url_key.encode('utf-8'), hmac_body.encode('utf-8'), sha1).hexdigest()
@@ -218,22 +189,6 @@ class SwiftStorage(BaseStorage):
     self._put_object(path, fp, self.buffer_size, content_type=content_type,
                      content_encoding=content_encoding)
 
-  def list_directory(self, path=None):
-    container = self._get_container(path)
-    if not container:
-      raise OSError('Unknown path: %s' % path)
-
-    for entry in container:
-      param = None
-      if 'name' in entry:
-        param = 'name'
-      elif 'subdir' in entry:
-        param = 'subdir'
-      else:
-        continue
-
-      yield self._get_relative_path(entry[param])
-
   def exists(self, path):
     return bool(self._head_object(path))
@@ -280,7 +235,8 @@ class SwiftStorage(BaseStorage):
     while True:
      try:
-        bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
-                                                                      storage_metadata, content_type)
+        bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
+                                                                      storage_metadata,
+                                                                      content_type)
       except IOError as ex:
         logger.warn('stream write error: %s', ex)
         error = ex
@@ -327,7 +283,7 @@ class SwiftStorage(BaseStorage):
     """
     # Finally, we write an empty file at the proper location with a X-Object-Manifest
     # header pointing to the prefix for the segments.
-    segments_prefix_path = self._normalize_path('%s/%s/' % (_SEGMENT_DIRECTORY, uuid))
+    segments_prefix_path = self._normalize_path('%s/%s' % (_SEGMENT_DIRECTORY, uuid))
    contained_segments_prefix_path = '%s/%s' % (self._swift_container, segments_prefix_path)
     self._put_object(final_path, '', headers={'X-Object-Manifest': contained_segments_prefix_path})
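Two path fixes land in the temp-URL hunk above: the endpoint path now loses its trailing '/' instead of gaining one, and full_path joins its parts with explicit '%s/%s/%s' separators, so hosts configured with or without a trailing slash sign the same path. Swift temp URLs sign the method, expiry, and full path joined by newlines, exactly as the diff's hmac_body shows; a sketch with invented key and paths:

import hmac
from hashlib import sha1
from time import time

temp_url_key = 'secret-key'   # hypothetical X-Account-Meta-Temp-URL-Key value
path = '/v1/AUTH_account'     # endpoint path with any trailing '/' stripped
full_path = '%s/%s/%s' % (path, 'container-name', 'basepath/object/path')

expires = int(time() + 60)
hmac_body = '%s\n%s\n%s' % ('GET', expires, full_path)
sig = hmac.new(temp_url_key.encode('utf-8'), hmac_body.encode('utf-8'), sha1).hexdigest()

signed = 'https://swift.example.com%s?temp_url_sig=%s&temp_url_expires=%s' % (full_path, sig, expires)
print(signed)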


@@ -1,5 +0,0 @@
{
"removed": [],
"added": [],
"changed": []
}

File diff suppressed because it is too large


@@ -1,38 +0,0 @@
{
"removed": [],
"added": [
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_Release",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_Release.gpg",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_main_binary-amd64_Packages",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_main_i18n_Translation-en",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_main_source_Sources",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_multiverse_binary-amd64_Packages",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_multiverse_i18n_Translation-en",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_multiverse_source_Sources",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_restricted_binary-amd64_Packages",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_restricted_i18n_Translation-en",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_restricted_source_Sources",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_universe_binary-amd64_Packages",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_universe_i18n_Translation-en",
"/var/lib/apt/lists/archive.ubuntu.com_ubuntu_dists_raring_universe_source_Sources",
"/var/lib/apt/lists/lock",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_Release",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_Release.gpg",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_main_binary-amd64_Packages",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_main_i18n_Translation-en",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_main_source_Sources",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_multiverse_binary-amd64_Packages",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_multiverse_i18n_Translation-en",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_multiverse_source_Sources",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_restricted_binary-amd64_Packages",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_restricted_i18n_Translation-en",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_restricted_source_Sources",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_universe_binary-amd64_Packages",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_universe_i18n_Translation-en",
"/var/lib/apt/lists/security.ubuntu.com_ubuntu_dists_raring-security_universe_source_Sources"
],
"changed": [
"/var/cache/apt/pkgcache.bin",
"/var/cache/apt/srcpkgcache.bin"
]
}

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -1,45 +0,0 @@
{
"removed": [],
"added": [
"/opt/elasticsearch-0.90.5/LICENSE.txt",
"/opt/elasticsearch-0.90.5/NOTICE.txt",
"/opt/elasticsearch-0.90.5/README.textile",
"/opt/elasticsearch-0.90.5/bin/elasticsearch",
"/opt/elasticsearch-0.90.5/bin/elasticsearch.in.sh",
"/opt/elasticsearch-0.90.5/bin/plugin",
"/opt/elasticsearch-0.90.5/config/elasticsearch.yml",
"/opt/elasticsearch-0.90.5/config/logging.yml",
"/opt/elasticsearch-0.90.5/lib/elasticsearch-0.90.5.jar",
"/opt/elasticsearch-0.90.5/lib/jna-3.3.0.jar",
"/opt/elasticsearch-0.90.5/lib/jts-1.12.jar",
"/opt/elasticsearch-0.90.5/lib/log4j-1.2.17.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-analyzers-common-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-codecs-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-core-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-grouping-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-highlighter-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-join-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-memory-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-misc-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-queries-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-queryparser-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-sandbox-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-spatial-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-suggest-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-amd64-freebsd-6.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-amd64-linux.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-amd64-solaris.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-ia64-linux.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-sparc-solaris.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-sparc64-solaris.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-universal-macosx.dylib",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-universal64-macosx.dylib",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-x86-freebsd-5.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-x86-freebsd-6.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-x86-linux.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-x86-solaris.so",
"/opt/elasticsearch-0.90.5/lib/sigar/sigar-1.6.4.jar",
"/opt/elasticsearch-0.90.5/lib/spatial4j-0.3.jar"
],
"changed": []
}


@@ -1,5 +0,0 @@
{
"removed": [],
"added": [],
"changed": []
}


@@ -1,7 +0,0 @@
{
"removed": [],
"added": [
"/elasticsearch-0.90.5.tar.gz"
],
"changed": []
}


@@ -1,8 +0,0 @@
{
"removed": [],
"added": [
"/root/.bash_history",
"/usr/sbin/policy-rc.d"
],
"changed": []
}


@@ -1,85 +0,0 @@
{
"removed": [
"/opt/elasticsearch-0.90.5/LICENSE.txt",
"/opt/elasticsearch-0.90.5/NOTICE.txt",
"/opt/elasticsearch-0.90.5/README.textile",
"/opt/elasticsearch-0.90.5/bin/elasticsearch",
"/opt/elasticsearch-0.90.5/bin/elasticsearch.in.sh",
"/opt/elasticsearch-0.90.5/bin/plugin",
"/opt/elasticsearch-0.90.5/config/elasticsearch.yml",
"/opt/elasticsearch-0.90.5/config/logging.yml",
"/opt/elasticsearch-0.90.5/lib/elasticsearch-0.90.5.jar",
"/opt/elasticsearch-0.90.5/lib/jna-3.3.0.jar",
"/opt/elasticsearch-0.90.5/lib/jts-1.12.jar",
"/opt/elasticsearch-0.90.5/lib/log4j-1.2.17.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-analyzers-common-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-codecs-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-core-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-grouping-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-highlighter-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-join-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-memory-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-misc-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-queries-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-queryparser-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-sandbox-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-spatial-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/lucene-suggest-4.4.0.jar",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-amd64-freebsd-6.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-amd64-linux.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-amd64-solaris.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-ia64-linux.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-sparc-solaris.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-sparc64-solaris.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-universal-macosx.dylib",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-universal64-macosx.dylib",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-x86-freebsd-5.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-x86-freebsd-6.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-x86-linux.so",
"/opt/elasticsearch-0.90.5/lib/sigar/libsigar-x86-solaris.so",
"/opt/elasticsearch-0.90.5/lib/sigar/sigar-1.6.4.jar",
"/opt/elasticsearch-0.90.5/lib/spatial4j-0.3.jar"
],
"added": [
"/opt/elasticsearch/LICENSE.txt",
"/opt/elasticsearch/NOTICE.txt",
"/opt/elasticsearch/README.textile",
"/opt/elasticsearch/bin/elasticsearch",
"/opt/elasticsearch/bin/elasticsearch.in.sh",
"/opt/elasticsearch/bin/plugin",
"/opt/elasticsearch/config/elasticsearch.yml",
"/opt/elasticsearch/config/logging.yml",
"/opt/elasticsearch/lib/elasticsearch-0.90.5.jar",
"/opt/elasticsearch/lib/jna-3.3.0.jar",
"/opt/elasticsearch/lib/jts-1.12.jar",
"/opt/elasticsearch/lib/log4j-1.2.17.jar",
"/opt/elasticsearch/lib/lucene-analyzers-common-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-codecs-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-core-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-grouping-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-highlighter-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-join-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-memory-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-misc-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-queries-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-queryparser-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-sandbox-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-spatial-4.4.0.jar",
"/opt/elasticsearch/lib/lucene-suggest-4.4.0.jar",
"/opt/elasticsearch/lib/sigar/libsigar-amd64-freebsd-6.so",
"/opt/elasticsearch/lib/sigar/libsigar-amd64-linux.so",
"/opt/elasticsearch/lib/sigar/libsigar-amd64-solaris.so",
"/opt/elasticsearch/lib/sigar/libsigar-ia64-linux.so",
"/opt/elasticsearch/lib/sigar/libsigar-sparc-solaris.so",
"/opt/elasticsearch/lib/sigar/libsigar-sparc64-solaris.so",
"/opt/elasticsearch/lib/sigar/libsigar-universal-macosx.dylib",
"/opt/elasticsearch/lib/sigar/libsigar-universal64-macosx.dylib",
"/opt/elasticsearch/lib/sigar/libsigar-x86-freebsd-5.so",
"/opt/elasticsearch/lib/sigar/libsigar-x86-freebsd-6.so",
"/opt/elasticsearch/lib/sigar/libsigar-x86-linux.so",
"/opt/elasticsearch/lib/sigar/libsigar-x86-solaris.so",
"/opt/elasticsearch/lib/sigar/sigar-1.6.4.jar",
"/opt/elasticsearch/lib/spatial4j-0.3.jar"
],
"changed": []
}

test/test_swift.py

@@ -0,0 +1,42 @@
+import unittest
+
+from storage.swift import SwiftStorage
+from mock import MagicMock
+
+
+class TestSwiftStorage(SwiftStorage):
+  def __init__(self, *args, **kwargs):
+    super(TestSwiftStorage, self).__init__(*args, **kwargs)
+    self._connection = MagicMock()
+
+  def _get_connection(self):
+    return self._connection
+
+
+class SwiftTests(unittest.TestCase):
+  base_args = {
+    'metric_queue': None,
+    'swift_container': 'container-name',
+    'storage_path': '/basepath',
+    'auth_url': 'https://auth.com',
+    'swift_user': 'root',
+    'swift_password': 'password',
+  }
+
+  def test_fixed_path_concat(self):
+    swift = TestSwiftStorage(**self.base_args)
+    swift.exists('object/path')
+    swift._get_connection().head_object.assert_called_with('container-name', 'basepath/object/path')
+
+  def test_simple_path_concat(self):
+    simple_concat_args = dict(self.base_args)
+    simple_concat_args['simple_path_concat'] = True
+
+    swift = TestSwiftStorage(**simple_concat_args)
+    swift.exists('object/path')
+    swift._get_connection().head_object.assert_called_with('container-name', 'basepathobject/path')
+
+
+if __name__ == '__main__':
+  unittest.main()
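The two assertions pin down the behavior change precisely: joined concatenation yields 'basepath/object/path', while the legacy simple_path_concat mode yields 'basepathobject/path' with no separator, preserving compatibility for installations whose objects were historically written that way. With the mock package installed, the file also runs standalone from the repository root via "python test/test_swift.py" thanks to the __main__ guard.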