From 8269d4ac9019a548b4eac147350aeaf2223f2991 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Wed, 2 Sep 2015 17:31:44 -0400 Subject: [PATCH] Checkpoint implementing PATCH according to Docker --- endpoints/v2/blob.py | 25 ++++++- storage/basestorage.py | 8 +- storage/cloud.py | 150 +++++++++++++++++++++++++++----------- util/registry/filelike.py | 99 ++++++++++++++++++++++--- 4 files changed, 223 insertions(+), 59 deletions(-) diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 590a8421f..69303772a 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -14,7 +14,7 @@ from endpoints.v2.errors import BlobUnknown, BlobUploadInvalid, BlobUploadUnknow from auth.jwt_auth import process_jwt_auth from endpoints.decorators import anon_protect from util.cache import cache_control -from util.registry.filelike import wrap_with_handler +from util.registry.filelike import wrap_with_handler, StreamSlice from storage.basestorage import InvalidChunkException @@ -201,7 +201,15 @@ def _upload_chunk(namespace, repo_name, upload_uuid): except _InvalidRangeHeader: _range_not_satisfiable(found.byte_count) + if start_offset > 0 and start_offset > found.byte_count: + _range_not_satisfiable(found.byte_count) + input_fp = get_input_stream(request) + if start_offset > 0 and start_offset < found.byte_count: + # Skip the bytes which were received on a previous push, which are already stored and + # included in the sha calculation + input_fp = StreamSlice(input_fp, found.byte_count - start_offset) + input_fp = wrap_with_handler(input_fp, found.sha_state.update) try: @@ -222,10 +230,21 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest): if not digest_tools.digests_equal(computed_digest, expected_digest): raise BlobUploadInvalid() - # Mark the blob as uploaded. + # Move the storage into place, or if this was a re-upload, cancel it final_blob_location = digest_tools.content_path(expected_digest) - storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location, + + if storage.exists({upload_obj.location.name}, final_blob_location): + # It already existed, clean up our upload which served as proof that we had the file + storage.cancel_chunked_upload({upload_obj.location.name}, upload_obj.uuid, upload_obj.storage_metadata) + + else: + # We were the first ones to upload this image (at least to this location) + # Let's copy it into place + storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, + final_blob_location, upload_obj.storage_metadata) + + # Mark the blob as uploaded. model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest, upload_obj.location, upload_obj.byte_count, app.config['PUSH_TEMP_TAG_EXPIRATION_SEC']) diff --git a/storage/basestorage.py b/storage/basestorage.py index af51a45e3..cf344a6ba 100644 --- a/storage/basestorage.py +++ b/storage/basestorage.py @@ -1,7 +1,7 @@ import tempfile from digest.digest_tools import content_path - +from util.registry.filelike import READ_UNTIL_END class StoragePaths(object): shared_images = 'sharedimages' @@ -90,12 +90,12 @@ class BaseStorage(StoragePaths): def get_checksum(self, path): raise NotImplementedError - def stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1): + def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END): """ Copy the specified number of bytes from the input file stream to the output stream. If num_bytes < 0 copy until the stream ends. 
""" bytes_copied = 0 - while bytes_copied < num_bytes or num_bytes == -1: + while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END: size_to_read = min(num_bytes - bytes_copied, self.buffer_size) if size_to_read < 0: size_to_read = self.buffer_size @@ -126,7 +126,7 @@ class BaseStorageV2(BaseStorage): """ Upload the specified amount of data from the given file pointer to the chunked destination specified, starting at the given offset. Returns the number of bytes uploaded, and a new version of the storage_metadata. Raises InvalidChunkException if the offset or length can - not be accepted. + not be accepted. Pass length as -1 to upload as much data from the in_fp as possible. """ raise NotImplementedError diff --git a/storage/cloud.py b/storage/cloud.py index 91d2b3fdc..923cccdab 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -1,6 +1,7 @@ import cStringIO as StringIO import os import logging +import copy import boto.s3.connection import boto.s3.multipart @@ -10,6 +11,9 @@ import boto.gs.key from io import BufferedIOBase from uuid import uuid4 +from collections import namedtuple + +from util.registry import filelike from storage.basestorage import BaseStorageV2, InvalidChunkException @@ -17,9 +21,8 @@ from storage.basestorage import BaseStorageV2, InvalidChunkException logger = logging.getLogger(__name__) -_MULTIPART_UPLOAD_ID_KEY = 'upload_id' -_LAST_PART_KEY = 'last_part_num' -_LAST_CHUNK_ENCOUNTERED = 'last_chunk_encountered' +_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length']) +_CHUNKS_KEY = 'chunks' class StreamReadKeyAsFile(BufferedIOBase): @@ -49,7 +52,7 @@ class _CloudStorage(BaseStorageV2): access_key, secret_key, bucket_name): super(_CloudStorage, self).__init__() - self.upload_chunk_size = 5 * 1024 * 1024 + self.automatic_chunk_size = 5 * 1024 * 1024 self._initialized = False self._bucket_name = bucket_name @@ -162,21 +165,42 @@ class _CloudStorage(BaseStorageV2): **self._upload_params) def stream_write(self, path, fp, content_type=None, content_encoding=None): + return self._stream_write_internal(path, fp, content_type, content_encoding) + + def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None, + cancel_on_error=True, size=filelike.READ_UNTIL_END): mp = self.__initiate_multipart_upload(path, content_type, content_encoding) + + # We are going to reuse this but be VERY careful to only read the number of bytes written to it + buf = StringIO.StringIO() + num_part = 1 - while True: + total_bytes_written = 0 + while size == filelike.READ_UNTIL_END or total_bytes_written < size: + bytes_to_copy = self.automatic_chunk_size + if size != filelike.READ_UNTIL_END: + # We never want to ask for more bytes than our caller has indicated to copy + bytes_to_copy = min(bytes_to_copy, size - total_bytes_written) + + buf.seek(0) try: - buf = StringIO.StringIO() - bytes_written = self.stream_write_to_fp(fp, buf, self.upload_chunk_size) - if bytes_written == 0: + # Stage the bytes into the buffer for use with the multipart upload file API + bytes_staged = self.stream_write_to_fp(fp, buf, bytes_to_copy) + if bytes_staged == 0: break - mp.upload_part_from_file(buf, num_part) + buf.seek(0) + mp.upload_part_from_file(buf, num_part, size=bytes_staged) + total_bytes_written += bytes_staged num_part += 1 - io.close() except IOError: - break - mp.complete_upload() + if cancel_on_error: + mp.cancel_upload() + return 0 + + if total_bytes_written > 0: + mp.complete_upload() + return total_bytes_written def list_directory(self, 
path=None): self._initialize_cloud_conn() @@ -235,50 +259,87 @@ class _CloudStorage(BaseStorageV2): def initiate_chunked_upload(self): self._initialize_cloud_conn() random_uuid = str(uuid4()) - path = self._init_path(self._rel_upload_path(random_uuid)) - mpu = self.__initiate_multipart_upload(path, content_type=None, content_encoding=None) metadata = { - _MULTIPART_UPLOAD_ID_KEY: mpu.id, - _LAST_PART_KEY: 0, - _LAST_CHUNK_ENCOUNTERED: False, + _CHUNKS_KEY: [], } - return mpu.id, metadata - - def _get_multipart_upload_key(self, uuid, storage_metadata): - mpu = boto.s3.multipart.MultiPartUpload(self._cloud_bucket) - mpu.id = storage_metadata[_MULTIPART_UPLOAD_ID_KEY] - mpu.key = self._init_path(self._rel_upload_path(uuid)) - return mpu + return random_uuid, metadata def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata): self._initialize_cloud_conn() - mpu = self._get_multipart_upload_key(uuid, storage_metadata) - last_part_num = storage_metadata[_LAST_PART_KEY] - if storage_metadata[_LAST_CHUNK_ENCOUNTERED] and length != 0: - msg = 'Length must be at least the the upload chunk size: %s' % self.upload_chunk_size - raise InvalidChunkException(msg) + # We are going to upload each chunk to a separate key + chunk_path = self._rel_upload_path(str(uuid4())) + bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False, + size=length) - part_num = last_part_num + 1 - mpu.upload_part_from_file(in_fp, part_num, length) + new_metadata = copy.deepcopy(storage_metadata) - new_metadata = { - _MULTIPART_UPLOAD_ID_KEY: mpu.id, - _LAST_PART_KEY: part_num, - _LAST_CHUNK_ENCOUNTERED: True, - } + # We are only going to track keys to which data was confirmed written + if bytes_written > 0: + new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written)) - return length, new_metadata + return bytes_written, new_metadata + + def _chunk_generator(self, chunk_list): + for chunk in chunk_list: + yield filelike.StreamSlice(self.stream_read_file(chunk.path), 0, chunk.length) + + @staticmethod + def _chunk_list_from_metadata(storage_metadata): + return [_PartUploadMetadata(*chunk_args) for chunk_args in storage_metadata[_CHUNKS_KEY]] def complete_chunked_upload(self, uuid, final_path, storage_metadata): - mpu = self._get_multipart_upload_key(uuid, storage_metadata) - mpu.complete_upload() + self._initialize_cloud_conn() + + # Here is where things get interesting: we are going to try to assemble this server side + # In order to be a candidate all parts (after offsets have been computed) must be at least 5MB + server_side_assembly = True + chunk_list = self._chunk_list_from_metadata(storage_metadata) + for chunk_offset, chunk in enumerate(chunk_list): + # If the chunk is both too small, and not the last chunk, we rule out server side assembly + if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list): + server_side_assembly = False + break + + if server_side_assembly: + logger.debug('Performing server side assembly of multi-part upload for: %s', final_path) + try: + # Awesome, we can do this completely server side, now we have to start a new multipart + # upload and use copy_part_from_key to set all of the chunks. 
+        mpu = self.__initiate_multipart_upload(final_path, content_type=None, content_encoding=None)
+
+        for chunk_offset, chunk in enumerate(chunk_list):
+          abs_chunk_path = self._init_path(chunk.path)
+          part_num = chunk_offset + 1
+          chunk_end_offset_inclusive = chunk.length - 1
+          mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num, 0,
+                                 chunk_end_offset_inclusive)
+        mpu.complete_upload()
+
+      except IOError as ioe:
+        # Something bad happened, log it and then give up
+        msg = 'Exception when attempting server-side assembly for: %s'
+        logger.exception(msg, final_path)
+        mpu.cancel_upload()
+        raise ioe
+
+    else:
+      logger.warning('Performing client side assembly of multi-part upload for: %s', final_path)
+
+      # We are going to turn all of the server side objects into a single file-like stream, and
+      # pass that to stream_write to chunk and upload the final object.
+      concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
+      self.stream_write(final_path, concatenated)
+
 
   def cancel_chunked_upload(self, uuid, storage_metadata):
-    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
-    mpu.cancel_multipart_upload()
+    self._initialize_cloud_conn()
+
+    # We have to go through and delete all of the uploaded chunks
+    for chunk in self._chunk_list_from_metadata(storage_metadata):
+      self.remove(chunk.path)
 
 
 class S3Storage(_CloudStorage):
@@ -337,7 +398,8 @@ class GoogleCloudStorage(_CloudStorage):
         """)
 
-  def stream_write(self, path, fp, content_type=None, content_encoding=None):
+  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
+                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
     # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     path = self._init_path(path)
@@ -349,6 +411,10 @@ class GoogleCloudStorage(_CloudStorage):
     if content_encoding is not None:
       key.set_metadata('Content-Encoding', content_encoding)
 
+    if size != filelike.READ_UNTIL_END:
+      fp = filelike.StreamSlice(fp, 0, size)
+
+    # TODO figure out how to handle cancel_on_error=False
     key.set_contents_from_stream(fp)
 
 
diff --git a/util/registry/filelike.py b/util/registry/filelike.py
index 73dc9b9d2..7acb45c4d 100644
--- a/util/registry/filelike.py
+++ b/util/registry/filelike.py
@@ -1,24 +1,103 @@
-class SocketReader(object):
-  def __init__(self, fp):
-    self._fp = fp
+WHENCE_ABSOLUTE = 0
+WHENCE_RELATIVE = 1
+WHENCE_RELATIVE_END = 2
+
+READ_UNTIL_END = -1
+
+
+class BaseStreamFilelike(object):
+  def __init__(self, fileobj):
+    self._fileobj = fileobj
+    self._cursor_position = 0
+
+  def read(self, size=READ_UNTIL_END):
+    buf = self._fileobj.read(size)
+    self._cursor_position += len(buf)
+    return buf
+
+  def tell(self):
+    return self._cursor_position
+
+  def seek(self, index, whence=WHENCE_ABSOLUTE):
+    num_bytes_to_ff = 0
+    if whence == WHENCE_ABSOLUTE:
+      if index < self._cursor_position:
+        raise IOError('Cannot seek backwards')
+      num_bytes_to_ff = index - self._cursor_position
+
+    elif whence == WHENCE_RELATIVE:
+      if index < 0:
+        raise IOError('Cannot seek backwards')
+      num_bytes_to_ff = index
+
+    elif whence == WHENCE_RELATIVE_END:
+      raise IOError('Stream does not have a known end point')
+
+    while num_bytes_to_ff > 0:
+      buf = self._fileobj.read(num_bytes_to_ff)
+      if not buf:
+        raise IOError('Seek past end of file')
+      num_bytes_to_ff -= len(buf)
+
+
+class SocketReader(BaseStreamFilelike):
+  def __init__(self, fileobj):
+    super(SocketReader, self).__init__(fileobj)
     self.handlers = []
 
   def add_handler(self, handler):
self.handlers.append(handler) - def read(self, n=-1): - buf = self._fp.read(n) - if not buf: - return '' + def read(self, size=READ_UNTIL_END): + buf = super(SocketReader, self).read(size) for handler in self.handlers: handler(buf) return buf - def tell(self): - raise IOError('Stream is not seekable.') - def wrap_with_handler(in_fp, handler): wrapper = SocketReader(in_fp) wrapper.add_handler(handler) return wrapper + + +class FilelikeStreamConcat(BaseStreamFilelike): + def __init__(self, file_generator): + super(FilelikeStreamConcat, self).__init__(self) + self._file_generator = file_generator + self._current_file = file_generator.next() + + def read(self, size=READ_UNTIL_END): + buf = self._current_file.read(size) + if buf: + self._cursor_position += len(buf) + return buf + + # That file was out of data, prime a new one + self._current_file.close() + try: + self._current_file = self._file_generator.next() + except StopIteration: + return '' + return self.read(size) + + +class StreamSlice(BaseStreamFilelike): + def __init__(self, fileobj, start_offset=0, end_offset_exclusive=READ_UNTIL_END): + super(StreamSlice, self).__init__(fileobj) + self._end_offset_exclusive = end_offset_exclusive + + if start_offset > 0: + self.seek(start_offset) + + def read(self, size=READ_UNTIL_END): + if self._end_offset_exclusive == READ_UNTIL_END: + # We weren't asked to limit the end of the stream + return super(StreamSlice, self).read(size) + + # Compute the max bytes to read until the end or until we reach the user requested max + max_bytes_to_read = self._end_offset_exclusive - self.tell() + if size != READ_UNTIL_END: + max_bytes_to_read = min(max_bytes_to_read, size) + + return super(StreamSlice, self).read(max_bytes_to_read)
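
The new primitives in util/registry/filelike.py are easiest to see end to end with toy data. The following is a rough, illustrative sketch (the byte strings and sizes are made up, not taken from the patch): StreamSlice plus wrap_with_handler skips the overlap of a resumed PATCH the way _upload_chunk does, and FilelikeStreamConcat stitches ordered chunk streams back together the way client-side assembly does. Plain StringIO objects stand in for the real chunk readers because FilelikeStreamConcat closes each stream once it is exhausted.

# Illustrative sketch only; exercises the new filelike primitives with in-memory data.
import hashlib
from cStringIO import StringIO

from util.registry.filelike import FilelikeStreamConcat, StreamSlice, wrap_with_handler

# Resumed-upload case from blob.py: bytes 0-4 are already stored, but the client's
# Content-Range starts at offset 3, so the first 5 - 3 = 2 bytes of the body are skipped
# before they are hashed or written again.
resent_body = StringIO('3456789')                   # bytes at offsets 3..9
deduped = StreamSlice(resent_body, 5 - 3)           # drop the two overlapping bytes
digest = hashlib.sha256()
hashed = wrap_with_handler(deduped, digest.update)  # hash only the genuinely new bytes
assert hashed.read() == '56789'
assert digest.hexdigest() == hashlib.sha256('56789').hexdigest()

# Client-side assembly case from cloud.py: ordered chunk streams become one file-like
# object that stream_write can consume in fixed-size pieces.
chunk_data = ['abc', 'defg', 'hij']
assembled = FilelikeStreamConcat(StringIO(data) for data in chunk_data)
copied = []
while True:
  buf = assembled.read(1024)
  if not buf:
    break
  copied.append(buf)
assert ''.join(copied) == 'abcdefghij'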
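
The choice between server-side and client-side assembly in _CloudStorage.complete_chunked_upload comes down to S3's multipart minimum: every chunk except the last must be at least 5 MB for copy_part_from_key to be usable. A hypothetical standalone predicate (the function name here is illustrative, not from the patch) states the same rule:

# Hypothetical helper mirroring the server-side assembly check in complete_chunked_upload.
AUTOMATIC_CHUNK_SIZE = 5 * 1024 * 1024  # matches _CloudStorage.automatic_chunk_size

def can_assemble_server_side(chunk_lengths):
  # Only non-final chunks must meet the 5 MB multipart minimum; the tail may be short.
  return all(length >= AUTOMATIC_CHUNK_SIZE for length in chunk_lengths[:-1])

assert can_assemble_server_side([6 * 1024 * 1024, 100])      # short final chunk is fine
assert not can_assemble_server_side([100, 6 * 1024 * 1024])  # short middle chunk forces client-side concat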