Merge pull request #414 from jakedt/python-registry-v2
Implement some new methods on the storage engines.
This commit is contained in:
commit
afdd687192
10 changed files with 211 additions and 98 deletions
|
@ -11,6 +11,7 @@ from random import SystemRandom
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from peewee import *
|
from peewee import *
|
||||||
from data.read_slave import ReadSlaveModel
|
from data.read_slave import ReadSlaveModel
|
||||||
|
from data.fields import ResumableSHAField, JSONField
|
||||||
from sqlalchemy.engine.url import make_url
|
from sqlalchemy.engine.url import make_url
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
|
@ -750,35 +751,13 @@ class RepositoryAuthorizedEmail(BaseModel):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class ResumableSHAField(TextField):
|
|
||||||
def db_value(self, value):
|
|
||||||
sha_state = value.state()
|
|
||||||
|
|
||||||
# One of the fields is a byte string, let's base64 encode it to make sure
|
|
||||||
# we can store and fetch it regardless of default collocation
|
|
||||||
sha_state[3] = base64.b64encode(sha_state[3])
|
|
||||||
|
|
||||||
return json.dumps(sha_state)
|
|
||||||
|
|
||||||
def python_value(self, value):
|
|
||||||
to_resume = resumablehashlib.sha256()
|
|
||||||
if value is None:
|
|
||||||
return to_resume
|
|
||||||
|
|
||||||
sha_state = json.loads(value)
|
|
||||||
|
|
||||||
# We need to base64 decode the data bytestring
|
|
||||||
sha_state[3] = base64.b64decode(sha_state[3])
|
|
||||||
to_resume.set_state(sha_state)
|
|
||||||
return to_resume
|
|
||||||
|
|
||||||
|
|
||||||
class BlobUpload(BaseModel):
|
class BlobUpload(BaseModel):
|
||||||
repository = ForeignKeyField(Repository, index=True)
|
repository = ForeignKeyField(Repository, index=True)
|
||||||
uuid = CharField(index=True, unique=True)
|
uuid = CharField(index=True, unique=True)
|
||||||
byte_count = IntegerField(default=0)
|
byte_count = IntegerField(default=0)
|
||||||
sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
|
sha_state = ResumableSHAField(null=True, default=resumablehashlib.sha256)
|
||||||
location = ForeignKeyField(ImageStorageLocation)
|
location = ForeignKeyField(ImageStorageLocation)
|
||||||
|
storage_metadata = JSONField(null=True, default={})
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
database = db
|
database = db
|
||||||
|
|
38
data/fields.py
Normal file
38
data/fields.py
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
import base64
|
||||||
|
import resumablehashlib
|
||||||
|
import json
|
||||||
|
|
||||||
|
from peewee import TextField
|
||||||
|
|
||||||
|
|
||||||
|
class ResumableSHAField(TextField):
|
||||||
|
def db_value(self, value):
|
||||||
|
sha_state = value.state()
|
||||||
|
|
||||||
|
# One of the fields is a byte string, let's base64 encode it to make sure
|
||||||
|
# we can store and fetch it regardless of default collocation.
|
||||||
|
sha_state[3] = base64.b64encode(sha_state[3])
|
||||||
|
|
||||||
|
return json.dumps(sha_state)
|
||||||
|
|
||||||
|
def python_value(self, value):
|
||||||
|
to_resume = resumablehashlib.sha256()
|
||||||
|
if value is None:
|
||||||
|
return to_resume
|
||||||
|
|
||||||
|
sha_state = json.loads(value)
|
||||||
|
|
||||||
|
# We need to base64 decode the data bytestring.
|
||||||
|
sha_state[3] = base64.b64decode(sha_state[3])
|
||||||
|
to_resume.set_state(sha_state)
|
||||||
|
return to_resume
|
||||||
|
|
||||||
|
|
||||||
|
class JSONField(TextField):
|
||||||
|
def db_value(self, value):
|
||||||
|
return json.dumps(value)
|
||||||
|
|
||||||
|
def python_value(self, value):
|
||||||
|
if value is None or value == "":
|
||||||
|
return {}
|
||||||
|
return json.loads(value)
|
|
@ -67,7 +67,8 @@ def get_blob_upload(namespace, repo_name, upload_uuid):
|
||||||
raise InvalidBlobUpload()
|
raise InvalidBlobUpload()
|
||||||
|
|
||||||
|
|
||||||
def initiate_upload(namespace, repo_name, uuid, location_name):
|
def initiate_upload(namespace, repo_name, uuid, location_name, storage_metadata):
|
||||||
repo = _basequery.get_existing_repository(namespace, repo_name)
|
repo = _basequery.get_existing_repository(namespace, repo_name)
|
||||||
location = ImageStorageLocation.get(name=location_name)
|
location = ImageStorageLocation.get(name=location_name)
|
||||||
return BlobUpload.create(repository=repo, location=location, uuid=uuid)
|
return BlobUpload.create(repository=repo, location=location, uuid=uuid,
|
||||||
|
storage_metadata=storage_metadata)
|
||||||
|
|
|
@ -119,8 +119,8 @@ def _render_range(num_uploaded_bytes, with_bytes_prefix=True):
|
||||||
@anon_protect
|
@anon_protect
|
||||||
def start_blob_upload(namespace, repo_name):
|
def start_blob_upload(namespace, repo_name):
|
||||||
location_name = storage.preferred_locations[0]
|
location_name = storage.preferred_locations[0]
|
||||||
new_upload_uuid = storage.initiate_chunked_upload(location_name)
|
new_upload_uuid, upload_metadata = storage.initiate_chunked_upload(location_name)
|
||||||
model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name)
|
model.blob.initiate_upload(namespace, repo_name, new_upload_uuid, location_name, upload_metadata)
|
||||||
|
|
||||||
digest = request.args.get('digest', None)
|
digest = request.args.get('digest', None)
|
||||||
if digest is None:
|
if digest is None:
|
||||||
|
@ -205,11 +205,13 @@ def _upload_chunk(namespace, repo_name, upload_uuid):
|
||||||
input_fp = wrap_with_handler(input_fp, found.sha_state.update)
|
input_fp = wrap_with_handler(input_fp, found.sha_state.update)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
length_written = storage.stream_upload_chunk({found.location.name}, upload_uuid, start_offset,
|
length_written, new_metadata = storage.stream_upload_chunk({found.location.name}, upload_uuid,
|
||||||
length, input_fp)
|
start_offset, length, input_fp,
|
||||||
|
found.storage_metadata)
|
||||||
except InvalidChunkException:
|
except InvalidChunkException:
|
||||||
_range_not_satisfiable(found.byte_count)
|
_range_not_satisfiable(found.byte_count)
|
||||||
|
|
||||||
|
found.storage_metadata = new_metadata
|
||||||
found.byte_count += length_written
|
found.byte_count += length_written
|
||||||
return found
|
return found
|
||||||
|
|
||||||
|
@ -222,7 +224,8 @@ def _finish_upload(namespace, repo_name, upload_obj, expected_digest):
|
||||||
|
|
||||||
# Mark the blob as uploaded.
|
# Mark the blob as uploaded.
|
||||||
final_blob_location = digest_tools.content_path(expected_digest)
|
final_blob_location = digest_tools.content_path(expected_digest)
|
||||||
storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location)
|
storage.complete_chunked_upload({upload_obj.location.name}, upload_obj.uuid, final_blob_location,
|
||||||
|
upload_obj.storage_metadata)
|
||||||
model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
|
model.blob.store_blob_record_and_temp_link(namespace, repo_name, expected_digest,
|
||||||
upload_obj.location, upload_obj.byte_count,
|
upload_obj.location, upload_obj.byte_count,
|
||||||
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
|
app.config['PUSH_TEMP_TAG_EXPIRATION_SEC'])
|
||||||
|
@ -278,6 +281,6 @@ def cancel_upload(namespace, repo_name, upload_uuid):
|
||||||
# We delete the record for the upload first, since if the partial upload in
|
# We delete the record for the upload first, since if the partial upload in
|
||||||
# storage fails to delete, it doesn't break anything
|
# storage fails to delete, it doesn't break anything
|
||||||
found.delete_instance()
|
found.delete_instance()
|
||||||
storage.cancel_chunked_upload({found.location.name}, found.uuid)
|
storage.cancel_chunked_upload({found.location.name}, found.uuid, found.storage_metadata)
|
||||||
|
|
||||||
return make_response('', 204)
|
return make_response('', 204)
|
||||||
|
|
|
@ -42,18 +42,9 @@ class StoragePaths(object):
|
||||||
|
|
||||||
|
|
||||||
class BaseStorage(StoragePaths):
|
class BaseStorage(StoragePaths):
|
||||||
"""Storage is organized as follow:
|
def __init__(self):
|
||||||
$ROOT/images/<image_id>/json
|
|
||||||
$ROOT/images/<image_id>/layer
|
|
||||||
$ROOT/repositories/<namespace>/<repository_name>/<tag_name>
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Useful if we want to change those locations later without rewriting
|
|
||||||
# the code which uses Storage
|
|
||||||
repositories = 'repositories'
|
|
||||||
images = 'images'
|
|
||||||
# Set the IO buffer to 64kB
|
# Set the IO buffer to 64kB
|
||||||
buffer_size = 64 * 1024
|
self.buffer_size = 64 * 1024
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
""" Called to perform any storage system setup. """
|
""" Called to perform any storage system setup. """
|
||||||
|
@ -99,31 +90,55 @@ class BaseStorage(StoragePaths):
|
||||||
def get_checksum(self, path):
|
def get_checksum(self, path):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1):
|
||||||
|
""" Copy the specified number of bytes from the input file stream to the output stream. If
|
||||||
|
num_bytes < 0 copy until the stream ends.
|
||||||
|
"""
|
||||||
|
bytes_copied = 0
|
||||||
|
while bytes_copied < num_bytes or num_bytes == -1:
|
||||||
|
size_to_read = min(num_bytes - bytes_copied, self.buffer_size)
|
||||||
|
if size_to_read < 0:
|
||||||
|
size_to_read = self.buffer_size
|
||||||
|
|
||||||
|
try:
|
||||||
|
buf = in_fp.read(size_to_read)
|
||||||
|
if not buf:
|
||||||
|
break
|
||||||
|
out_fp.write(buf)
|
||||||
|
bytes_copied += len(buf)
|
||||||
|
except IOError:
|
||||||
|
break
|
||||||
|
|
||||||
|
return bytes_copied
|
||||||
|
|
||||||
|
|
||||||
class InvalidChunkException(RuntimeError):
|
class InvalidChunkException(RuntimeError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class BaseStorageV2(BaseStorage):
|
class BaseStorageV2(BaseStorage):
|
||||||
def initiate_chunked_upload(self, upload_uuid):
|
def initiate_chunked_upload(self):
|
||||||
""" Start a new chunked upload
|
""" Start a new chunked upload, returning the uuid and any associated storage metadata
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def stream_upload_chunk(self, uuid, offset, length, in_fp, hash_obj):
|
def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
|
||||||
""" Upload the specified amount of data from the given file pointer to the chunked destination
|
""" Upload the specified amount of data from the given file pointer to the chunked destination
|
||||||
specified, starting at the given offset. Raises InvalidChunkException if the offset or
|
specified, starting at the given offset. Returns the number of bytes uploaded, and a new
|
||||||
length can not be accepted.
|
version of the storage_metadata. Raises InvalidChunkException if the offset or length can
|
||||||
|
not be accepted.
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def complete_chunked_upload(self, uuid, final_path):
|
def complete_chunked_upload(self, uuid, final_path, storage_metadata):
|
||||||
""" Complete the chunked upload and store the final results in the path indicated.
|
""" Complete the chunked upload and store the final results in the path indicated.
|
||||||
|
Returns nothing.
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def cancel_chunked_upload(self, uuid):
|
def cancel_chunked_upload(self, uuid, storage_metadata):
|
||||||
""" Cancel the chunked upload and clean up any outstanding partially uploaded data.
|
""" Cancel the chunked upload and clean up any outstanding partially uploaded data.
|
||||||
|
Returns nothing.
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
|
@ -3,18 +3,25 @@ import os
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import boto.s3.connection
|
import boto.s3.connection
|
||||||
|
import boto.s3.multipart
|
||||||
import boto.gs.connection
|
import boto.gs.connection
|
||||||
import boto.s3.key
|
import boto.s3.key
|
||||||
import boto.gs.key
|
import boto.gs.key
|
||||||
|
|
||||||
from io import BufferedIOBase
|
from io import BufferedIOBase
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
from storage.basestorage import BaseStorage
|
from storage.basestorage import BaseStorageV2, InvalidChunkException
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
_MULTIPART_UPLOAD_ID_KEY = 'upload_id'
|
||||||
|
_LAST_PART_KEY = 'last_part_num'
|
||||||
|
_LAST_CHUNK_ENCOUNTERED = 'last_chunk_encountered'
|
||||||
|
|
||||||
|
|
||||||
class StreamReadKeyAsFile(BufferedIOBase):
|
class StreamReadKeyAsFile(BufferedIOBase):
|
||||||
def __init__(self, key):
|
def __init__(self, key):
|
||||||
self._key = key
|
self._key = key
|
||||||
|
@ -37,9 +44,13 @@ class StreamReadKeyAsFile(BufferedIOBase):
|
||||||
self._key.close(fast=True)
|
self._key.close(fast=True)
|
||||||
|
|
||||||
|
|
||||||
class _CloudStorage(BaseStorage):
|
class _CloudStorage(BaseStorageV2):
|
||||||
def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
|
def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
|
||||||
access_key, secret_key, bucket_name):
|
access_key, secret_key, bucket_name):
|
||||||
|
super(_CloudStorage, self).__init__()
|
||||||
|
|
||||||
|
self.upload_chunk_size = 5 * 1024 * 1024
|
||||||
|
|
||||||
self._initialized = False
|
self._initialized = False
|
||||||
self._bucket_name = bucket_name
|
self._bucket_name = bucket_name
|
||||||
self._access_key = access_key
|
self._access_key = access_key
|
||||||
|
@ -135,12 +146,9 @@ class _CloudStorage(BaseStorage):
|
||||||
raise IOError('No such key: \'{0}\''.format(path))
|
raise IOError('No such key: \'{0}\''.format(path))
|
||||||
return StreamReadKeyAsFile(key)
|
return StreamReadKeyAsFile(key)
|
||||||
|
|
||||||
def stream_write(self, path, fp, content_type=None, content_encoding=None):
|
def __initiate_multipart_upload(self, path, content_type, content_encoding):
|
||||||
# Minimum size of upload part size on S3 is 5MB
|
# Minimum size of upload part size on S3 is 5MB
|
||||||
self._initialize_cloud_conn()
|
self._initialize_cloud_conn()
|
||||||
buffer_size = 5 * 1024 * 1024
|
|
||||||
if self.buffer_size > buffer_size:
|
|
||||||
buffer_size = self.buffer_size
|
|
||||||
path = self._init_path(path)
|
path = self._init_path(path)
|
||||||
|
|
||||||
metadata = {}
|
metadata = {}
|
||||||
|
@ -150,16 +158,20 @@ class _CloudStorage(BaseStorage):
|
||||||
if content_encoding is not None:
|
if content_encoding is not None:
|
||||||
metadata['Content-Encoding'] = content_encoding
|
metadata['Content-Encoding'] = content_encoding
|
||||||
|
|
||||||
mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
|
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
|
||||||
**self._upload_params)
|
**self._upload_params)
|
||||||
|
|
||||||
|
def stream_write(self, path, fp, content_type=None, content_encoding=None):
|
||||||
|
mp = self.__initiate_multipart_upload(path, content_type, content_encoding)
|
||||||
num_part = 1
|
num_part = 1
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
buf = fp.read(buffer_size)
|
buf = StringIO.StringIO()
|
||||||
if not buf:
|
bytes_written = self.stream_write_to_fp(fp, buf, self.upload_chunk_size)
|
||||||
|
if bytes_written == 0:
|
||||||
break
|
break
|
||||||
io = StringIO.StringIO(buf)
|
|
||||||
mp.upload_part_from_file(io, num_part)
|
mp.upload_part_from_file(buf, num_part)
|
||||||
num_part += 1
|
num_part += 1
|
||||||
io.close()
|
io.close()
|
||||||
except IOError:
|
except IOError:
|
||||||
|
@ -217,6 +229,57 @@ class _CloudStorage(BaseStorage):
|
||||||
|
|
||||||
return k.etag[1:-1][:7]
|
return k.etag[1:-1][:7]
|
||||||
|
|
||||||
|
def _rel_upload_path(self, uuid):
|
||||||
|
return 'uploads/{0}'.format(uuid)
|
||||||
|
|
||||||
|
def initiate_chunked_upload(self):
|
||||||
|
self._initialize_cloud_conn()
|
||||||
|
random_uuid = str(uuid4())
|
||||||
|
path = self._init_path(self._rel_upload_path(random_uuid))
|
||||||
|
mpu = self.__initiate_multipart_upload(path, content_type=None, content_encoding=None)
|
||||||
|
|
||||||
|
metadata = {
|
||||||
|
_MULTIPART_UPLOAD_ID_KEY: mpu.id,
|
||||||
|
_LAST_PART_KEY: 0,
|
||||||
|
_LAST_CHUNK_ENCOUNTERED: False,
|
||||||
|
}
|
||||||
|
|
||||||
|
return mpu.id, metadata
|
||||||
|
|
||||||
|
def _get_multipart_upload_key(self, uuid, storage_metadata):
|
||||||
|
mpu = boto.s3.multipart.MultiPartUpload(self._cloud_bucket)
|
||||||
|
mpu.id = storage_metadata[_MULTIPART_UPLOAD_ID_KEY]
|
||||||
|
mpu.key = self._init_path(self._rel_upload_path(uuid))
|
||||||
|
return mpu
|
||||||
|
|
||||||
|
def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata):
|
||||||
|
self._initialize_cloud_conn()
|
||||||
|
mpu = self._get_multipart_upload_key(uuid, storage_metadata)
|
||||||
|
last_part_num = storage_metadata[_LAST_PART_KEY]
|
||||||
|
|
||||||
|
if storage_metadata[_LAST_CHUNK_ENCOUNTERED] and length != 0:
|
||||||
|
msg = 'Length must be at least the the upload chunk size: %s' % self.upload_chunk_size
|
||||||
|
raise InvalidChunkException(msg)
|
||||||
|
|
||||||
|
part_num = last_part_num + 1
|
||||||
|
mpu.upload_part_from_file(in_fp, part_num, length)
|
||||||
|
|
||||||
|
new_metadata = {
|
||||||
|
_MULTIPART_UPLOAD_ID_KEY: mpu.id,
|
||||||
|
_LAST_PART_KEY: part_num,
|
||||||
|
_LAST_CHUNK_ENCOUNTERED: True,
|
||||||
|
}
|
||||||
|
|
||||||
|
return length, new_metadata
|
||||||
|
|
||||||
|
def complete_chunked_upload(self, uuid, final_path, storage_metadata):
|
||||||
|
mpu = self._get_multipart_upload_key(uuid, storage_metadata)
|
||||||
|
mpu.complete_upload()
|
||||||
|
|
||||||
|
def cancel_chunked_upload(self, uuid, storage_metadata):
|
||||||
|
mpu = self._get_multipart_upload_key(uuid, storage_metadata)
|
||||||
|
mpu.cancel_multipart_upload()
|
||||||
|
|
||||||
|
|
||||||
class S3Storage(_CloudStorage):
|
class S3Storage(_CloudStorage):
|
||||||
def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
|
def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
|
||||||
|
|
|
@ -1,8 +1,14 @@
|
||||||
from storage.basestorage import BaseStorage
|
import cStringIO as StringIO
|
||||||
|
import hashlib
|
||||||
|
|
||||||
_FAKE_STORAGE_MAP = {}
|
from collections import defaultdict
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
class FakeStorage(BaseStorage):
|
from storage.basestorage import BaseStorageV2
|
||||||
|
|
||||||
|
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
|
||||||
|
|
||||||
|
class FakeStorage(BaseStorageV2):
|
||||||
def _init_path(self, path=None, create=False):
|
def _init_path(self, path=None, create=False):
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
@ -10,16 +16,26 @@ class FakeStorage(BaseStorage):
|
||||||
if not path in _FAKE_STORAGE_MAP:
|
if not path in _FAKE_STORAGE_MAP:
|
||||||
raise IOError('Fake file %s not found' % path)
|
raise IOError('Fake file %s not found' % path)
|
||||||
|
|
||||||
return _FAKE_STORAGE_MAP.get(path)
|
_FAKE_STORAGE_MAP.get(path).seek(0)
|
||||||
|
return _FAKE_STORAGE_MAP.get(path).read()
|
||||||
|
|
||||||
def put_content(self, path, content):
|
def put_content(self, path, content):
|
||||||
_FAKE_STORAGE_MAP[path] = content
|
_FAKE_STORAGE_MAP.pop(path, None)
|
||||||
|
_FAKE_STORAGE_MAP[path].write(content)
|
||||||
|
|
||||||
def stream_read(self, path):
|
def stream_read(self, path):
|
||||||
yield _FAKE_STORAGE_MAP[path]
|
io_obj = _FAKE_STORAGE_MAP[path]
|
||||||
|
io_obj.seek(0)
|
||||||
|
while True:
|
||||||
|
buf = io_obj.read(self.buffer_size)
|
||||||
|
if not buf:
|
||||||
|
break
|
||||||
|
yield buf
|
||||||
|
|
||||||
def stream_write(self, path, fp, content_type=None, content_encoding=None):
|
def stream_write(self, path, fp, content_type=None, content_encoding=None):
|
||||||
_FAKE_STORAGE_MAP[path] = fp.read()
|
out_fp = _FAKE_STORAGE_MAP[path]
|
||||||
|
out_fp.seek(0)
|
||||||
|
self.stream_write_to_fp(fp, out_fp)
|
||||||
|
|
||||||
def remove(self, path):
|
def remove(self, path):
|
||||||
_FAKE_STORAGE_MAP.pop(path, None)
|
_FAKE_STORAGE_MAP.pop(path, None)
|
||||||
|
@ -28,4 +44,21 @@ class FakeStorage(BaseStorage):
|
||||||
return path in _FAKE_STORAGE_MAP
|
return path in _FAKE_STORAGE_MAP
|
||||||
|
|
||||||
def get_checksum(self, path):
|
def get_checksum(self, path):
|
||||||
return path
|
return hashlib.sha256(_FAKE_STORAGE_MAP[path].read()).hexdigest()[:7]
|
||||||
|
|
||||||
|
def initiate_chunked_upload(self):
|
||||||
|
new_uuid = str(uuid4())
|
||||||
|
_FAKE_STORAGE_MAP[new_uuid].seek(0)
|
||||||
|
return new_uuid, {}
|
||||||
|
|
||||||
|
def stream_upload_chunk(self, uuid, offset, length, in_fp, _):
|
||||||
|
upload_storage = _FAKE_STORAGE_MAP[uuid]
|
||||||
|
upload_storage.seek(offset)
|
||||||
|
return self.stream_write_to_fp(in_fp, upload_storage, length)
|
||||||
|
|
||||||
|
def complete_chunked_upload(self, uuid, final_path, _):
|
||||||
|
_FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid]
|
||||||
|
_FAKE_STORAGE_MAP.pop(uuid, None)
|
||||||
|
|
||||||
|
def cancel_chunked_upload(self, uuid, _):
|
||||||
|
_FAKE_STORAGE_MAP.pop(uuid, None)
|
||||||
|
|
|
@ -14,8 +14,8 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class LocalStorage(BaseStorageV2):
|
class LocalStorage(BaseStorageV2):
|
||||||
|
|
||||||
def __init__(self, storage_path):
|
def __init__(self, storage_path):
|
||||||
|
super(LocalStorage, self).__init__()
|
||||||
self._root_path = storage_path
|
self._root_path = storage_path
|
||||||
|
|
||||||
def _init_path(self, path=None, create=False):
|
def _init_path(self, path=None, create=False):
|
||||||
|
@ -54,28 +54,7 @@ class LocalStorage(BaseStorageV2):
|
||||||
# Size is mandatory
|
# Size is mandatory
|
||||||
path = self._init_path(path, create=True)
|
path = self._init_path(path, create=True)
|
||||||
with open(path, mode='wb') as out_fp:
|
with open(path, mode='wb') as out_fp:
|
||||||
self._stream_write_to_fp(fp, out_fp)
|
self.stream_write_to_fp(fp, out_fp)
|
||||||
|
|
||||||
def _stream_write_to_fp(self, in_fp, out_fp, num_bytes=-1):
|
|
||||||
""" Copy the specified number of bytes from the input file stream to the output stream. If
|
|
||||||
num_bytes < 0 copy until the stream ends.
|
|
||||||
"""
|
|
||||||
bytes_copied = 0
|
|
||||||
while bytes_copied < num_bytes or num_bytes == -1:
|
|
||||||
size_to_read = min(num_bytes - bytes_copied, self.buffer_size)
|
|
||||||
if size_to_read < 0:
|
|
||||||
size_to_read = self.buffer_size
|
|
||||||
|
|
||||||
try:
|
|
||||||
buf = in_fp.read(size_to_read)
|
|
||||||
if not buf:
|
|
||||||
break
|
|
||||||
out_fp.write(buf)
|
|
||||||
bytes_copied += len(buf)
|
|
||||||
except IOError:
|
|
||||||
break
|
|
||||||
|
|
||||||
return bytes_copied
|
|
||||||
|
|
||||||
def list_directory(self, path=None):
|
def list_directory(self, path=None):
|
||||||
path = self._init_path(path)
|
path = self._init_path(path)
|
||||||
|
@ -124,14 +103,14 @@ class LocalStorage(BaseStorageV2):
|
||||||
with open(self._init_path(self._rel_upload_path(new_uuid), create=True), 'w'):
|
with open(self._init_path(self._rel_upload_path(new_uuid), create=True), 'w'):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
return new_uuid
|
return new_uuid, {}
|
||||||
|
|
||||||
def stream_upload_chunk(self, uuid, offset, length, in_fp):
|
def stream_upload_chunk(self, uuid, offset, length, in_fp, _):
|
||||||
with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
|
with open(self._init_path(self._rel_upload_path(uuid)), 'r+b') as upload_storage:
|
||||||
upload_storage.seek(offset)
|
upload_storage.seek(offset)
|
||||||
return self._stream_write_to_fp(in_fp, upload_storage, length)
|
return self.stream_write_to_fp(in_fp, upload_storage, length), {}
|
||||||
|
|
||||||
def complete_chunked_upload(self, uuid, final_path):
|
def complete_chunked_upload(self, uuid, final_path, _):
|
||||||
content_path = self._rel_upload_path(uuid)
|
content_path = self._rel_upload_path(uuid)
|
||||||
final_path_abs = self._init_path(final_path, create=True)
|
final_path_abs = self._init_path(final_path, create=True)
|
||||||
if not self.exists(final_path_abs):
|
if not self.exists(final_path_abs):
|
||||||
|
@ -140,7 +119,7 @@ class LocalStorage(BaseStorageV2):
|
||||||
else:
|
else:
|
||||||
logger.debug('Content already exists at path: %s', final_path_abs)
|
logger.debug('Content already exists at path: %s', final_path_abs)
|
||||||
|
|
||||||
def cancel_chunked_upload(self, uuid):
|
def cancel_chunked_upload(self, uuid, _):
|
||||||
content_path = self._init_path(self._rel_upload_path(uuid))
|
content_path = self._init_path(self._rel_upload_path(uuid))
|
||||||
os.remove(content_path)
|
os.remove(content_path)
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,8 @@ logger = logging.getLogger(__name__)
|
||||||
class SwiftStorage(BaseStorage):
|
class SwiftStorage(BaseStorage):
|
||||||
def __init__(self, swift_container, storage_path, auth_url, swift_user,
|
def __init__(self, swift_container, storage_path, auth_url, swift_user,
|
||||||
swift_password, auth_version=None, os_options=None, ca_cert_path=None):
|
swift_password, auth_version=None, os_options=None, ca_cert_path=None):
|
||||||
|
super(SwiftStorage, self).__init__()
|
||||||
|
|
||||||
self._swift_container = swift_container
|
self._swift_container = swift_container
|
||||||
self._storage_path = storage_path
|
self._storage_path = storage_path
|
||||||
|
|
||||||
|
|
Binary file not shown.
Reference in a new issue