Add V2 storage methods to Swift storage engine

Fixes #508
This commit is contained in:
Joseph Schorr 2015-09-25 14:57:14 -04:00
parent 7bb15d8f19
commit 6c59161527
2 changed files with 112 additions and 3 deletions

View file

@ -1,4 +1,7 @@
""" Swift storage driver. Based on: github.com/bacongobbler/docker-registry-driver-swift/ """
""" Swift storage driver.
Uses: http://docs.openstack.org/developer/swift/overview_large_objects.html
"""
from swiftclient.client import Connection, ClientException
from storage.basestorage import BaseStorage
from util.registry.generatorfile import GeneratorFile
@ -6,13 +9,21 @@ from urlparse import urlparse
from random import SystemRandom
from hashlib import sha1
from time import time
from collections import namedtuple
from util.registry import filelike
import copy
import hmac
import string
import logging
from uuid import uuid4
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Record describing one uploaded segment of a chunked upload: the segment's object path,
# the offset passed in for the data it holds, and the number of bytes written to it.
_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
# Key in the chunked-upload storage metadata dict under which the segment list is kept.
_SEGMENTS_KEY = 'segments'
# Path prefix under which the segments of chunked uploads are written.
_SEGMENT_DIRECTORY = 'segments'
# Upper bound on the number of bytes written to a single segment.
_MAXIMUM_SEGMENT_SIZE = 5000000000 # 5 GB
class SwiftStorage(BaseStorage):
def __init__(self, swift_container, storage_path, auth_url, swift_user,
@ -101,9 +112,10 @@ class SwiftStorage(BaseStorage):
logger.exception('Could not get object: %s', path)
raise IOError('Path %s not found' % path)
def _put_object(self, path, content, chunk=None, content_type=None, content_encoding=None):
def _put_object(self, path, content, chunk=None, content_type=None, content_encoding=None,
headers=None):
path = self._normalize_path(path)
headers = {}
headers = headers or {}
if content_encoding is not None:
headers['Content-Encoding'] = content_encoding
@ -242,3 +254,79 @@ class SwiftStorage(BaseStorage):
raise IOError('Cannot lookup path: %s' % path)
return headers.get('etag', '')[1:-1][:7] or self._random_checksum(7)
@staticmethod
def _segment_list_from_metadata(storage_metadata):
    """ Rebuild the recorded upload segments as _PartUploadMetadata tuples.

    Each entry stored under _SEGMENTS_KEY in `storage_metadata` is unpacked positionally
    into a _PartUploadMetadata (path, offset, length). Returns the resulting list, in the
    order in which the entries were recorded.
    """
    segments = []
    for segment_args in storage_metadata[_SEGMENTS_KEY]:
        segments.append(_PartUploadMetadata(*segment_args))
    return segments
def initiate_chunked_upload(self):
    """ Start a new chunked upload.

    Returns a tuple of (upload id, storage metadata): the id is a freshly generated UUID
    string, and the metadata is a dict holding an empty segment list under _SEGMENTS_KEY.
    """
    upload_id = str(uuid4())
    return upload_id, {_SEGMENTS_KEY: []}
def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
    """ Upload data read from `in_fp` as one or more segments of the chunked upload `uuid`.

    `offset` is the position recorded for the first byte of this data in the segment
    metadata. `length` is the number of bytes to upload, or filelike.READ_UNTIL_END to
    keep uploading until `in_fp` yields no more data.

    Returns a tuple of (total bytes written, updated storage metadata). A `length` of 0
    writes nothing and returns the metadata that was passed in.
    """
    if length == 0:
        return 0, storage_metadata

    # Remember up front whether the caller asked for "everything": in that case `length`
    # is a sentinel rather than a byte count and must never be used in arithmetic or
    # comparisons below.
    read_until_end = length == filelike.READ_UNTIL_END

    # Note: each segment is capped in size (Swift limits segments to a maximum of 5GB), so
    # we keep writing segments until the requested data has been consumed.
    total_bytes_written = 0
    while True:
        bytes_written, storage_metadata = self._stream_upload_segment(
            uuid, offset, length, in_fp, storage_metadata)

        if not read_until_end:
            length = length - bytes_written

        offset = offset + bytes_written
        total_bytes_written = total_bytes_written + bytes_written

        # Stop once the input is exhausted, or once an explicitly requested length has been
        # fully written. The length check is skipped for READ_UNTIL_END: with a
        # non-positive sentinel (LimitingStream.read uses -1 for an unbounded read) it
        # would always be true and end the upload after the first segment, dropping any
        # data beyond the segment size cap.
        if bytes_written == 0 or (not read_until_end and length <= 0):
            return total_bytes_written, storage_metadata
def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata):
    """ Write a single segment of the chunked upload `uuid` from `in_fp`.

    At most min(`length`, _MAXIMUM_SEGMENT_SIZE) bytes are read from `in_fp`; a `length`
    of filelike.READ_UNTIL_END means "up to one full segment". The passed-in
    `storage_metadata` is not modified: a deep copy is updated and returned.

    Returns a tuple of (bytes written, updated storage metadata). The segment is only
    recorded in the metadata when at least one byte was read for it.
    """
    updated_metadata = copy.deepcopy(storage_metadata)
    segment_count = len(updated_metadata[_SEGMENTS_KEY])

    # The segment index is zero-padded to a fixed width. The finished upload is published
    # via an X-Object-Manifest prefix (see complete_chunked_upload), and Swift serves the
    # segments under such a prefix in sorted name order, so unpadded indexes would place
    # segment "10" before segment "2" and scramble the data.
    # NOTE(review): uploads already in flight with unpadded segment names should be
    # restarted rather than resumed across this change.
    segment_path = '%s/%s/%09d' % (_SEGMENT_DIRECTORY, uuid, segment_count)

    # Track the number of bytes read and limit the file stream to the segment size cap,
    # or to the explicit length when that is smaller.
    if length == filelike.READ_UNTIL_END:
        length = _MAXIMUM_SEGMENT_SIZE
    else:
        length = min(_MAXIMUM_SEGMENT_SIZE, length)

    limiting_fp = filelike.LimitingStream(in_fp, length)

    # Write the segment to Swift.
    self.stream_write(segment_path, limiting_fp)

    # We are only going to track keys to which data was confirmed written.
    bytes_written = limiting_fp.byte_count_read
    if bytes_written > 0:
        updated_metadata[_SEGMENTS_KEY].append(
            _PartUploadMetadata(segment_path, offset, bytes_written))

    return bytes_written, updated_metadata
def complete_chunked_upload(self, uuid, final_path, storage_metadata):
    """ Finish the chunked upload by publishing its segments at `final_path`.
    Returns nothing.
    """
    # The final object carries no data of its own: it is written with empty content and
    # an X-Object-Manifest header naming the container and the prefix under which the
    # segments of this upload were stored.
    segments_prefix = self._normalize_path('%s/%s/' % (_SEGMENT_DIRECTORY, uuid))
    manifest_headers = {
        'X-Object-Manifest': '%s/%s' % (self._swift_container, segments_prefix),
    }
    self._put_object(final_path, '', headers=manifest_headers)
def cancel_chunked_upload(self, uuid, storage_metadata):
    """ Abort the chunked upload, removing every segment recorded in its storage metadata.
    Returns nothing.
    """
    recorded_segments = SwiftStorage._segment_list_from_metadata(storage_metadata)

    # Remove each segment object that was recorded as written.
    for uploaded in recorded_segments:
        self.remove(uploaded.path)

View file

@ -82,6 +82,27 @@ class FilelikeStreamConcat(BaseStreamFilelike):
return self.read(size)
class LimitingStream(BaseStreamFilelike):
    """ File-like wrapper that caps the total number of bytes read from the wrapped stream.

    `byte_count_read` holds the number of bytes returned by read() so far. When
    `read_limit` is READ_UNTIL_END, no cap is applied.
    """

    def __init__(self, fileobj, read_limit=READ_UNTIL_END):
        super(LimitingStream, self).__init__(fileobj)
        self._read_limit = read_limit
        self.byte_count_read = 0

    def read(self, size=READ_UNTIL_END):
        """ Read up to `size` bytes without exceeding the configured read limit.

        A `size` of READ_UNTIL_END requests everything that is still allowed. Returns the
        data read and adds its length to `byte_count_read`.
        """
        # Without a read limit, honour the caller's requested size instead of always
        # reading the whole stream. -1 is what this method has always passed for an
        # unbounded read (presumably read-to-end for BaseStreamFilelike.read - confirm).
        max_bytes_to_read = -1 if size == READ_UNTIL_END else size

        # If a read limit is specified, never return more than what remains under it.
        if self._read_limit != READ_UNTIL_END:
            remaining = self._read_limit - self.byte_count_read
            if size == READ_UNTIL_END:
                max_bytes_to_read = remaining
            else:
                max_bytes_to_read = min(remaining, size)

        byte_data_read = super(LimitingStream, self).read(max_bytes_to_read)
        self.byte_count_read = self.byte_count_read + len(byte_data_read)
        return byte_data_read
class StreamSlice(BaseStreamFilelike):
def __init__(self, fileobj, start_offset=0, end_offset_exclusive=READ_UNTIL_END):
super(StreamSlice, self).__init__(fileobj)