diff --git a/data/database.py b/data/database.py
index ccfad8b82..cad01b4ac 100644
--- a/data/database.py
+++ b/data/database.py
@@ -11,6 +11,7 @@ from random import SystemRandom
 from datetime import datetime
 from peewee import *
 from data.read_slave import ReadSlaveModel
+from data.fields import ResumableSHAField, JSONField
 from sqlalchemy.engine.url import make_url
 from collections import defaultdict
 
@@ -750,35 +751,13 @@ class RepositoryAuthorizedEmail(BaseModel):
     )
 
 
-class ResumableSHAField(TextField):
-  def db_value(self, value):
-    sha_state = value.state()
-
-    # One of the fields is a byte string, let's base64 encode it to make sure
-    # we can store and fetch it regardless of default collocation
-    sha_state[3] = base64.b64encode(sha_state[3])
-
-    return json.dumps(sha_state)
-
-  def python_value(self, value):
-    to_resume = resumablehashlib.sha256()
-    if value is None:
-      return to_resume
-
-    sha_state = json.loads(value)
-
-    # We need to base64 decode the data bytestring
-    sha_state[3] = base64.b64decode(sha_state[3])
-    to_resume.set_state(sha_state)
-    return to_resume
-
-
 class BlobUpload(BaseModel):
   repository = ForeignKeyField(Repository, index=True)
   uuid = CharField(index=True, unique=True)
   byte_count = IntegerField(default=0)
   sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
   location = ForeignKeyField(ImageStorageLocation)
+  storage_metadata = JSONField(null=True, default={})
 
   class Meta:
     database = db
diff --git a/data/fields.py b/data/fields.py
new file mode 100644
index 000000000..123811ccd
--- /dev/null
+++ b/data/fields.py
@@ -0,0 +1,38 @@
+import base64
+import resumablehashlib
+import json
+
+from peewee import TextField
+
+
+class ResumableSHAField(TextField):
+  def db_value(self, value):
+    sha_state = value.state()
+
+    # One of the fields is a byte string, let's base64 encode it to make sure
+    # we can store and fetch it regardless of default collocation.
+    sha_state[3] = base64.b64encode(sha_state[3])
+
+    return json.dumps(sha_state)
+
+  def python_value(self, value):
+    to_resume = resumablehashlib.sha256()
+    if value is None:
+      return to_resume
+
+    sha_state = json.loads(value)
+
+    # We need to base64 decode the data bytestring.
+    sha_state[3] = base64.b64decode(sha_state[3])
+    to_resume.set_state(sha_state)
+    return to_resume
+
+
+class JSONField(TextField):
+  def db_value(self, value):
+    return json.dumps(value)
+
+  def python_value(self, value):
+    if value is None or value == "":
+      return {}
+    return json.loads(value)
diff --git a/data/model/blob.py b/data/model/blob.py
index 5820ba3b1..a547a6f97 100644
--- a/data/model/blob.py
+++ b/data/model/blob.py
@@ -67,7 +67,8 @@ def get_blob_upload(namespace, repo_name, upload_uuid):
     raise InvalidBlobUpload()
 
 
-def initiate_upload(namespace, repo_name, uuid, location_name):
+def initiate_upload(namespace, repo_name, uuid, location_name, storage_metadata):
   repo = _basequery.get_existing_repository(namespace, repo_name)
   location = ImageStorageLocation.get(name=location_name)
-  return BlobUpload.create(repository=repo, location=location, uuid=uuid)
+  return BlobUpload.create(repository=repo, location=location, uuid=uuid,
+                           storage_metadata=storage_metadata)
diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py
index 705794b6b..9867917a1 100644
--- a/endpoints/v2/blob.py
+++ b/endpoints/v2/blob.py
@@ -119,8 +119,8 @@ def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
 @anon_protect
 def start_blob_upload(namespace, repo_name):
   location_name = storage.preferred_locations[0]
-  new_upload_uuid = storage.initiate_chunked_upload(location_name)
-  model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name)
+  new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
+  model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name, upload_metadata)
 
   digest = request.args.get('digest', None)
   if digest is None:
@@ -205,11 +205,13 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
     input_fp = wrap_with_handler(input_fp, found.sha_state.update)
 
   try:
-    length_written = storage.stream_upload_chunk({found.location.name}, upload_uuid, start_offset,
-                                                 length, input_fp)
+    length_written, new_metadata = storage.stream_upload_chunk({found.location.name}, upload_uuid,
+                                                               start_offset, length, input_fp,
+                                                               found.storage_metadata)
   except InvalidChunkException:
     _range_not_satisfiable(found.byte_count)
 
+  found.storage_metadata = new_metadata
   found.byte_count += length_written
   return found
 
@@ -222,7 +224,8 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
 
   # Mark the blob as uploaded.
   final_blob_location = digest_tools.content_path(expected_digest)
-  storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location)
+  storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location,
+                                  upload_obj.storage_metadata)
   model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
                                              upload_obj.location, upload_obj.byte_count,
                                              app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
@@ -278,6 +281,6 @@ def cancel_upload(namespace, repo_name, upload_uuid):
   # We delete the record for the upload first, since if the partial upload in
   # storage fails to delete, it doesn't break anything
   found.delete_instance()
-  storage.cancel_chunked_upload({found.location.name}, found.uuid)
+  storage.cancel_chunked_upload({found.location.name}, found.uuid, found.storage_metadata)
 
   return make_response('', 204)
diff --git a/storage/basestorage.py b/storage/basestorage.py
index 55035901c..af51a45e3 100644
--- a/storage/basestorage.py
+++ b/storage/basestorage.py
@@ -42,18 +42,9 @@ class StoragePaths(object):
 
 
 class BaseStorage(StoragePaths):
-  """Storage is organized as follow:
-  $ROOT/images/<image_id>/json
-  $ROOT/images/<image_id>/layer
-  $ROOT/repositories/<namespace>/<repository_name>/<tag_name>
-  """
-
-  # Useful if we want to change those locations later without rewriting
-  # the code which uses Storage
-  repositories = 'repositories'
-  images = 'images'
-  # Set the IO buffer to 64kB
-  buffer_size = 64 * 1024
+  def __init__(self):
+    # Set the IO buffer to 64kB
+    self.buffer_size = 64 * 1024
 
   def setup(self):
     """ Called to perform any storage system setup. """
@@ -99,31 +90,55 @@ class BaseStorage(StoragePaths):
   def get_checksum(self, path):
     raise NotImplementedError
 
+  def stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1):
+    """ Copy the specified number of bytes from the input file stream to the output stream. If
+        num_bytes < 0 copy until the stream ends.
+    """
+    bytes_copied = 0
+    while bytes_copied < num_bytes or num_bytes == -1:
+      size_to_read = min(num_bytes - bytes_copied, self.buffer_size)
+      if size_to_read < 0:
+        size_to_read = self.buffer_size
+
+      try:
+        buf = in_fp.read(size_to_read)
+        if not buf:
+          break
+        out_fp.write(buf)
+        bytes_copied += len(buf)
+      except IOError:
+        break
+
+    return bytes_copied
+
 
 class InvalidChunkException(RuntimeError):
   pass
 
 
 class BaseStorageV2(BaseStorage):
-  def initiate_chunked_upload(self, upload_uuid):
-    """ Start a new chunked upload
+  def initiate_chunked_upload(self):
+    """ Start a new chunked upload, returning the uuid and any associated storage metadata
     """
     raise NotImplementedError
 
-  def stream_upload_chunk(self, uuid, offset, length, in_fp, hash_obj):
+  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
     """ Upload the specified amount of data from the given file pointer to the chunked destination
-        specified, starting at the given offset. Raises InvalidChunkException if the offset or
-        length can not be accepted.
+        specified, starting at the given offset. Returns the number of bytes uploaded, and a new
+        version of the storage_metadata. Raises InvalidChunkException if the offset or length can
+        not be accepted.
     """
     raise NotImplementedError
 
-  def complete_chunked_upload(self, uuid, final_path):
+  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
     """ Complete the chunked upload and store the final results in the path indicated.
+        Returns nothing.
""" raise NotImplementedError - def cancel_chunked_upload(self, uuid): + def cancel_chunked_upload(self, uuid, storage_metadata): """ Cancel the chunked upload and clean up any outstanding partially uploaded data. + Returns nothing. """ raise NotImplementedError diff --git a/storage/cloud.py b/storage/cloud.py index 91dadfb3e..91d2b3fdc 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -3,18 +3,25 @@ import os import logging import boto.s3.connection +import boto.s3.multipart import boto.gs.connection import boto.s3.key import boto.gs.key from io import BufferedIOBase +from uuid import uuid4 -from storage.basestorage import BaseStorage +from storage.basestorage import BaseStorageV2, InvalidChunkException logger = logging.getLogger(__name__) +_MULTIPART_UPLOAD_ID_KEY = 'upload_id' +_LAST_PART_KEY = 'last_part_num' +_LAST_CHUNK_ENCOUNTERED = 'last_chunk_encountered' + + class StreamReadKeyAsFile(BufferedIOBase): def __init__(self, key): self._key = key @@ -37,9 +44,13 @@ class StreamReadKeyAsFile(BufferedIOBase): self._key.close(fast=True) -class _CloudStorage(BaseStorage): +class _CloudStorage(BaseStorageV2): def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path, access_key, secret_key, bucket_name): + super(_CloudStorage, self).__init__() + + self.upload_chunk_size = 5 * 1024 * 1024 + self._initialized = False self._bucket_name = bucket_name self._access_key = access_key @@ -135,12 +146,9 @@ class _CloudStorage(BaseStorage): raise IOError('No such key: \'{0}\''.format(path)) return StreamReadKeyAsFile(key) - def stream_write(self, path, fp, content_type=None, content_encoding=None): + def __initiate_multipart_upload(self, path, content_type, content_encoding): # Minimum size of upload part size on S3 is 5MB self._initialize_cloud_conn() - buffer_size = 5 * 1024 * 1024 - if self.buffer_size > buffer_size: - buffer_size = self.buffer_size path = self._init_path(path) metadata = {} @@ -150,16 +158,20 @@ class _CloudStorage(BaseStorage): if content_encoding is not None: metadata['Content-Encoding'] = content_encoding - mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, - **self._upload_params) + return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, + **self._upload_params) + + def stream_write(self, path, fp, content_type=None, content_encoding=None): + mp = self.__initiate_multipart_upload(path, content_type, content_encoding) num_part = 1 while True: try: - buf = fp.read(buffer_size) - if not buf: + buf = StringIO.StringIO() + bytes_written = self.stream_write_to_fp(fp, buf, self.upload_chunk_size) + if bytes_written == 0: break - io = StringIO.StringIO(buf) - mp.upload_part_from_file(io, num_part) + + mp.upload_part_from_file(buf, num_part) num_part += 1 io.close() except IOError: @@ -217,6 +229,57 @@ class _CloudStorage(BaseStorage): return k.etag[1:-1][:7] + def _rel_upload_path(self, uuid): + return 'uploads/{0}'.format(uuid) + + def initiate_chunked_upload(self): + self._initialize_cloud_conn() + random_uuid = str(uuid4()) + path = self._init_path(self._rel_upload_path(random_uuid)) + mpu = self.__initiate_multipart_upload(path, content_type=None, content_encoding=None) + + metadata = { + _MULTIPART_UPLOAD_ID_KEY: mpu.id, + _LAST_PART_KEY: 0, + _LAST_CHUNK_ENCOUNTERED: False, + } + + return mpu.id, metadata + + def _get_multipart_upload_key(self, uuid, storage_metadata): + mpu = boto.s3.multipart.MultiPartUpload(self._cloud_bucket) + mpu.id = storage_metadata[_MULTIPART_UPLOAD_ID_KEY] + mpu.key = 
+    return mpu
+
+  def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
+    self._initialize_cloud_conn()
+    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
+    last_part_num = storage_metadata[_LAST_PART_KEY]
+
+    if storage_metadata[_LAST_CHUNK_ENCOUNTERED] and length != 0:
+      msg = 'Length must be at least the upload chunk size: %s' % self.upload_chunk_size
+      raise InvalidChunkException(msg)
+
+    part_num = last_part_num + 1
+    mpu.upload_part_from_file(in_fp, part_num, size=length)
+
+    new_metadata = {
+      _MULTIPART_UPLOAD_ID_KEY: mpu.id,
+      _LAST_PART_KEY: part_num,
+      _LAST_CHUNK_ENCOUNTERED: True,
+    }
+
+    return length, new_metadata
+
+  def complete_chunked_upload(self, uuid, final_path, storage_metadata):
+    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
+    mpu.complete_upload()
+
+  def cancel_chunked_upload(self, uuid, storage_metadata):
+    mpu = self._get_multipart_upload_key(uuid, storage_metadata)
+    mpu.cancel_upload()
+
 
 class S3Storage(_CloudStorage):
   def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
diff --git a/storage/fakestorage.py b/storage/fakestorage.py
index f351ca150..d72b5ddc4 100644
--- a/storage/fakestorage.py
+++ b/storage/fakestorage.py
@@ -1,8 +1,14 @@
-from storage.basestorage import BaseStorage
+import cStringIO as StringIO
+import hashlib
 
-_FAKE_STORAGE_MAP = {}
+from collections import defaultdict
+from uuid import uuid4
 
-class FakeStorage(BaseStorage):
+from storage.basestorage import BaseStorageV2
+
+_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
+
+class FakeStorage(BaseStorageV2):
   def _init_path(self, path=None, create=False):
     return path
 
@@ -10,16 +16,26 @@
     if not path in _FAKE_STORAGE_MAP:
       raise IOError('Fake file %s not found' % path)
 
-    return _FAKE_STORAGE_MAP.get(path)
+    _FAKE_STORAGE_MAP.get(path).seek(0)
+    return _FAKE_STORAGE_MAP.get(path).read()
 
   def put_content(self, path, content):
-    _FAKE_STORAGE_MAP[path] = content
+    _FAKE_STORAGE_MAP.pop(path, None)
+    _FAKE_STORAGE_MAP[path].write(content)
 
   def stream_read(self, path):
-    yield _FAKE_STORAGE_MAP[path]
+    io_obj = _FAKE_STORAGE_MAP[path]
+    io_obj.seek(0)
+    while True:
+      buf = io_obj.read(self.buffer_size)
+      if not buf:
+        break
+      yield buf
 
   def stream_write(self, path, fp, content_type=None, content_encoding=None):
-    _FAKE_STORAGE_MAP[path] = fp.read()
+    out_fp = _FAKE_STORAGE_MAP[path]
+    out_fp.seek(0)
+    self.stream_write_to_fp(fp, out_fp)
 
   def remove(self, path):
     _FAKE_STORAGE_MAP.pop(path, None)
@@ -28,4 +44,21 @@
     return path in _FAKE_STORAGE_MAP
 
   def get_checksum(self, path):
-    return path
+    return hashlib.sha256(_FAKE_STORAGE_MAP[path].getvalue()).hexdigest()[:7]
+
+  def initiate_chunked_upload(self):
+    new_uuid = str(uuid4())
+    _FAKE_STORAGE_MAP[new_uuid].seek(0)
+    return new_uuid, {}
+
+  def stream_upload_chunk(self, uuid, offset, length, in_fp, _):
+    upload_storage = _FAKE_STORAGE_MAP[uuid]
+    upload_storage.seek(offset)
+    return self.stream_write_to_fp(in_fp, upload_storage, length), {}
+
+  def complete_chunked_upload(self, uuid, final_path, _):
+    _FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid]
+    _FAKE_STORAGE_MAP.pop(uuid, None)
+
+  def cancel_chunked_upload(self, uuid, _):
+    _FAKE_STORAGE_MAP.pop(uuid, None)
diff --git a/storage/local.py b/storage/local.py
index 9495aed9c..1753e4d85 100644
--- a/storage/local.py
+++ b/storage/local.py
@@ -14,8 +14,8 @@ logger = logging.getLogger(__name__)
 
 
 class LocalStorage(BaseStorageV2):
-
   def __init__(self, storage_path):
+    super(LocalStorage, self).__init__()
     self._root_path = storage_path
 
   def _init_path(self, path=None, create=False):
@@ -54,28 +54,7 @@ class LocalStorage(BaseStorageV2):
     # Size is mandatory
     path = self._init_path(path, create=True)
     with open(path, mode='wb') as out_fp:
-      self._stream_write_to_fp(fp, out_fp)
-
-  def _stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1):
-    """ Copy the specified number of bytes from the input file stream to the output stream. If
-        num_bytes < 0 copy until the stream ends.
-    """
-    bytes_copied = 0
-    while bytes_copied < num_bytes or num_bytes == -1:
-      size_to_read = min(num_bytes - bytes_copied, self.buffer_size)
-      if size_to_read < 0:
-        size_to_read = self.buffer_size
-
-      try:
-        buf = in_fp.read(size_to_read)
-        if not buf:
-          break
-        out_fp.write(buf)
-        bytes_copied += len(buf)
-      except IOError:
-        break
-
-    return bytes_copied
+      self.stream_write_to_fp(fp, out_fp)
 
   def list_directory(self, path=None):
     path = self._init_path(path)
@@ -124,14 +103,14 @@ class LocalStorage(BaseStorageV2):
     with open(self._init_path(self._rel_upload_path(new_uuid), create=True), 'w'):
       pass
 
-    return new_uuid
+    return new_uuid, {}
 
-  def stream_upload_chunk(self, uuid, offset, length, in_fp):
+  def stream_upload_chunk(self, uuid, offset, length, in_fp, _):
    with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
      upload_storage.seek(offset)
-      return self._stream_write_to_fp(in_fp, upload_storage, length)
+      return self.stream_write_to_fp(in_fp, upload_storage, length), {}
 
-  def complete_chunked_upload(self, uuid, final_path):
+  def complete_chunked_upload(self, uuid, final_path, _):
     content_path = self._rel_upload_path(uuid)
     final_path_abs = self._init_path(final_path, create=True)
     if not self.exists(final_path_abs):
@@ -140,7 +119,7 @@ class LocalStorage(BaseStorageV2):
     else:
       logger.debug('Content already exists at path: %s', final_path_abs)
 
-  def cancel_chunked_upload(self, uuid):
+  def cancel_chunked_upload(self, uuid, _):
     content_path = self._init_path(self._rel_upload_path(uuid))
     os.remove(content_path)
 
diff --git a/storage/swift.py b/storage/swift.py
index 73f743290..a3e3bcfb0 100644
--- a/storage/swift.py
+++ b/storage/swift.py
@@ -13,6 +13,8 @@ logger = logging.getLogger(__name__)
 class SwiftStorage(BaseStorage):
   def __init__(self, swift_container, storage_path, auth_url, swift_user,
                swift_password, auth_version=None, os_options=None, ca_cert_path=None):
+    super(SwiftStorage, self).__init__()
+
     self._swift_container = swift_container
     self._storage_path = storage_path
 
diff --git a/test/data/test.db b/test/data/test.db
index e0423a2c8..4bc9e762c 100644
Binary files a/test/data/test.db and b/test/data/test.db differ
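
The core of this change is that every chunked-upload call now carries an opaque storage_metadata value: initiate_chunked_upload() returns it, stream_upload_chunk() consumes it and hands back a replacement, and complete/cancel receive it, with BlobUpload.storage_metadata persisting it between HTTP requests. The sketch below drives that contract directly against LocalStorage from this diff; it is illustrative only, and the temporary directory, payload, and final blob path are assumptions made for the example rather than anything defined by the change.

# Illustrative sketch, not part of the diff: exercises the BaseStorageV2
# chunked-upload contract the same way endpoints/v2/blob.py does.
import shutil
import tempfile
from cStringIO import StringIO

from storage.local import LocalStorage

root = tempfile.mkdtemp()
try:
  store = LocalStorage(root)

  # The uuid identifies the upload; the metadata dict is engine-specific state
  # (empty for local storage, multipart upload ids and part counters for S3).
  upload_uuid, metadata = store.initiate_chunked_upload()

  # Each chunk call returns the bytes written plus replacement metadata, which
  # the caller persists (BlobUpload.storage_metadata in this change).
  payload = 'some blob bytes'
  written, metadata = store.stream_upload_chunk(upload_uuid, 0, len(payload),
                                                StringIO(payload), metadata)

  # Completion and cancellation take the final metadata as well.
  store.complete_chunked_upload(upload_uuid, 'example/final/path', metadata)
finally:
  shutil.rmtree(root)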
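
BlobUpload.storage_metadata is backed by the new JSONField, which serializes the metadata dict to JSON text for the underlying TextField column and maps NULL or empty values back to an empty dict. A short round-trip illustration (standalone field instance, made-up values):

# Illustrative sketch, not part of the diff: JSONField round trip as defined in data/fields.py.
from data.fields import JSONField

field = JSONField()

stored = field.db_value({'upload_id': 'abc123', 'last_part_num': 2})  # JSON text for the column
assert field.python_value(stored) == {'upload_id': 'abc123', 'last_part_num': 2}

# NULL and empty-string columns both normalize to an empty dict.
assert field.python_value(None) == {}
assert field.python_value('') == {}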