Checkpoint implementing PATCH according to Docker
parent 35919b4cc8, commit 8269d4ac90
4 changed files with 223 additions and 59 deletions

storage/cloud.py (150 changed lines)
@@ -1,6 +1,7 @@
import cStringIO as StringIO
import os
import logging
import copy

import boto.s3.connection
import boto.s3.multipart
@@ -10,6 +11,9 @@ import boto.gs.key

from io import BufferedIOBase
from uuid import uuid4
from collections import namedtuple

from util.registry import filelike

from storage.basestorage import BaseStorageV2, InvalidChunkException
@@ -17,9 +21,8 @@ from storage.basestorage import BaseStorageV2, InvalidChunkException
logger = logging.getLogger(__name__)


_MULTIPART_UPLOAD_ID_KEY = 'upload_id'
_LAST_PART_KEY = 'last_part_num'
_LAST_CHUNK_ENCOUNTERED = 'last_chunk_encountered'
_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
_CHUNKS_KEY = 'chunks'


class StreamReadKeyAsFile(BufferedIOBase):
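For reference, a short standalone sketch (not part of the commit) of how the _PartUploadMetadata tuple and _CHUNKS_KEY defined above are meant to be used; the path value here is made up:

from collections import namedtuple

_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
_CHUNKS_KEY = 'chunks'

# Each uploaded chunk is recorded in the upload's metadata dict...
metadata = {_CHUNKS_KEY: []}
metadata[_CHUNKS_KEY].append(_PartUploadMetadata('uploads/some-made-up-uuid', 0, 5 * 1024 * 1024))

# ...and later rebuilt into named tuples, as _chunk_list_from_metadata does further down.
chunks = [_PartUploadMetadata(*chunk_args) for chunk_args in metadata[_CHUNKS_KEY]]
assert chunks[0].length == 5 * 1024 * 1024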
@@ -49,7 +52,7 @@ class _CloudStorage(BaseStorageV2):
               access_key, secret_key, bucket_name):
    super(_CloudStorage, self).__init__()

    self.upload_chunk_size = 5 * 1024 * 1024
    self.automatic_chunk_size = 5 * 1024 * 1024

    self._initialized = False
    self._bucket_name = bucket_name
@@ -162,21 +165,42 @@ class _CloudStorage(BaseStorageV2):
                                                         **self._upload_params)

  def stream_write(self, path, fp, content_type=None, content_encoding=None):
    return self._stream_write_internal(path, fp, content_type, content_encoding)

  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
    mp = self.__initiate_multipart_upload(path, content_type, content_encoding)

    # We are going to reuse this but be VERY careful to only read the number of bytes written to it
    buf = StringIO.StringIO()

    num_part = 1
    while True:
    total_bytes_written = 0
    while size == filelike.READ_UNTIL_END or total_bytes_written < size:
      bytes_to_copy = self.automatic_chunk_size
      if size != filelike.READ_UNTIL_END:
        # We never want to ask for more bytes than our caller has indicated to copy
        bytes_to_copy = min(bytes_to_copy, size - total_bytes_written)

      buf.seek(0)
      try:
        buf = StringIO.StringIO()
        bytes_written = self.stream_write_to_fp(fp, buf, self.upload_chunk_size)
        if bytes_written == 0:
        # Stage the bytes into the buffer for use with the multipart upload file API
        bytes_staged = self.stream_write_to_fp(fp, buf, bytes_to_copy)
        if bytes_staged == 0:
          break

        mp.upload_part_from_file(buf, num_part)
        buf.seek(0)
        mp.upload_part_from_file(buf, num_part, size=bytes_staged)
        total_bytes_written += bytes_staged
        num_part += 1
        io.close()
      except IOError:
        break
    mp.complete_upload()
        if cancel_on_error:
          mp.cancel_upload()
          return 0

    if total_bytes_written > 0:
      mp.complete_upload()
    return total_bytes_written

  def list_directory(self, path=None):
    self._initialize_cloud_conn()
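The interleaved removed and added lines above can be hard to read, so here is a minimal standalone sketch of the staging pattern the new _stream_write_internal uses: copy at most one chunk of the source into a reusable buffer, upload that buffer as one numbered part, and stop once the source is exhausted or the requested size has been consumed. Illustrative only; write_in_parts and upload_part are placeholder names, not part of the commit.

import io

READ_UNTIL_END = -1  # stand-in for filelike.READ_UNTIL_END

def write_in_parts(source, upload_part, chunk_size, size=READ_UNTIL_END):
  # upload_part(buf, part_num, length) plays the role of mp.upload_part_from_file.
  buf = io.BytesIO()
  num_part = 1
  total_bytes_written = 0
  while size == READ_UNTIL_END or total_bytes_written < size:
    bytes_to_copy = chunk_size
    if size != READ_UNTIL_END:
      # Never request more bytes than the caller asked us to copy.
      bytes_to_copy = min(bytes_to_copy, size - total_bytes_written)

    data = source.read(bytes_to_copy)
    if not data:
      break

    # Reuse the buffer, but only ever hand off the bytes just written to it.
    buf.seek(0)
    buf.truncate()
    buf.write(data)
    buf.seek(0)
    upload_part(buf, num_part, len(data))

    total_bytes_written += len(data)
    num_part += 1
  return total_bytes_written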
@@ -235,50 +259,87 @@ class _CloudStorage(BaseStorageV2):
  def initiate_chunked_upload(self):
    self._initialize_cloud_conn()
    random_uuid = str(uuid4())
    path = self._init_path(self._rel_upload_path(random_uuid))
    mpu = self.__initiate_multipart_upload(path, content_type=None, content_encoding=None)

    metadata = {
      _MULTIPART_UPLOAD_ID_KEY: mpu.id,
      _LAST_PART_KEY: 0,
      _LAST_CHUNK_ENCOUNTERED: False,
      _CHUNKS_KEY: [],
    }

    return mpu.id, metadata

  def _get_multipart_upload_key(self, uuid, storage_metadata):
    mpu = boto.s3.multipart.MultiPartUpload(self._cloud_bucket)
    mpu.id = storage_metadata[_MULTIPART_UPLOAD_ID_KEY]
    mpu.key = self._init_path(self._rel_upload_path(uuid))
    return mpu
    return random_uuid, metadata
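The metadata dict created above is what the caller must persist between requests and pass back into each later call. A sketch of its shape right after initiation; the upload id value is invented:

upload_metadata = {
  'upload_id': 'example-multipart-upload-id',   # _MULTIPART_UPLOAD_ID_KEY
  'last_part_num': 0,                           # _LAST_PART_KEY
  'last_chunk_encountered': False,              # _LAST_CHUNK_ENCOUNTERED
  'chunks': [],                                 # _CHUNKS_KEY, appended to by stream_upload_chunk below
}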
  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
    self._initialize_cloud_conn()
    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
    last_part_num = storage_metadata[_LAST_PART_KEY]

    if storage_metadata[_LAST_CHUNK_ENCOUNTERED] and length != 0:
      msg = 'Length must be at least the upload chunk size: %s' % self.upload_chunk_size
      raise InvalidChunkException(msg)
    # We are going to upload each chunk to a separate key
    chunk_path = self._rel_upload_path(str(uuid4()))
    bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
                                                size=length)

    part_num = last_part_num + 1
    mpu.upload_part_from_file(in_fp, part_num, length)
    new_metadata = copy.deepcopy(storage_metadata)

    new_metadata = {
      _MULTIPART_UPLOAD_ID_KEY: mpu.id,
      _LAST_PART_KEY: part_num,
      _LAST_CHUNK_ENCOUNTERED: True,
    }
    # We are only going to track keys to which data was confirmed written
    if bytes_written > 0:
      new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written))

    return length, new_metadata
    return bytes_written, new_metadata
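Taken together with initiate_chunked_upload above and complete_chunked_upload further down, a caller (for example the registry's PATCH/PUT handlers) would drive this API roughly as follows. This is a sketch under the assumption that storage is an already-configured _CloudStorage subclass; upload_blob_in_chunks and chunk_streams are invented names, and error handling is minimal:

def upload_blob_in_chunks(storage, final_path, chunk_streams):
  # chunk_streams: iterable of (offset, length, file-like object), one entry per PATCH request.
  uuid, metadata = storage.initiate_chunked_upload()
  try:
    for offset, length, in_fp in chunk_streams:
      # Each call writes the chunk and returns updated metadata to carry into the next call.
      bytes_written, metadata = storage.stream_upload_chunk(uuid, offset, length, in_fp, metadata)
    storage.complete_chunked_upload(uuid, final_path, metadata)
  except IOError:
    # On failure, the partially uploaded chunks get cleaned up.
    storage.cancel_chunked_upload(uuid, metadata)
    raise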
  def _chunk_generator(self, chunk_list):
    for chunk in chunk_list:
      yield filelike.StreamSlice(self.stream_read_file(chunk.path), 0, chunk.length)

  @staticmethod
  def _chunk_list_from_metadata(storage_metadata):
    return [_PartUploadMetadata(*chunk_args) for chunk_args in storage_metadata[_CHUNKS_KEY]]

  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
    mpu.complete_upload()
    self._initialize_cloud_conn()

    # Here is where things get interesting: we are going to try to assemble this server side
    # In order to be a candidate all parts (after offsets have been computed) must be at least 5MB
    server_side_assembly = True
    chunk_list = self._chunk_list_from_metadata(storage_metadata)
    for chunk_offset, chunk in enumerate(chunk_list):
      # If the chunk is both too small, and not the last chunk, we rule out server side assembly
      if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
        server_side_assembly = False
        break

    if server_side_assembly:
      logger.debug('Performing server side assembly of multi-part upload for: %s', final_path)
      try:
        # Awesome, we can do this completely server side, now we have to start a new multipart
        # upload and use copy_part_from_key to set all of the chunks.
        mpu = self.__initiate_multipart_upload(final_path, content_type=None, content_encoding=None)

        for chunk_offset, chunk in enumerate(chunk_list):
          abs_chunk_path = self._init_path(chunk.path)
          part_num = chunk_offset + 1
          chunk_end_offset_inclusive = chunk.length - 1
          mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num, 0,
                                 chunk_end_offset_inclusive)
        mpu.complete_upload()

      except IOError as ioe:
        # Something bad happened, log it and then give up
        msg = 'Exception when attempting server-side assembly for: %s'
        logger.exception(msg, final_path)
        mpu.cancel_upload()
        raise ioe

    else:
      logger.warning('Performing client side assembly of multi-part upload for: %s', final_path)

      # We are going to turn all of the server side objects into a single file-like stream, and
      # pass that to stream_write to chunk and upload the final object.
      concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
      self.stream_write(final_path, concatenated)

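The eligibility check in complete_chunked_upload above encodes S3's multipart constraint: every part except the last must be at least 5MB, so server-side assembly is only attempted when all chunks except the final one reach automatic_chunk_size. The same rule as a standalone sketch; can_assemble_server_side is an invented name:

def can_assemble_server_side(chunk_lengths, min_part_size=5 * 1024 * 1024):
  # Every chunk except the last must meet the minimum part size.
  return all(length >= min_part_size for length in chunk_lengths[:-1])

assert can_assemble_server_side([5 * 1024 * 1024, 1024])                  # small final chunk is fine
assert not can_assemble_server_side([3 * 1024 * 1024, 5 * 1024 * 1024])   # small non-final chunk is not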
  def cancel_chunked_upload(self, uuid, storage_metadata):
    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
    mpu.cancel_multipart_upload()
    self._initialize_cloud_conn()

    # We have to go through and delete all of the uploaded chunks
    for chunk in self._chunk_list_from_metadata(storage_metadata):
      self.remove(chunk.path)


class S3Storage(_CloudStorage):
@@ -337,7 +398,8 @@ class GoogleCloudStorage(_CloudStorage):
    </Cors>
  </CorsConfig>""")

  def stream_write(self, path, fp, content_type=None, content_encoding=None):
  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
    # Minimum size of upload part size on S3 is 5MB
    self._initialize_cloud_conn()
    path = self._init_path(path)
@@ -349,6 +411,10 @@ class GoogleCloudStorage(_CloudStorage):
    if content_encoding is not None:
      key.set_metadata('Content-Encoding', content_encoding)

    if size != filelike.READ_UNTIL_END:
      fp = filelike.StreamSlice(fp, 0, size)

    # TODO figure out how to handle cancel_on_error=False
    key.set_contents_from_stream(fp)
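filelike.StreamSlice is the project's own utility; its use above simply caps how much of fp gets read when a bounded size was requested. A generic stand-in illustrating that idea (BoundedReader is an invented name, not the project's implementation):

class BoundedReader(object):
  # Wraps a file-like object and exposes at most `limit` bytes of it.
  def __init__(self, fp, limit):
    self._fp = fp
    self._remaining = limit

  def read(self, n=-1):
    if self._remaining <= 0:
      return b''
    if n < 0 or n > self._remaining:
      n = self._remaining
    data = self._fp.read(n)
    self._remaining -= len(data)
    return data

# Usage idea: key.set_contents_from_stream(BoundedReader(fp, size)) would stop after `size` bytes.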