Merge pull request #551 from coreos-inc/python-registry-v2-swift
Add V2 storage methods to Swift storage engine
commit 41bfe2ffde
2 changed files with 112 additions and 3 deletions

@@ -1,4 +1,7 @@
-""" Swift storage driver. Based on: github.com/bacongobbler/docker-registry-driver-swift/ """
+""" Swift storage driver.
+
+    Uses: http://docs.openstack.org/developer/swift/overview_large_objects.html
+"""
 from swiftclient.client import Connection, ClientException
 from storage.basestorage import BaseStorage
 from util.registry.generatorfile import GeneratorFile
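The new docstring points at Swift's large-object support: rather than uploading one huge object, the driver uploads bounded segments under a common prefix and then writes a zero-byte manifest object whose X-Object-Manifest header names that prefix, which is exactly what complete_chunked_upload does further down. A minimal sketch of that underlying mechanism using python-swiftclient directly; the auth URL, credentials, container, and object names below are illustrative, not taken from this commit:

# Sketch only: illustrative values throughout; shows the Dynamic Large Object
# pattern that the driver's chunked-upload completion relies on.
from swiftclient.client import Connection

conn = Connection(authurl='https://swift.example.com/auth/v1.0',
                  user='example-user', key='example-key')

# Upload the pieces as ordinary objects under a shared prefix...
conn.put_object('example-container', 'segments/blob/0', contents='first half ')
conn.put_object('example-container', 'segments/blob/1', contents='second half')

# ...then a zero-byte manifest whose header value is '<container>/<segment prefix>'.
conn.put_object('example-container', 'blob', contents='',
                headers={'X-Object-Manifest': 'example-container/segments/blob/'})

# Fetching the manifest object returns the segments concatenated in order.
headers, body = conn.get_object('example-container', 'blob')
assert body == 'first half second half'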
@@ -6,13 +9,21 @@ from urlparse import urlparse
 from random import SystemRandom
 from hashlib import sha1
 from time import time
+from collections import namedtuple
+from util.registry import filelike

+import copy
 import hmac
 import string
 import logging

+from uuid import uuid4

 logger = logging.getLogger(__name__)


+_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
+_SEGMENTS_KEY = 'segments'
+_SEGMENT_DIRECTORY = 'segments'
+_MAXIMUM_SEGMENT_SIZE = 5000000000 # 5 GB

 class SwiftStorage(BaseStorage):
   def __init__(self, swift_container, storage_path, auth_url, swift_user,
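The constants above define the layout the chunked-upload methods thread through their storage_metadata argument: each completed segment is recorded under _SEGMENTS_KEY as a (path, offset, length) triple, and _segment_list_from_metadata (added below) rebuilds those entries into _PartUploadMetadata tuples. A small sketch of that shape, with a made-up upload id and made-up sizes:

# Sketch only: illustrative metadata for an upload that was split into two segments.
storage_metadata = {
  'segments': [
    ('segments/0123abcd/0', 0, 5000000000),           # first segment, 5 GB cap
    ('segments/0123abcd/1', 5000000000, 123456789),   # remainder of the stream
  ],
}

segments = [_PartUploadMetadata(*args) for args in storage_metadata['segments']]
assert segments[1].offset == 5000000000
assert segments[1].length == 123456789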
@@ -101,9 +112,10 @@ class SwiftStorage(BaseStorage):
       logger.exception('Could not get object: %s', path)
       raise IOError('Path %s not found' % path)

-  def _put_object(self, path, content, chunk=None, content_type=None, content_encoding=None):
+  def _put_object(self, path, content, chunk=None, content_type=None, content_encoding=None,
+                  headers=None):
     path = self._normalize_path(path)
-    headers = {}
+    headers = headers or {}

     if content_encoding is not None:
       headers['Content-Encoding'] = content_encoding
@@ -242,3 +254,79 @@ class SwiftStorage(BaseStorage):
       raise IOError('Cannot lookup path: %s' % path)

     return headers.get('etag', '')[1:-1][:7] or self._random_checksum(7)
+
+  @staticmethod
+  def _segment_list_from_metadata(storage_metadata):
+    return [_PartUploadMetadata(*segment_args) for segment_args in storage_metadata[_SEGMENTS_KEY]]
+
+  def initiate_chunked_upload(self):
+    random_uuid = str(uuid4())
+
+    metadata = {
+      _SEGMENTS_KEY: [],
+    }
+
+    return random_uuid, metadata
+
+  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
+    if length == 0:
+      return 0, storage_metadata
+
+    # Note: Swift limits segments to a maximum of 5GB, so we keep writing segments until we
+    # are finished hitting the data limit.
+    total_bytes_written = 0
+    while True:
+      bytes_written, storage_metadata = self._stream_upload_segment(uuid, offset, length, in_fp,
+                                                                    storage_metadata)
+
+      if length != filelike.READ_UNTIL_END:
+        length = length - bytes_written
+
+      offset = offset + bytes_written
+      total_bytes_written = total_bytes_written + bytes_written
+      if bytes_written == 0 or length <= 0:
+        return total_bytes_written, storage_metadata
+
+  def _stream_upload_segment(self, uuid, offset, length, in_fp, storage_metadata):
+    updated_metadata = copy.deepcopy(storage_metadata)
+    segment_count = len(updated_metadata[_SEGMENTS_KEY])
+    segment_path = '%s/%s/%s' % (_SEGMENT_DIRECTORY, uuid, segment_count)
+
+    # Track the number of bytes read and if an explicit length is specified, limit the
+    # file stream to that length.
+    if length == filelike.READ_UNTIL_END:
+      length = _MAXIMUM_SEGMENT_SIZE
+    else:
+      length = min(_MAXIMUM_SEGMENT_SIZE, length)
+
+    limiting_fp = filelike.LimitingStream(in_fp, length)
+
+    # Write the segment to Swift.
+    self.stream_write(segment_path, limiting_fp)
+
+    # We are only going to track keys to which data was confirmed written.
+    bytes_written = limiting_fp.byte_count_read
+    if bytes_written > 0:
+      updated_metadata[_SEGMENTS_KEY].append(_PartUploadMetadata(segment_path, offset,
+                                                                 bytes_written))
+
+    return bytes_written, updated_metadata
+
+  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
+    """ Complete the chunked upload and store the final results in the path indicated.
+        Returns nothing.
+    """
+    # Finally, we write an empty file at the proper location with a X-Object-Manifest
+    # header pointing to the prefix for the segments.
+    segments_prefix_path = self._normalize_path('%s/%s/' % (_SEGMENT_DIRECTORY, uuid))
+    contained_segments_prefix_path = '%s/%s' % (self._swift_container, segments_prefix_path)
+
+    self._put_object(final_path, '', headers={'X-Object-Manifest': contained_segments_prefix_path})
+
+  def cancel_chunked_upload(self, uuid, storage_metadata):
+    """ Cancel the chunked upload and clean up any outstanding partially uploaded data.
+        Returns nothing.
+    """
+    # Delete all the uploaded segments.
+    for segment in SwiftStorage._segment_list_from_metadata(storage_metadata):
+      self.remove(segment.path)
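Taken together, these methods give the Swift engine the V2 chunked-upload contract: initiate_chunked_upload hands back an upload id plus empty segment metadata, stream_upload_chunk writes data as one or more segments of at most 5 GB while threading that metadata through, and complete_chunked_upload (or cancel_chunked_upload) finishes the upload. A minimal sketch of how a caller might drive it; the helper name, error handling, and paths are illustrative, and `storage` is assumed to be an already-configured SwiftStorage instance:

# Sketch only: `storage` is assumed to be a configured SwiftStorage instance and
# `source_fp` any readable file-like object.
from util.registry import filelike

def upload_blob(storage, source_fp, final_path):
  # Start a new chunked upload: returns an upload id and empty segment metadata.
  upload_uuid, metadata = storage.initiate_chunked_upload()

  # Stream the whole source; READ_UNTIL_END makes the engine keep writing
  # <= 5 GB Swift segments until the input is exhausted.
  bytes_written, metadata = storage.stream_upload_chunk(upload_uuid, 0,
                                                        filelike.READ_UNTIL_END,
                                                        source_fp, metadata)

  try:
    # Write the zero-byte manifest object that stitches the segments together.
    storage.complete_chunked_upload(upload_uuid, final_path, metadata)
  except IOError:
    # On failure, remove any segments that were already written.
    storage.cancel_chunked_upload(upload_uuid, metadata)
    raise

  return bytes_written

The second changed file adds the LimitingStream helper that _stream_upload_segment wraps around the input; it lives alongside the other stream wrappers in the util.registry.filelike module the driver now imports: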
@@ -85,6 +85,27 @@ class FilelikeStreamConcat(BaseStreamFilelike):
     return self.read(size)


+class LimitingStream(BaseStreamFilelike):
+  def __init__(self, fileobj, read_limit=READ_UNTIL_END):
+    super(LimitingStream, self).__init__(fileobj)
+    self._read_limit = read_limit
+    self.byte_count_read = 0
+
+  def read(self, size=READ_UNTIL_END):
+    max_bytes_to_read = -1
+
+    # If a read limit is specified, then determine the maximum number of bytes to return.
+    if self._read_limit != READ_UNTIL_END:
+      if size == READ_UNTIL_END:
+        size = self._read_limit
+
+      max_bytes_to_read = min(self._read_limit - self.byte_count_read, size)
+
+    byte_data_read = super(LimitingStream, self).read(max_bytes_to_read)
+    self.byte_count_read = self.byte_count_read + len(byte_data_read)
+    return byte_data_read
+
+
 class StreamSlice(BaseStreamFilelike):
   def __init__(self, fileobj, start_offset=0, end_offset_exclusive=READ_UNTIL_END):
     super(StreamSlice, self).__init__(fileobj)
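LimitingStream is what lets _stream_upload_segment cap each Swift segment at _MAXIMUM_SEGMENT_SIZE while reporting how many bytes were actually consumed from the input. A small usage sketch of the intended behaviour, assuming READ_UNTIL_END marks "no limit" as in the code above (Python 2, matching the rest of the codebase):

# Sketch only: demonstrates the read limit and the byte_count_read counter
# that the Swift driver inspects after each segment write.
from StringIO import StringIO
from util.registry.filelike import LimitingStream

limited = LimitingStream(StringIO('0123456789'), 4)

assert limited.read(100) == '0123'    # capped at the 4-byte read limit
assert limited.read(100) == ''        # limit exhausted; behaves like EOF
assert limited.byte_count_read == 4   # bytes actually consumed from the source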