Implement some new methods on the storage engines.
This commit is contained in:
10 changed files with 211 additions and 98 deletions
@ -11,6 +11,7 @@ from random import SystemRandom
from datetime import datetime
from peewee import *
from data.read_slave import ReadSlaveModel
from data.fields import ResumableSHAField, JSONField
from sqlalchemy.engine.url import make_url
from collections import defaultdict
@ -750,35 +751,13 @@ class RepositoryAuthorizedEmail(BaseModel):
class ResumableSHAField(TextField):
def db_value(self, value):
sha_state = value.state()
# One of the fields is a byte string, let's base64 encode it to make sure
# we can store and fetch it regardless of default collocation
sha_state[3] = base64.b64encode(sha_state[3])
return json.dumps(sha_state)
def python_value(self, value):
to_resume = resumablehashlib.sha256()
if value is None:
return to_resume
sha_state = json.loads(value)
# We need to base64 decode the data bytestring
sha_state[3] = base64.b64decode(sha_state[3])
return to_resume
class BlobUpload(BaseModel):
repository = ForeignKeyField(Repository, index=True)
uuid = CharField(index=True, unique=True)
byte_count = IntegerField(default=0)
sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
location = ForeignKeyField(ImageStorageLocation)
storage_metadata = JSONField(null=True, default={})
class Meta:
database = db
Normal file
Normal file
@ -0,0 +1,38 @@
import base64
import resumablehashlib
import json
from peewee import TextField
class ResumableSHAField(TextField):
def db_value(self, value):
sha_state = value.state()
# One of the fields is a byte string, let's base64 encode it to make sure
# we can store and fetch it regardless of default collocation.
sha_state[3] = base64.b64encode(sha_state[3])
return json.dumps(sha_state)
def python_value(self, value):
to_resume = resumablehashlib.sha256()
if value is None:
return to_resume
sha_state = json.loads(value)
# We need to base64 decode the data bytestring.
sha_state[3] = base64.b64decode(sha_state[3])
return to_resume
class JSONField(TextField):
def db_value(self, value):
return json.dumps(value)
def python_value(self, value):
if value is None or value == "":
return {}
return json.loads(value)
@ -67,7 +67,8 @@ def get_blob_upload(namespace, repo_name, upload_uuid):
raise InvalidBlobUpload()
def initiate_upload(namespace, repo_name, uuid, location_name):
def initiate_upload(namespace, repo_name, uuid, location_name, storage_metadata):
repo = _basequery.get_existing_repository(namespace, repo_name)
location = ImageStorageLocation.get(name=location_name)
return BlobUpload.create(repository=repo, location=location, uuid=uuid)
return BlobUpload.create(repository=repo, location=location, uuid=uuid,
@ -119,8 +119,8 @@ def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
def start_blob_upload(namespace, repo_name):
location_name = storage.preferred_locations[0]
new_upload_uuid = storage.initiate_chunked_upload(location_name)
model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name)
new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name, upload_metadata)
digest = request.args.get('digest', None)
if digest is None:
@ -205,11 +205,13 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
input_fp = wrap_with_handler(input_fp, found.sha_state.update)
length_written = storage.stream_upload_chunk({}, upload_uuid, start_offset,
length, input_fp)
length_written, new_metadata = storage.stream_upload_chunk({}, upload_uuid,
start_offset, length, input_fp,
except InvalidChunkException:
found.storage_metadata = new_metadata
found.byte_count += length_written
return found
@ -222,7 +224,8 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
# Mark the blob as uploaded.
final_blob_location = digest_tools.content_path(expected_digest)
storage.complete_chunked_upload({}, upload_obj.uuid, final_blob_location)
storage.complete_chunked_upload({}, upload_obj.uuid, final_blob_location,
model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
upload_obj.location, upload_obj.byte_count,
@ -278,6 +281,6 @@ def cancel_upload(namespace, repo_name, upload_uuid):
# We delete the record for the upload first, since if the partial upload in
# storage fails to delete, it doesn't break anything
storage.cancel_chunked_upload({}, found.uuid)
storage.cancel_chunked_upload({}, found.uuid, found.storage_metadata)
return make_response('', 204)
@ -42,18 +42,9 @@ class StoragePaths(object):
class BaseStorage(StoragePaths):
"""Storage is organized as follow:
# Useful if we want to change those locations later without rewriting
# the code which uses Storage
repositories = 'repositories'
images = 'images'
# Set the IO buffer to 64kB
buffer_size = 64 * 1024
def __init__(self):
# Set the IO buffer to 64kB
self.buffer_size = 64 * 1024
def setup(self):
""" Called to perform any storage system setup. """
@ -99,31 +90,55 @@ class BaseStorage(StoragePaths):
def get_checksum(self, path):
raise NotImplementedError
def stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1):
""" Copy the specified number of bytes from the input file stream to the output stream. If
num_bytes < 0 copy until the stream ends.
bytes_copied = 0
while bytes_copied < num_bytes or num_bytes == -1:
size_to_read = min(num_bytes - bytes_copied, self.buffer_size)
if size_to_read < 0:
size_to_read = self.buffer_size
buf =
if not buf:
bytes_copied += len(buf)
except IOError:
return bytes_copied
class InvalidChunkException(RuntimeError):
class BaseStorageV2(BaseStorage):
def initiate_chunked_upload(self, upload_uuid):
""" Start a new chunked upload
def initiate_chunked_upload(self):
""" Start a new chunked upload, returning the uuid and any associated storage metadata
raise NotImplementedError
def stream_upload_chunk(self, uuid, offset, length, in_fp, hash_obj):
def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
""" Upload the specified amount of data from the given file pointer to the chunked destination
specified, starting at the given offset. Raises InvalidChunkException if the offset or
length can not be accepted.
specified, starting at the given offset. Returns the number of bytes uploaded, and a new
version of the storage_metadata. Raises InvalidChunkException if the offset or length can
not be accepted.
raise NotImplementedError
def complete_chunked_upload(self, uuid, final_path):
def complete_chunked_upload(self, uuid, final_path, storage_metadata):
""" Complete the chunked upload and store the final results in the path indicated.
Returns nothing.
raise NotImplementedError
def cancel_chunked_upload(self, uuid):
def cancel_chunked_upload(self, uuid, storage_metadata):
""" Cancel the chunked upload and clean up any outstanding partially uploaded data.
Returns nothing.
raise NotImplementedError
@ -3,18 +3,25 @@ import os
import logging
import boto.s3.connection
import boto.s3.multipart
import boto.s3.key
from io import BufferedIOBase
from uuid import uuid4
from storage.basestorage import BaseStorage
from storage.basestorage import BaseStorageV2, InvalidChunkException
logger = logging.getLogger(__name__)
_LAST_PART_KEY = 'last_part_num'
_LAST_CHUNK_ENCOUNTERED = 'last_chunk_encountered'
class StreamReadKeyAsFile(BufferedIOBase):
def __init__(self, key):
self._key = key
@ -37,9 +44,13 @@ class StreamReadKeyAsFile(BufferedIOBase):
class _CloudStorage(BaseStorage):
class _CloudStorage(BaseStorageV2):
def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
access_key, secret_key, bucket_name):
super(_CloudStorage, self).__init__()
self.upload_chunk_size = 5 * 1024 * 1024
self._initialized = False
self._bucket_name = bucket_name
self._access_key = access_key
@ -135,12 +146,9 @@ class _CloudStorage(BaseStorage):
raise IOError('No such key: \'{0}\''.format(path))
return StreamReadKeyAsFile(key)
def stream_write(self, path, fp, content_type=None, content_encoding=None):
def __initiate_multipart_upload(self, path, content_type, content_encoding):
# Minimum size of upload part size on S3 is 5MB
buffer_size = 5 * 1024 * 1024
if self.buffer_size > buffer_size:
buffer_size = self.buffer_size
path = self._init_path(path)
metadata = {}
@ -150,16 +158,20 @@ class _CloudStorage(BaseStorage):
if content_encoding is not None:
metadata['Content-Encoding'] = content_encoding
mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
def stream_write(self, path, fp, content_type=None, content_encoding=None):
mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
num_part = 1
while True:
buf =
if not buf:
buf = StringIO.StringIO()
bytes_written = self.stream_write_to_fp(fp, buf, self.upload_chunk_size)
if bytes_written == 0:
io = StringIO.StringIO(buf)
mp.upload_part_from_file(io, num_part)
mp.upload_part_from_file(buf, num_part)
num_part += 1
except IOError:
@ -217,6 +229,57 @@ class _CloudStorage(BaseStorage):
return k.etag[1:-1][:7]
def _rel_upload_path(self, uuid):
return 'uploads/{0}'.format(uuid)
def initiate_chunked_upload(self):
random_uuid = str(uuid4())
path = self._init_path(self._rel_upload_path(random_uuid))
mpu = self.__initiate_multipart_upload(path, content_type=None, content_encoding=None)
metadata = {
return, metadata
def _get_multipart_upload_key(self, uuid, storage_metadata):
mpu = boto.s3.multipart.MultiPartUpload(self._cloud_bucket)
|||| = storage_metadata[_MULTIPART_UPLOAD_ID_KEY]
mpu.key = self._init_path(self._rel_upload_path(uuid))
return mpu
def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
mpu = self._get_multipart_upload_key(uuid, storage_metadata)
last_part_num = storage_metadata[_LAST_PART_KEY]
if storage_metadata[_LAST_CHUNK_ENCOUNTERED] and length != 0:
msg = 'Length must be at least the the upload chunk size: %s' % self.upload_chunk_size
raise InvalidChunkException(msg)
part_num = last_part_num + 1
mpu.upload_part_from_file(in_fp, part_num, length)
new_metadata = {
_LAST_PART_KEY: part_num,
return length, new_metadata
def complete_chunked_upload(self, uuid, final_path, storage_metadata):
mpu = self._get_multipart_upload_key(uuid, storage_metadata)
def cancel_chunked_upload(self, uuid, storage_metadata):
mpu = self._get_multipart_upload_key(uuid, storage_metadata)
class S3Storage(_CloudStorage):
def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
@ -1,8 +1,14 @@
from storage.basestorage import BaseStorage
import cStringIO as StringIO
import hashlib
from collections import defaultdict
from uuid import uuid4
class FakeStorage(BaseStorage):
from storage.basestorage import BaseStorageV2
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
class FakeStorage(BaseStorageV2):
def _init_path(self, path=None, create=False):
return path
@ -10,16 +16,26 @@ class FakeStorage(BaseStorage):
if not path in _FAKE_STORAGE_MAP:
raise IOError('Fake file %s not found' % path)
return _FAKE_STORAGE_MAP.get(path)
return _FAKE_STORAGE_MAP.get(path).read()
def put_content(self, path, content):
_FAKE_STORAGE_MAP[path] = content
_FAKE_STORAGE_MAP.pop(path, None)
def stream_read(self, path):
yield _FAKE_STORAGE_MAP[path]
io_obj = _FAKE_STORAGE_MAP[path]
while True:
buf =
if not buf:
yield buf
def stream_write(self, path, fp, content_type=None, content_encoding=None):
out_fp = _FAKE_STORAGE_MAP[path]
self.stream_write_to_fp(fp, out_fp)
def remove(self, path):
_FAKE_STORAGE_MAP.pop(path, None)
@ -28,4 +44,21 @@ class FakeStorage(BaseStorage):
return path in _FAKE_STORAGE_MAP
def get_checksum(self, path):
return path
return hashlib.sha256(_FAKE_STORAGE_MAP[path].read()).hexdigest()[:7]
def initiate_chunked_upload(self):
new_uuid = str(uuid4())
return new_uuid, {}
def stream_upload_chunk(self, uuid, offset, length, in_fp, _):
upload_storage = _FAKE_STORAGE_MAP[uuid]
return self.stream_write_to_fp(in_fp, upload_storage, length)
def complete_chunked_upload(self, uuid, final_path, _):
_FAKE_STORAGE_MAP.pop(uuid, None)
def cancel_chunked_upload(self, uuid, _):
_FAKE_STORAGE_MAP.pop(uuid, None)
@ -14,8 +14,8 @@ logger = logging.getLogger(__name__)
class LocalStorage(BaseStorageV2):
def __init__(self, storage_path):
super(LocalStorage, self).__init__()
self._root_path = storage_path
def _init_path(self, path=None, create=False):
@ -54,28 +54,7 @@ class LocalStorage(BaseStorageV2):
# Size is mandatory
path = self._init_path(path, create=True)
with open(path, mode='wb') as out_fp:
self._stream_write_to_fp(fp, out_fp)
def _stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1):
""" Copy the specified number of bytes from the input file stream to the output stream. If
num_bytes < 0 copy until the stream ends.
bytes_copied = 0
while bytes_copied < num_bytes or num_bytes == -1:
size_to_read = min(num_bytes - bytes_copied, self.buffer_size)
if size_to_read < 0:
size_to_read = self.buffer_size
buf =
if not buf:
bytes_copied += len(buf)
except IOError:
return bytes_copied
self.stream_write_to_fp(fp, out_fp)
def list_directory(self, path=None):
path = self._init_path(path)
@ -124,14 +103,14 @@ class LocalStorage(BaseStorageV2):
with open(self._init_path(self._rel_upload_path(new_uuid), create=True), 'w'):
return new_uuid
return new_uuid, {}
def stream_upload_chunk(self, uuid, offset, length, in_fp):
def stream_upload_chunk(self, uuid, offset, length, in_fp, _):
with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
return self._stream_write_to_fp(in_fp, upload_storage, length)
return self.stream_write_to_fp(in_fp, upload_storage, length), {}
def complete_chunked_upload(self, uuid, final_path):
def complete_chunked_upload(self, uuid, final_path, _):
content_path = self._rel_upload_path(uuid)
final_path_abs = self._init_path(final_path, create=True)
if not self.exists(final_path_abs):
@ -140,7 +119,7 @@ class LocalStorage(BaseStorageV2):
logger.debug('Content already exists at path: %s', final_path_abs)
def cancel_chunked_upload(self, uuid):
def cancel_chunked_upload(self, uuid, _):
content_path = self._init_path(self._rel_upload_path(uuid))
@ -13,6 +13,8 @@ logger = logging.getLogger(__name__)
class SwiftStorage(BaseStorage):
def __init__(self, swift_container, storage_path, auth_url, swift_user,
swift_password, auth_version=None, os_options=None, ca_cert_path=None):
super(SwiftStorage, self).__init__()
self._swift_container = swift_container
self._storage_path = storage_path
Binary file not shown.
Reference in a new issue