From 6c5916152712b62c3dc09a26252eacd709a9605b Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Fri, 25 Sep 2015 14:57:14 -0400
Subject: [PATCH] Add V2 storage methods to Swift storage engine

Fixes #508
---
 storage/swift.py          | 97 +++++++++++++++++++++++++++++++++++++--
 util/registry/filelike.py | 21 +++++++++
 2 files changed, 115 insertions(+), 3 deletions(-)

diff --git a/storage/swift.py b/storage/swift.py
index 7d1fa05ca..211b311a8 100644
--- a/storage/swift.py
+++ b/storage/swift.py
@@ -1,4 +1,7 @@
-""" Swift storage driver. Based on: github.com/bacongobbler/docker-registry-driver-swift/ """
+""" Swift storage driver.
+
+    Uses: http://docs.openstack.org/developer/swift/overview_large_objects.html
+"""
 from swiftclient.client import Connection, ClientException
 from storage.basestorage import BaseStorage
 from util.registry.generatorfile import GeneratorFile
@@ -6,13 +9,21 @@ from urlparse import urlparse
 from random import SystemRandom
 from hashlib import sha1
 from time import time
+from collections import namedtuple
+from util.registry import filelike
+import copy
 import hmac
 import string
 import logging
+from uuid import uuid4
 
 logger = logging.getLogger(__name__)
 
+_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
+_SEGMENTS_KEY = 'segments'
+_SEGMENT_DIRECTORY = 'segments'
+_MAXIMUM_SEGMENT_SIZE = 5000000000  # 5 GB
 
 class SwiftStorage(BaseStorage):
   def __init__(self, swift_container, storage_path, auth_url, swift_user,
@@ -101,9 +112,10 @@ class SwiftStorage(BaseStorage):
       logger.exception('Could not get object: %s', path)
       raise IOError('Path %s not found' % path)
 
-  def _put_object(self, path, content, chunk=None, content_type=None, content_encoding=None):
+  def _put_object(self, path, content, chunk=None, content_type=None, content_encoding=None,
+                  headers=None):
     path = self._normalize_path(path)
-    headers = {}
+    headers = headers or {}
 
     if content_encoding is not None:
       headers['Content-Encoding'] = content_encoding
@@ -242,3 +254,82 @@ class SwiftStorage(BaseStorage):
       raise IOError('Cannot lookup path: %s' % path)
 
     return headers.get('etag', '')[1:-1][:7] or self._random_checksum(7)
+
+  @staticmethod
+  def _segment_list_from_metadata(storage_metadata):
+    return [_PartUploadMetadata(*segment_args) for segment_args in storage_metadata[_SEGMENTS_KEY]]
+
+  def initiate_chunked_upload(self):
+    random_uuid = str(uuid4())
+
+    metadata = {
+      _SEGMENTS_KEY: [],
+    }
+
+    return random_uuid, metadata
+
+  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
+    if length == 0:
+      return 0, storage_metadata
+
+    # Note: Swift limits segments to a maximum of 5 GB, so we keep writing segments until
+    # all of the data has been written.
+    total_bytes_written = 0
+    while True:
+      bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
+                                                                    storage_metadata)
+
+      if length != filelike.READ_UNTIL_END:
+        length = length - bytes_written
+
+      offset = offset + bytes_written
+      total_bytes_written = total_bytes_written + bytes_written
+      if bytes_written == 0 or (length != filelike.READ_UNTIL_END and length <= 0):
+        return total_bytes_written, storage_metadata
+
+  def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata):
+    updated_metadata = copy.deepcopy(storage_metadata)
+    segment_count = len(updated_metadata[_SEGMENTS_KEY])
+    segment_path = '%s/%s/%s' % (_SEGMENT_DIRECTORY, uuid, segment_count)
+
+    # Track the number of bytes read and, if an explicit length is specified, limit the
+    # file stream to that length.
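+    # Note: Swift rejects segments larger than _MAXIMUM_SEGMENT_SIZE, so the read for this
+    # segment is also capped at that size; the loop in stream_upload_chunk writes any
+    # remaining data as additional segments.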
+    if length == filelike.READ_UNTIL_END:
+      length = _MAXIMUM_SEGMENT_SIZE
+    else:
+      length = min(_MAXIMUM_SEGMENT_SIZE, length)
+
+    limiting_fp = filelike.LimitingStream(in_fp, length)
+
+    # Write the segment to Swift.
+    self.stream_write(segment_path, limiting_fp)
+
+    # Only track segments to which data was actually written.
+    bytes_written = limiting_fp.byte_count_read
+    if bytes_written > 0:
+      updated_metadata[_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset,
+                                                                 bytes_written))
+
+    return bytes_written, updated_metadata
+
+  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
+    """ Complete the chunked upload and store the final results in the path indicated.
+        Returns nothing.
+    """
+    # Write an empty object at the final path with an X-Object-Manifest header pointing to
+    # the common prefix of the uploaded segments.
+    segments_prefix_path = self._normalize_path('%s/%s/' % (_SEGMENT_DIRECTORY, uuid))
+    contained_segments_prefix_path = '%s/%s' % (self._swift_container, segments_prefix_path)
+
+    self._put_object(final_path, '', headers={'X-Object-Manifest': contained_segments_prefix_path})
+
+  def cancel_chunked_upload(self, uuid, storage_metadata):
+    """ Cancel the chunked upload and clean up any outstanding partially uploaded data.
+        Returns nothing.
+    """
+    # Delete all the uploaded segments.
+    for segment in SwiftStorage._segment_list_from_metadata(storage_metadata):
+      self.remove(segment.path)
diff --git a/util/registry/filelike.py b/util/registry/filelike.py
index 7acb45c4d..3e1526851 100644
--- a/util/registry/filelike.py
+++ b/util/registry/filelike.py
@@ -82,6 +82,27 @@ class FilelikeStreamConcat(BaseStreamFilelike):
     return self.read(size)
 
 
+class LimitingStream(BaseStreamFilelike):
+  def __init__(self, fileobj, read_limit=READ_UNTIL_END):
+    super(LimitingStream, self).__init__(fileobj)
+    self._read_limit = read_limit
+    self.byte_count_read = 0
+
+  def read(self, size=READ_UNTIL_END):
+    max_bytes_to_read = READ_UNTIL_END
+
+    # If a read limit is specified, then determine the maximum number of bytes to return.
+    if self._read_limit != READ_UNTIL_END:
+      if size == READ_UNTIL_END:
+        size = self._read_limit
+
+      max_bytes_to_read = min(self._read_limit - self.byte_count_read, size)
+
+    byte_data_read = super(LimitingStream, self).read(max_bytes_to_read)
+    self.byte_count_read = self.byte_count_read + len(byte_data_read)
+    return byte_data_read
+
+
 class StreamSlice(BaseStreamFilelike):
   def __init__(self, fileobj, start_offset=0, end_offset_exclusive=READ_UNTIL_END):
     super(StreamSlice, self).__init__(fileobj)
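
For illustration, a minimal sketch of how the new chunked-upload methods are meant to
compose, assuming an already-configured SwiftStorage instance; the upload_stream helper
and its arguments below are hypothetical, not part of this patch:

    from util.registry import filelike

    def upload_stream(engine, in_fp, final_path):
      # Hypothetical helper: `engine` is a configured SwiftStorage instance.
      uuid, metadata = engine.initiate_chunked_upload()

      # Stream all of the data; stream_upload_chunk internally splits it into Swift
      # segments of at most _MAXIMUM_SEGMENT_SIZE bytes each.
      _, metadata = engine.stream_upload_chunk(uuid, 0, filelike.READ_UNTIL_END, in_fp,
                                               metadata)

      # Write the X-Object-Manifest stub object that stitches the segments together.
      engine.complete_chunked_upload(uuid, final_path, metadata)

LimitingStream can also be exercised on its own; a quick sketch of its expected behavior:

    from io import BytesIO
    from util.registry.filelike import LimitingStream

    stream = LimitingStream(BytesIO(b'hello world'), 5)
    assert stream.read() == b'hello'  # reads stop at the 5-byte limit
    assert stream.read() == b''       # subsequent reads return no data
    assert stream.byte_count_read == 5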