""" Swift storage driver.

    Uses: http://docs.openstack.org/developer/swift/overview_large_objects.html
"""
from swiftclient.client import Connection, ClientException
from storage.basestorage import BaseStorage
from util.registry.generatorfile import GeneratorFile
from urlparse import urlparse
from random import SystemRandom
from hashlib import sha1
from time import time
from collections import namedtuple

from util.registry import filelike

import copy
import hmac
import string
import logging
from uuid import uuid4

logger = logging.getLogger(__name__)

# Record describing a single uploaded segment of a chunked upload.
_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])

# Key under which the list of segments is kept in the chunked-upload metadata.
_SEGMENTS_KEY = 'segments'

# Path prefix (relative to the storage path) under which segments are written.
_SEGMENT_DIRECTORY = 'segments'

_MAXIMUM_SEGMENT_SIZE = 5000000000  # 5 GB


class SwiftStorage(BaseStorage):
    """ Storage engine which reads and writes objects in a single Swift container. """

    def __init__(self, swift_container, storage_path, auth_url, swift_user,
                 swift_password, auth_version=None, os_options=None, ca_cert_path=None,
                 temp_url_key=None):
        """ Stores the connection settings; no connection is opened here.

            `auth_version` is coerced to an int and falls back to 2 when it is missing
            or is not a valid integer string. `temp_url_key`, when set, enables
            direct download URLs.
        """
        super(SwiftStorage, self).__init__()
        self._swift_container = swift_container
        self._storage_path = storage_path

        self._auth_url = auth_url
        self._ca_cert_path = ca_cert_path

        self._swift_user = swift_user
        self._swift_password = swift_password

        self._temp_url_key = temp_url_key

        try:
            self._auth_version = int(auth_version or '2')
        except ValueError:
            self._auth_version = 2

        self._os_options = os_options or {}

        self._initialized = False

    def _get_connection(self):
        """ Returns a new swiftclient Connection built from the stored settings. """
        return Connection(
            authurl=self._auth_url,
            cacert=self._ca_cert_path,
            user=self._swift_user,
            key=self._swift_password,
            auth_version=self._auth_version,
            os_options=self._os_options)

    def _get_relative_path(self, path):
        """ Returns `path` with the storage path prefix (if present) and any single
            trailing '/' removed.
        """
        if path.startswith(self._storage_path):
            # Slice off the whole prefix. Indexing here (without the ':') would keep
            # only one character and raise IndexError when path equals the prefix.
            path = path[len(self._storage_path):]

        if path.endswith('/'):
            path = path[:-1]

        return path

    def _normalize_path(self, object_path=None):
        """ Returns the storage path joined with `object_path`, without a leading or
            trailing '/'.
        """
        path = self._storage_path
        if not path.endswith('/'):
            path = path + '/'

        path = path + (object_path or '')

        # Openstack does not like paths starting with '/' and we always normalize
        # to remove trailing '/'
        if path.startswith('/'):
            path = path[1:]

        if path.endswith('/'):
            path = path[:-1]

        return path

    def _get_container(self, path):
        """ Returns the container listing for the entries directly under `path`.

            Raises IOError if the listing cannot be retrieved.
        """
        path = self._normalize_path(path)

        # The prefix needs a trailing '/' so that only children of the path match.
        if path and not path.endswith('/'):
            path += '/'

        try:
            _, container = self._get_connection().get_container(
                container=self._swift_container,
                prefix=path, delimiter='/')
            return container
        except Exception:
            logger.exception('Could not get container: %s', path)
            raise IOError('Unknown path: %s' % path)

    def _get_object(self, path, chunk_size=None):
        """ Returns the contents of the object at `path`. `chunk_size` is passed to
            swiftclient as `resp_chunk_size`.

            Raises IOError if the object cannot be retrieved.
        """
        path = self._normalize_path(path)
        try:
            _, obj = self._get_connection().get_object(self._swift_container, path,
                                                       resp_chunk_size=chunk_size)
            return obj
        except Exception:
            logger.exception('Could not get object: %s', path)
            raise IOError('Path %s not found' % path)

    def _put_object(self, path, content, chunk=None, content_type=None,
                    content_encoding=None, headers=None):
        """ Writes `content` to the object at `path`.

            Raises ClientException unchanged, and IOError for any other failure.
        """
        path = self._normalize_path(path)

        # Work on a copy so that the caller's dictionary is never modified.
        headers = dict(headers) if headers else {}

        if content_encoding is not None:
            headers['Content-Encoding'] = content_encoding

        try:
            self._get_connection().put_object(self._swift_container, path, content,
                                              chunk_size=chunk,
                                              content_type=content_type,
                                              headers=headers)
        except ClientException:
            # We re-raise client exception here so that validation of config during
            # setup can see the client exception messages.
            raise
        except Exception:
            logger.exception('Could not put object: %s', path)
            raise IOError("Could not put content: %s" % path)

    def _head_object(self, path):
        """ Returns the result of a HEAD on the object at `path`, or None if the
            request fails for any reason.
        """
        path = self._normalize_path(path)
        try:
            return self._get_connection().head_object(self._swift_container, path)
        except Exception:
            logger.exception('Could not head object: %s', path)
            return None

    def get_direct_download_url(self, object_path, expires_in=60, requires_cors=False):
        """ Returns a signed temporary GET URL for `object_path` valid for `expires_in`
            seconds.

            Returns None if CORS is required, if no temp URL key is configured, or if
            the auth details cannot be loaded.
        """
        if requires_cors:
            return None

        # Reference:
        # http://docs.openstack.org/juno/config-reference/content/object-storage-tempurl.html
        if not self._temp_url_key:
            return None

        # Retrieve the auth details for the connection.
        try:
            object_url_value, _ = self._get_connection().get_auth()
        except ClientException:
            logger.exception('Got client exception when trying to load Swift auth')
            return None

        object_url = urlparse(object_url_value)
        scheme = object_url.scheme
        path = object_url.path
        hostname = object_url.netloc

        if not path.endswith('/'):
            path = path + '/'

        object_path = self._normalize_path(object_path)

        # Generate the signed HMAC body.
        method = 'GET'
        expires = int(time() + expires_in)
        full_path = '%s%s/%s' % (path, self._swift_container, object_path)

        hmac_body = '%s\n%s\n%s' % (method, expires, full_path)
        sig = hmac.new(self._temp_url_key.encode('utf-8'), hmac_body.encode('utf-8'),
                       sha1).hexdigest()

        surl = '{scheme}://{host}{full_path}?temp_url_sig={sig}&temp_url_expires={expires}'
        return surl.format(scheme=scheme, host=hostname, full_path=full_path, sig=sig,
                           expires=expires)

    def validate(self, client):
        """ Checks direct download when a temp URL key is configured: writes a test
            object, fetches it via `client.get` using a signed URL and removes it.

            Raises an Exception if no URL could be generated or if the response status
            is not 200. Does nothing when no temp URL key is configured.
        """
        if self._temp_url_key:
            # Add a file to test direct download.
            self.put_content('dd_path', 'testing 3456')

            try:
                # Generate a direct download URL.
                dd_url = self.get_direct_download_url('dd_path')
                if not dd_url:
                    raise Exception(
                        'Could not validate direct download URL; the token may be invalid.')

                # Try to retrieve the direct download URL.
                response = client.get(dd_url, timeout=2)
            finally:
                # Remove the test file, even if the request above failed.
                self.remove('dd_path')

            if response.status_code != 200:
                logger.debug('Direct download failure: %s => %s with body %s', dd_url,
                             response.status_code, response.text)

                msg = ('Direct download URL failed with status code %s. ' +
                       'Please check your temp-url-key.')
                raise Exception(msg % response.status_code)

    def get_content(self, path):
        """ Returns the contents of the object at `path`. """
        return self._get_object(path)

    def put_content(self, path, content):
        """ Writes `content` to the object at `path`. """
        self._put_object(path, content)

    def stream_read(self, path):
        """ Yields the contents of the object at `path`, requested from Swift with a
            chunk size of `self.buffer_size`.
        """
        for data in self._get_object(path, self.buffer_size):
            yield data

    def stream_read_file(self, path):
        """ Returns a GeneratorFile wrapping the streamed contents of `path`. """
        return GeneratorFile(self.stream_read(path))

    def stream_write(self, path, fp, content_type=None, content_encoding=None):
        """ Writes the contents of the file-like `fp` to the object at `path`. """
        self._put_object(path, fp, self.buffer_size,
                         content_type=content_type,
                         content_encoding=content_encoding)

    def list_directory(self, path=None):
        """ Yields the relative path of every object and subdirectory listed under
            `path`.

            This is a generator, so the OSError for an empty listing is raised when
            iteration starts.
        """
        container = self._get_container(path)
        if not container:
            raise OSError('Unknown path: %s' % path)

        for entry in container:
            param = None
            if 'name' in entry:
                param = 'name'
            elif 'subdir' in entry:
                param = 'subdir'
            else:
                continue

            yield self._get_relative_path(entry[param])

    def exists(self, path):
        """ Returns whether a HEAD on the object at `path` returned a truthy result. """
        return bool(self._head_object(path))

    def remove(self, path):
        """ Deletes the object at `path`. Raises IOError if the deletion fails. """
        path = self._normalize_path(path)
        try:
            self._get_connection().delete_object(self._swift_container, path)
        except Exception:
            # Log the underlying error, as the other helpers do, before converting it.
            logger.exception('Could not delete object: %s', path)
            raise IOError('Cannot delete path: %s' % path)

    def _random_checksum(self, count):
        """ Returns `count` random characters drawn from uppercase letters and digits. """
        chars = string.ascii_uppercase + string.digits
        return ''.join(SystemRandom().choice(chars) for _ in range(count))

    def get_checksum(self, path):
        """ Returns up to 7 characters of the object's etag with its first and last
            characters dropped, or a random 7 character value if that is empty.

            Raises IOError if the object headers cannot be retrieved.
        """
        headers = self._head_object(path)
        if not headers:
            raise IOError('Cannot lookup path: %s' % path)

        return headers.get('etag', '')[1:-1][:7] or self._random_checksum(7)

    @staticmethod
    def _segment_list_from_metadata(storage_metadata):
        """ Returns the segments recorded in `storage_metadata` as a list of
            _PartUploadMetadata tuples.
        """
        return [_PartUploadMetadata(*segment_args)
                for segment_args in storage_metadata[_SEGMENTS_KEY]]

    def initiate_chunked_upload(self):
        """ Returns a new random upload id and metadata with an empty segment list. """
        random_uuid = str(uuid4())

        metadata = {
            _SEGMENTS_KEY: [],
        }

        return random_uuid, metadata

    def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
        """ Uploads up to `length` bytes from `in_fp` as one or more segments.

            Returns a tuple of the total number of bytes written and the updated
            storage metadata.
        """
        if length == 0:
            return 0, storage_metadata

        # Note: Swift limits segments to a maximum of 5GB, so we keep writing segments
        # until we are finished hitting the data limit.
        total_bytes_written = 0
        while True:
            bytes_written, storage_metadata = self._stream_upload_segment(
                uuid, offset, length, in_fp, storage_metadata)

            if length != filelike.READ_UNTIL_END:
                length = length - bytes_written

            offset = offset + bytes_written
            total_bytes_written = total_bytes_written + bytes_written

            # NOTE(review): if filelike.READ_UNTIL_END is zero or negative, `length`
            # never changes and this returns after the first segment, so a stream
            # larger than one segment would be cut short. Confirm against filelike.
            if bytes_written == 0 or length <= 0:
                return total_bytes_written, storage_metadata

    def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata):
        """ Writes a single segment of at most _MAXIMUM_SEGMENT_SIZE bytes from `in_fp`.

            Returns a tuple of the number of bytes written and a copy of the metadata,
            with the segment appended only if any data was written.
        """
        updated_metadata = copy.deepcopy(storage_metadata)
        segment_count = len(updated_metadata[_SEGMENTS_KEY])
        segment_path = '%s/%s/%s' % (_SEGMENT_DIRECTORY, uuid, segment_count)

        # Track the number of bytes read and if an explicit length is specified, limit
        # the file stream to that length.
        if length == filelike.READ_UNTIL_END:
            length = _MAXIMUM_SEGMENT_SIZE
        else:
            length = min(_MAXIMUM_SEGMENT_SIZE, length)

        limiting_fp = filelike.LimitingStream(in_fp, length)

        # Write the segment to Swift.
        self.stream_write(segment_path, limiting_fp)

        # We are only going to track keys to which data was confirmed written.
        bytes_written = limiting_fp.tell()
        if bytes_written > 0:
            updated_metadata[_SEGMENTS_KEY].append(
                _PartUploadMetadata(segment_path, offset, bytes_written))

        return bytes_written, updated_metadata

    def complete_chunked_upload(self, uuid, final_path, storage_metadata):
        """ Complete the chunked upload and store the final results in the path indicated.
            Returns nothing.
        """
        # Finally, we write an empty file at the proper location with a
        # X-Object-Manifest header pointing to the prefix for the segments.
        segments_prefix_path = self._normalize_path('%s/%s/' % (_SEGMENT_DIRECTORY, uuid))
        contained_segments_prefix_path = '%s/%s' % (self._swift_container,
                                                    segments_prefix_path)

        self._put_object(final_path, '',
                         headers={'X-Object-Manifest': contained_segments_prefix_path})

    def cancel_chunked_upload(self, uuid, storage_metadata):
        """ Cancel the chunked upload and clean up any outstanding partially uploaded data.
            Returns nothing.
        """
        # Delete all the uploaded segments.
        for segment in SwiftStorage._segment_list_from_metadata(storage_metadata):
            self.remove(segment.path)