Merge pull request #443 from jakedt/python-registry-v2

Checkpoint implementing PATCH according to Docker
Jake Moshenko 2015-09-03 16:26:32 -04:00
commit 6cf13c72ed
5 changed files with 223 additions and 60 deletions

View file

@@ -14,7 +14,7 @@ from endpoints.v2.errors import BlobUnknown, BlobUploadInvalid, BlobUploadUnknow
 from auth.jwt_auth import process_jwt_auth
 from endpoints.decorators import anon_protect
 from util.cache import cache_control
-from util.registry.filelike import wrap_with_handler
+from util.registry.filelike import wrap_with_handler, StreamSlice
 from storage.basestorage import InvalidChunkException
@@ -201,7 +201,15 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
   except _InvalidRangeHeader:
     _range_not_satisfiable(found.byte_count)
 
+  if start_offset > 0 and start_offset > found.byte_count:
+    _range_not_satisfiable(found.byte_count)
+
   input_fp = get_input_stream(request)
+
+  if start_offset > 0 and start_offset < found.byte_count:
+    # Skip the bytes which were received on a previous push, which are already stored and
+    # included in the sha calculation
+    input_fp = StreamSlice(input_fp, found.byte_count - start_offset)
+
   input_fp = wrap_with_handler(input_fp, found.sha_state.update)
 
   try:
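
Note: a minimal sketch (not part of the diff) of how the new StreamSlice skips the bytes a client re-sends when it resumes a PATCH below the stored byte count; it assumes util.registry.filelike from this branch, and the byte counts are hypothetical.

from StringIO import StringIO
from util.registry.filelike import StreamSlice

already_stored = 7    # found.byte_count: bytes 0..6 are already stored and hashed (hypothetical)
start_offset = 4      # offset the client resumed at, parsed from its Range header (hypothetical)

incoming = StringIO('4567890abcdef')             # the client re-sends its data from offset 4
sliced = StreamSlice(incoming, already_stored - start_offset)
assert sliced.read() == '7890abcdef'             # only the not-yet-stored bytes reach the sha/upload path
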
@@ -222,10 +230,21 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
   if not digest_tools.digests_equal(computed_digest, expected_digest):
     raise BlobUploadInvalid()
 
-  # Mark the blob as uploaded.
+  # Move the storage into place, or if this was a re-upload, cancel it
   final_blob_location = digest_tools.content_path(expected_digest)
-  storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location,
-                                  upload_obj.storage_metadata)
+
+  if storage.exists({upload_obj.location.name}, final_blob_location):
+    # It already existed, clean up our upload which served as proof that we had the file
+    storage.cancel_chunked_upload({upload_obj.location.name}, upload_obj.uuid,
+                                  upload_obj.storage_metadata)
+  else:
+    # We were the first ones to upload this image (at least to this location)
+    # Let's copy it into place
+    storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid,
+                                    final_blob_location, upload_obj.storage_metadata)
+
+  # Mark the blob as uploaded.
   model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
                                              upload_obj.location, upload_obj.byte_count,
                                              app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
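
Note: a sketch (not part of the diff) of the dedup decision above, using an invented in-memory stand-in for the storage engine (all names and paths here are hypothetical): re-uploading an already-stored digest cancels the chunked upload instead of copying it into place.

class StorageStub(object):
  def __init__(self, existing_paths=()):
    self.paths = set(existing_paths)

  def exists(self, locations, path):
    return path in self.paths

  def cancel_chunked_upload(self, locations, upload_uuid, metadata):
    return 'cancelled'   # the upload only served as proof the client has the content

  def complete_chunked_upload(self, locations, upload_uuid, path, metadata):
    self.paths.add(path)
    return 'copied into place'


def finish_blob(storage, locations, upload_uuid, metadata, final_blob_location):
  if storage.exists(locations, final_blob_location):
    return storage.cancel_chunked_upload(locations, upload_uuid, metadata)
  return storage.complete_chunked_upload(locations, upload_uuid, final_blob_location, metadata)

assert finish_blob(StorageStub(), {'local_us'}, 'uuid-1', {}, 'some/final/path') == 'copied into place'
assert finish_blob(StorageStub(['some/final/path']), {'local_us'}, 'uuid-2', {}, 'some/final/path') == 'cancelled'
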

View file

@@ -1,7 +1,7 @@
 import tempfile
 
 from digest.digest_tools import content_path
+from util.registry.filelike import READ_UNTIL_END
 
 class StoragePaths(object):
   shared_images = 'sharedimages'
@@ -90,12 +90,12 @@ class BaseStorage(StoragePaths):
   def get_checksum(self, path):
     raise NotImplementedError
 
-  def stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1):
+  def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
     """ Copy the specified number of bytes from the input file stream to the output stream. If
         num_bytes < 0 copy until the stream ends.
     """
     bytes_copied = 0
-    while bytes_copied < num_bytes or num_bytes == -1:
+    while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
       size_to_read = min(num_bytes - bytes_copied, self.buffer_size)
       if size_to_read < 0:
         size_to_read = self.buffer_size
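
Note: a standalone sketch (not part of the diff) of the copy loop above, with READ_UNTIL_END (-1) meaning "copy until in_fp is exhausted"; the 64KB buffer size is a hypothetical stand-in for self.buffer_size.

READ_UNTIL_END = -1
BUFFER_SIZE = 64 * 1024   # hypothetical value standing in for self.buffer_size

def copy_stream(in_fp, out_fp, num_bytes=READ_UNTIL_END):
  bytes_copied = 0
  while bytes_copied < num_bytes or num_bytes == READ_UNTIL_END:
    size_to_read = min(num_bytes - bytes_copied, BUFFER_SIZE)
    if size_to_read < 0:
      size_to_read = BUFFER_SIZE    # unbounded copy: always ask for a full buffer

    buf = in_fp.read(size_to_read)
    if not buf:
      break                         # the input ran out before the requested byte count
    out_fp.write(buf)
    bytes_copied += len(buf)
  return bytes_copied

from StringIO import StringIO
src, dst = StringIO('x' * 100), StringIO()
assert copy_stream(src, dst, 10) == 10   # bounded copy stops at num_bytes
assert copy_stream(src, dst) == 90       # READ_UNTIL_END drains the rest of the stream
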
@@ -126,7 +126,7 @@ class BaseStorageV2(BaseStorage):
     """ Upload the specified amount of data from the given file pointer to the chunked destination
         specified, starting at the given offset. Returns the number of bytes uploaded, and a new
        version of the storage_metadata. Raises InvalidChunkException if the offset or length can
-        not be accepted.
+        not be accepted. Pass length as -1 to upload as much data from the in_fp as possible.
     """
     raise NotImplementedError

View file

@@ -1,6 +1,7 @@
 import cStringIO as StringIO
 import os
 import logging
+import copy
 
 import boto.s3.connection
 import boto.s3.multipart
@@ -10,6 +11,9 @@ import boto.gs.key
 from io import BufferedIOBase
 from uuid import uuid4
+from collections import namedtuple
+
+from util.registry import filelike
 
 from storage.basestorage import BaseStorageV2, InvalidChunkException
@@ -17,9 +21,8 @@ from storage.basestorage import BaseStorageV2, InvalidChunkException
 logger = logging.getLogger(__name__)
 
-_MULTIPART_UPLOAD_ID_KEY = 'upload_id'
-_LAST_PART_KEY = 'last_part_num'
-_LAST_CHUNK_ENCOUNTERED = 'last_chunk_encountered'
+_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
+_CHUNKS_KEY = 'chunks'
 
 class StreamReadKeyAsFile(BufferedIOBase):
@@ -49,7 +52,7 @@ class _CloudStorage(BaseStorageV2):
                access_key, secret_key, bucket_name):
     super(_CloudStorage, self).__init__()
 
-    self.upload_chunk_size = 5 * 1024 * 1024
+    self.automatic_chunk_size = 5 * 1024 * 1024
 
     self._initialized = False
     self._bucket_name = bucket_name
@@ -162,21 +165,42 @@ class _CloudStorage(BaseStorageV2):
                                                  **self._upload_params)
 
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
+    return self._stream_write_internal(path, fp, content_type, content_encoding)
+
+  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
+                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
     mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
+
+    # We are going to reuse this but be VERY careful to only read the number of bytes written to it
+    buf = StringIO.StringIO()
+
     num_part = 1
-    while True:
+    total_bytes_written = 0
+    while size == filelike.READ_UNTIL_END or total_bytes_written < size:
+      bytes_to_copy = self.automatic_chunk_size
+      if size != filelike.READ_UNTIL_END:
+        # We never want to ask for more bytes than our caller has indicated to copy
+        bytes_to_copy = min(bytes_to_copy, size - total_bytes_written)
+
+      buf.seek(0)
       try:
-        buf = StringIO.StringIO()
-        bytes_written = self.stream_write_to_fp(fp, buf, self.upload_chunk_size)
-        if bytes_written == 0:
+        # Stage the bytes into the buffer for use with the multipart upload file API
+        bytes_staged = self.stream_write_to_fp(fp, buf, bytes_to_copy)
+        if bytes_staged == 0:
           break
 
-        mp.upload_part_from_file(buf, num_part)
+        buf.seek(0)
+        mp.upload_part_from_file(buf, num_part, size=bytes_staged)
+        total_bytes_written += bytes_staged
         num_part += 1
+        io.close()
       except IOError:
-        break
-    mp.complete_upload()
+        if cancel_on_error:
+          mp.cancel_upload()
+          return 0
+
+    if total_bytes_written > 0:
+      mp.complete_upload()
+    return total_bytes_written
 
   def list_directory(self, path=None):
     self._initialize_cloud_conn()
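
Note: the buf.seek(0) / size=bytes_staged pairing above matters because the StringIO buffer is reused across parts without being truncated; a short sketch (not part of the diff) of the failure it prevents, using the pure-Python StringIO for illustration.

from StringIO import StringIO

buf = StringIO()
buf.write('a' * 10)        # first staged part: 10 bytes

buf.seek(0)
buf.write('b' * 4)         # second staged part: only 4 bytes newly staged
buf.seek(0)

assert buf.read() == 'bbbbaaaaaa'   # an unbounded read would also upload 6 stale bytes
buf.seek(0)
assert buf.read(4) == 'bbbb'        # bounding the part to bytes_staged keeps the upload correct
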
@@ -235,50 +259,87 @@ class _CloudStorage(BaseStorageV2):
   def initiate_chunked_upload(self):
     self._initialize_cloud_conn()
     random_uuid = str(uuid4())
-    path = self._init_path(self._rel_upload_path(random_uuid))
-    mpu = self.__initiate_multipart_upload(path, content_type=None, content_encoding=None)
     metadata = {
-      _MULTIPART_UPLOAD_ID_KEY: mpu.id,
-      _LAST_PART_KEY: 0,
-      _LAST_CHUNK_ENCOUNTERED: False,
+      _CHUNKS_KEY: [],
     }
 
-    return mpu.id, metadata
-
-  def _get_multipart_upload_key(self, uuid, storage_metadata):
-    mpu = boto.s3.multipart.MultiPartUpload(self._cloud_bucket)
-    mpu.id = storage_metadata[_MULTIPART_UPLOAD_ID_KEY]
-    mpu.key = self._init_path(self._rel_upload_path(uuid))
-    return mpu
+    return random_uuid, metadata
 
   def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
     self._initialize_cloud_conn()
-    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
-    last_part_num = storage_metadata[_LAST_PART_KEY]
 
-    if storage_metadata[_LAST_CHUNK_ENCOUNTERED] and length != 0:
-      msg = 'Length must be at least the the upload chunk size: %s' % self.upload_chunk_size
-      raise InvalidChunkException(msg)
+    # We are going to upload each chunk to a separate key
+    chunk_path = self._rel_upload_path(str(uuid4()))
+    bytes_written = self._stream_write_internal(chunk_path, in_fp, cancel_on_error=False,
+                                                size=length)
 
-    part_num = last_part_num + 1
-    mpu.upload_part_from_file(in_fp, part_num, length)
+    new_metadata = copy.deepcopy(storage_metadata)
 
-    new_metadata = {
-      _MULTIPART_UPLOAD_ID_KEY: mpu.id,
-      _LAST_PART_KEY: part_num,
-      _LAST_CHUNK_ENCOUNTERED: True,
-    }
+    # We are only going to track keys to which data was confirmed written
+    if bytes_written > 0:
+      new_metadata[_CHUNKS_KEY].append(_PartUploadMetadata(chunk_path, offset, bytes_written))
 
-    return length, new_metadata
+    return bytes_written, new_metadata
+
+  def _chunk_generator(self, chunk_list):
+    for chunk in chunk_list:
+      yield filelike.StreamSlice(self.stream_read_file(chunk.path), 0, chunk.length)
+
+  @staticmethod
+  def _chunk_list_from_metadata(storage_metadata):
+    return [_PartUploadMetadata(*chunk_args) for chunk_args in storage_metadata[_CHUNKS_KEY]]
 
   def complete_chunked_upload(self, uuid, final_path, storage_metadata):
-    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
-    mpu.complete_upload()
+    self._initialize_cloud_conn()
+
+    # Here is where things get interesting: we are going to try to assemble this server side
+    # In order to be a candidate all parts (after offsets have been computed) must be at least 5MB
+    server_side_assembly = True
+    chunk_list = self._chunk_list_from_metadata(storage_metadata)
+    for chunk_offset, chunk in enumerate(chunk_list):
+      # If the chunk is both too small, and not the last chunk, we rule out server side assembly
+      if chunk.length < self.automatic_chunk_size and (chunk_offset + 1) < len(chunk_list):
+        server_side_assembly = False
+        break
+
+    if server_side_assembly:
+      logger.debug('Performing server side assembly of multi-part upload for: %s', final_path)
+      try:
+        # Awesome, we can do this completely server side, now we have to start a new multipart
+        # upload and use copy_part_from_key to set all of the chunks.
+        mpu = self.__initiate_multipart_upload(final_path, content_type=None, content_encoding=None)
+
+        for chunk_offset, chunk in enumerate(chunk_list):
+          abs_chunk_path = self._init_path(chunk.path)
+          part_num = chunk_offset + 1
+          chunk_end_offset_inclusive = chunk.length - 1
+          mpu.copy_part_from_key(self.get_cloud_bucket().name, abs_chunk_path, part_num, 0,
+                                 chunk_end_offset_inclusive)
+        mpu.complete_upload()
+
+      except IOError as ioe:
+        # Something bad happened, log it and then give up
+        msg = 'Exception when attempting server-side assembly for: %s'
+        logger.exception(msg, final_path)
+        mpu.cancel_upload()
+        raise ioe
+
+    else:
+      logger.warning('Performing client side assembly of multi-part upload for: %s', final_path)
+
+      # We are going to turn all of the server side objects into a single file-like stream, and
+      # pass that to stream_write to chunk and upload the final object.
+      concatenated = filelike.FilelikeStreamConcat(self._chunk_generator(chunk_list))
+      self.stream_write(final_path, concatenated)
 
   def cancel_chunked_upload(self, uuid, storage_metadata):
-    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
-    mpu.cancel_multipart_upload()
+    self._initialize_cloud_conn()
+
+    # We have to go through and delete all of the uploaded chunks
+    for chunk in self._chunk_list_from_metadata(storage_metadata):
+      self.remove(chunk.path)
 
 
 class S3Storage(_CloudStorage):
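
Note: a sketch (not part of the diff) of the assembly decision above: S3 multipart uploads reject parts smaller than 5MB except the final part, so any undersized non-final chunk forces the client-side FilelikeStreamConcat path; the chunk values below are hypothetical.

from collections import namedtuple

_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
AUTOMATIC_CHUNK_SIZE = 5 * 1024 * 1024

def can_assemble_server_side(chunk_list):
  for chunk_offset, chunk in enumerate(chunk_list):
    # A chunk that is both too small and not the last chunk rules out server side assembly
    if chunk.length < AUTOMATIC_CHUNK_SIZE and (chunk_offset + 1) < len(chunk_list):
      return False
  return True

chunks = [_PartUploadMetadata('uploads/chunk-1', 0, AUTOMATIC_CHUNK_SIZE),
          _PartUploadMetadata('uploads/chunk-2', AUTOMATIC_CHUNK_SIZE, 123)]
assert can_assemble_server_side(chunks)             # a short final chunk is fine
assert not can_assemble_server_side(chunks[::-1])   # a short non-final chunk is not
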
@@ -337,7 +398,8 @@ class GoogleCloudStorage(_CloudStorage):
           </Cors>
         </CorsConfig>""")
 
-  def stream_write(self, path, fp, content_type=None, content_encoding=None):
+  def _stream_write_internal(self, path, fp, content_type=None, content_encoding=None,
+                             cancel_on_error=True, size=filelike.READ_UNTIL_END):
     # Minimum size of upload part size on S3 is 5MB
     self._initialize_cloud_conn()
     path = self._init_path(path)
@@ -349,6 +411,10 @@ class GoogleCloudStorage(_CloudStorage):
     if content_encoding is not None:
       key.set_metadata('Content-Encoding', content_encoding)
 
+    if size != filelike.READ_UNTIL_END:
+      fp = filelike.StreamSlice(fp, 0, size)
+
+    # TODO figure out how to handle cancel_on_error=False
     key.set_contents_from_stream(fp)

View file

@@ -295,7 +295,6 @@ class RegistryTestCaseMixin(object):
   def create_app(self):
     app.config['TESTING'] = True
-    app.config['DEBUG'] = True
 
     return app
 
   def setUp(self):

View file

@@ -1,24 +1,103 @@
-class SocketReader(object):
-  def __init__(self, fp):
-    self._fp = fp
+WHENCE_ABSOLUTE = 0
+WHENCE_RELATIVE = 1
+WHENCE_RELATIVE_END = 2
+
+READ_UNTIL_END = -1
+
+
+class BaseStreamFilelike(object):
+  def __init__(self, fileobj):
+    self._fileobj = fileobj
+    self._cursor_position = 0
+
+  def read(self, size=READ_UNTIL_END):
+    buf = self._fileobj.read(size)
+    self._cursor_position += len(buf)
+    return buf
+
+  def tell(self):
+    return self._cursor_position
+
+  def seek(self, index, whence=WHENCE_ABSOLUTE):
+    num_bytes_to_ff = 0
+    if whence == WHENCE_ABSOLUTE:
+      if index < self._cursor_position:
+        raise IOError('Cannot seek backwards')
+      num_bytes_to_ff = index - self._cursor_position
+
+    elif whence == WHENCE_RELATIVE:
+      if index < 0:
+        raise IOError('Cannot seek backwards')
+      num_bytes_to_ff = index
+
+    elif whence == WHENCE_RELATIVE_END:
+      raise IOError('Stream does not have a known end point')
+
+    while num_bytes_to_ff > 0:
+      buf = self._fileobj.read(num_bytes_to_ff)
+      if not buf:
+        raise IOError('Seek past end of file')
+      num_bytes_to_ff -= len(buf)
+
+
+class SocketReader(BaseStreamFilelike):
+  def __init__(self, fileobj):
+    super(SocketReader, self).__init__(fileobj)
     self.handlers = []
 
   def add_handler(self, handler):
     self.handlers.append(handler)
 
-  def read(self, n=-1):
-    buf = self._fp.read(n)
-    if not buf:
-      return ''
+  def read(self, size=READ_UNTIL_END):
+    buf = super(SocketReader, self).read(size)
     for handler in self.handlers:
       handler(buf)
     return buf
 
-  def tell(self):
-    raise IOError('Stream is not seekable.')
-
 
 def wrap_with_handler(in_fp, handler):
   wrapper = SocketReader(in_fp)
   wrapper.add_handler(handler)
   return wrapper
+
+
+class FilelikeStreamConcat(BaseStreamFilelike):
+  def __init__(self, file_generator):
+    super(FilelikeStreamConcat, self).__init__(self)
+    self._file_generator = file_generator
+    self._current_file = file_generator.next()
+
+  def read(self, size=READ_UNTIL_END):
+    buf = self._current_file.read(size)
+    if buf:
+      self._cursor_position += len(buf)
+      return buf
+
+    # That file was out of data, prime a new one
+    self._current_file.close()
+    try:
+      self._current_file = self._file_generator.next()
+    except StopIteration:
+      return ''
+    return self.read(size)
+
+
+class StreamSlice(BaseStreamFilelike):
+  def __init__(self, fileobj, start_offset=0, end_offset_exclusive=READ_UNTIL_END):
+    super(StreamSlice, self).__init__(fileobj)
+    self._end_offset_exclusive = end_offset_exclusive
+
+    if start_offset > 0:
+      self.seek(start_offset)
+
+  def read(self, size=READ_UNTIL_END):
+    if self._end_offset_exclusive == READ_UNTIL_END:
+      # We weren't asked to limit the end of the stream
+      return super(StreamSlice, self).read(size)
+
+    # Compute the max bytes to read until the end or until we reach the user requested max
+    max_bytes_to_read = self._end_offset_exclusive - self.tell()
+    if size != READ_UNTIL_END:
+      max_bytes_to_read = min(max_bytes_to_read, size)
+
+    return super(StreamSlice, self).read(max_bytes_to_read)
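
Note: a usage sketch (not part of the diff) for the new helpers, with in-memory streams standing in for cloud chunk keys: StreamSlice hides everything past a chunk's known length (as _chunk_generator does), and FilelikeStreamConcat splices the chunks into one stream for client-side assembly.

from StringIO import StringIO
from util.registry.filelike import StreamSlice, FilelikeStreamConcat

sliced = StreamSlice(StringIO('0123456789-trailing-garbage'), 0, 4)
assert sliced.read() == '0123'       # reads stop at end_offset_exclusive

concatenated = FilelikeStreamConcat(iter([StringIO('hello '), StringIO('world')]))
assembled = ''
while True:
  piece = concatenated.read(4)       # small reads; each call stays within the current stream
  if not piece:
    break
  assembled += piece
assert assembled == 'hello world'
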