Add a new RadosGW storage engine. Allow storage engines to distinguish not only whether they support direct uploads and downloads, but also whether they support doing so from the browser. Rename resumeable -> resumable.
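The contract this change revolves around, sketched below with simplified stand-ins (not the exact classes from the diff): a storage engine may return a signed direct-access URL, or None when it cannot serve the request — for example when the URL must be usable from the browser (requires_cors) and the engine, like RadosGW today, cannot do CORS. Callers such as DelegateUserfiles then fall back to proxying the bytes through the web app. The hostname and helper below are illustrative placeholders only.

# Illustrative sketch; simplified from BaseStorage/RadosGWStorage in this diff.
class BaseStorage(object):
  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
    return None  # default: engine cannot hand out direct URLs

  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
    return None


class RadosGWStorage(BaseStorage):
  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
    if requires_cors:
      return None  # radosgw cannot yet sign URLs usable from the browser
    return 'https://radosgw.example.invalid/' + path  # stand-in for boto's generate_url()


def download_url(storage, path, requires_cors=False):
  """ Prefer a direct URL; fall back to serving the file through the app. """
  url = storage.get_direct_download_url(path, requires_cors=requires_cors)
  if url is None:
    return '/userfiles/' + path  # proxied by UserfilesHandlers
  return url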

This commit is contained in:
Jake Moshenko 2014-09-09 15:54:03 -04:00
parent dd4037e324
commit 29d40db5ea
12 changed files with 147 additions and 176 deletions

2
app.py
View file

@@ -88,7 +88,7 @@ Principal(app, use_sessions=False)
 login_manager = LoginManager(app)
 mail = Mail(app)
 storage = Storage(app)
-userfiles = Userfiles(app)
+userfiles = Userfiles(app, storage)
 analytics = Analytics(app)
 billing = Billing(app)
 sentry = Sentry(app)

View file

@@ -1,110 +1,31 @@
-import boto
 import os
 import logging
-import hashlib
 import magic
 
-from boto.s3.key import Key
 from uuid import uuid4
 from flask import url_for, request, send_file, make_response, abort
 from flask.views import View
 
 
 logger = logging.getLogger(__name__)
 
 
-class FakeUserfiles(object):
-  def prepare_for_drop(self, mime_type):
-    return ('http://fake/url', uuid4())
-
-  def store_file(self, file_like_obj, content_type):
-    raise NotImplementedError()
-
-  def get_file_url(self, file_id, expires_in=300):
-    return ('http://fake/url')
-
-  def get_file_checksum(self, file_id):
-    return 'abcdefg'
-
-
-class S3FileWriteException(Exception):
-  pass
-
-
-class S3Userfiles(object):
-  def __init__(self, path, s3_access_key, s3_secret_key, bucket_name):
-    self._initialized = False
-    self._bucket_name = bucket_name
-    self._access_key = s3_access_key
-    self._secret_key = s3_secret_key
-    self._prefix = path
-    self._s3_conn = None
-    self._bucket = None
-
-  def _initialize_s3(self):
-    if not self._initialized:
-      self._s3_conn = boto.connect_s3(self._access_key, self._secret_key)
-      self._bucket = self._s3_conn.get_bucket(self._bucket_name)
-      self._initialized = True
-
-  def prepare_for_drop(self, mime_type):
-    """ Returns a signed URL to upload a file to our bucket. """
-    self._initialize_s3()
-    logger.debug('Requested upload url with content type: %s' % mime_type)
-    file_id = str(uuid4())
-    full_key = os.path.join(self._prefix, file_id)
-    k = Key(self._bucket, full_key)
-    url = k.generate_url(300, 'PUT', headers={'Content-Type': mime_type},
-                         encrypt_key=True)
-    return (url, file_id)
-
-  def store_file(self, file_like_obj, content_type):
-    self._initialize_s3()
-    file_id = str(uuid4())
-    full_key = os.path.join(self._prefix, file_id)
-    k = Key(self._bucket, full_key)
-    logger.debug('Setting s3 content type to: %s' % content_type)
-    k.set_metadata('Content-Type', content_type)
-    bytes_written = k.set_contents_from_file(file_like_obj, encrypt_key=True,
-                                             rewind=True)
-    if bytes_written == 0:
-      raise S3FileWriteException('Unable to write file to S3')
-    return file_id
-
-  def get_file_url(self, file_id, expires_in=300, mime_type=None):
-    self._initialize_s3()
-    full_key = os.path.join(self._prefix, file_id)
-    k = Key(self._bucket, full_key)
-    headers = None
-    if mime_type:
-      headers={'Content-Type': mime_type}
-    return k.generate_url(expires_in, headers=headers)
-
-  def get_file_checksum(self, file_id):
-    self._initialize_s3()
-    full_key = os.path.join(self._prefix, file_id)
-    k = self._bucket.lookup(full_key)
-    return k.etag[1:-1][:7]
-
-
 class UserfilesHandlers(View):
   methods = ['GET', 'PUT']
 
-  def __init__(self, local_userfiles):
-    self._userfiles = local_userfiles
+  def __init__(self, distributed_storage, location, files):
+    self._storage = distributed_storage
+    self._files = files
+    self._locations = {location}
     self._magic = magic.Magic(mime=True)
 
   def get(self, file_id):
-    path = self._userfiles.file_path(file_id)
-    if not os.path.exists(path):
+    path = self._files.get_file_id_path(file_id)
+    try:
+      file_stream = self._storage.stream_read_file(self._locations, path)
+      return send_file(file_stream)
+    except IOError:
       abort(404)
-
-    logger.debug('Sending path: %s' % path)
-    return send_file(path, mimetype=self._magic.from_file(path))
 
   def put(self, file_id):
     input_stream = request.stream
     if request.headers.get('transfer-encoding') == 'chunked':
@@ -112,7 +33,8 @@ class UserfilesHandlers(View):
       # encoding (Gunicorn)
       input_stream = request.environ['wsgi.input']
 
-    self._userfiles.store_stream(input_stream, file_id)
+    path = self._files.get_file_id_path(file_id)
+    self._storage.stream_write(self._locations, path, input_stream)
 
     return make_response('Okay')
@@ -123,99 +45,79 @@ class UserfilesHandlers(View):
     return self.put(file_id)
 
 
-class LocalUserfiles(object):
-  def __init__(self, app, path):
-    self._root_path = path
-    self._buffer_size = 64 * 1024  # 64 KB
+class DelegateUserfiles(object):
+  def __init__(self, app, distributed_storage, location, path, handler_name):
     self._app = app
+    self._storage = distributed_storage
+    self._locations = {location}
+    self._prefix = path
+    self._handler_name = handler_name
 
   def _build_url_adapter(self):
     return self._app.url_map.bind(self._app.config['SERVER_HOSTNAME'],
                                   script_name=self._app.config['APPLICATION_ROOT'] or '/',
                                   url_scheme=self._app.config['PREFERRED_URL_SCHEME'])
 
-  def prepare_for_drop(self, mime_type):
-    file_id = str(uuid4())
-    with self._app.app_context() as ctx:
-      ctx.url_adapter = self._build_url_adapter()
-      return (url_for('userfiles_handlers', file_id=file_id, _external=True), file_id)
+  def get_file_id_path(self, file_id):
+    return os.path.join(self._prefix, file_id)
 
-  def file_path(self, file_id):
-    if '..' in file_id or file_id.startswith('/'):
-      raise RuntimeError('Invalid Filename')
-    return os.path.join(self._root_path, file_id)
+  def prepare_for_drop(self, mime_type, requires_cors=True):
+    """ Returns a signed URL to upload a file to our bucket. """
+    logger.debug('Requested upload url with content type: %s' % mime_type)
+    file_id = str(uuid4())
+    path = self.get_file_id_path(file_id)
+    url = self._storage.get_direct_upload_url(self._locations, path, mime_type, requires_cors)
 
-  def store_stream(self, stream, file_id):
-    path = self.file_path(file_id)
-    dirname = os.path.dirname(path)
-    if not os.path.exists(dirname):
-      os.makedirs(dirname)
-
-    with open(path, 'w') as to_write:
-      while True:
-        try:
-          buf = stream.read(self._buffer_size)
-          if not buf:
-            break
-          to_write.write(buf)
-        except IOError:
-          break
+    if url is None:
+      with self._app.app_context() as ctx:
+        ctx.url_adapter = self._build_url_adapter()
+        return (url_for(self._handler_name, file_id=file_id, _external=True), file_id)
+
+    return (url, file_id)
 
   def store_file(self, file_like_obj, content_type):
     file_id = str(uuid4())
-
-    # Rewind the file to match what s3 does
-    file_like_obj.seek(0, os.SEEK_SET)
-
-    self.store_stream(file_like_obj, file_id)
+    path = self.get_file_id_path(file_id)
+    self._storage.stream_write(self._locations, path, file_like_obj)
     return file_id
 
-  def get_file_url(self, file_id, expires_in=300):
-    with self._app.app_context() as ctx:
-      ctx.url_adapter = self._build_url_adapter()
-      return url_for('userfiles_handlers', file_id=file_id, _external=True)
+  def get_file_url(self, file_id, expires_in=300, requires_cors=False):
+    path = self.get_file_id_path(file_id)
+    url = self._storage.get_direct_download_url(self._locations, path, expires_in, requires_cors)
+
+    if url is None:
+      with self._app.app_context() as ctx:
+        ctx.url_adapter = self._build_url_adapter()
+        return url_for(self._handler_name, file_id=file_id, _external=True)
+
+    return url
 
   def get_file_checksum(self, file_id):
-    path = self.file_path(file_id)
-    sha_hash = hashlib.sha256()
-    with open(path, 'r') as to_hash:
-      while True:
-        buf = to_hash.read(self._buffer_size)
-        if not buf:
-          break
-        sha_hash.update(buf)
-    return sha_hash.hexdigest()[:7]
+    path = self.get_file_id_path(file_id)
+    return self._storage.get_checksum(self._locations, path)
 
 
 class Userfiles(object):
-  def __init__(self, app=None):
+  def __init__(self, app=None, distributed_storage=None):
     self.app = app
     if app is not None:
-      self.state = self.init_app(app)
+      self.state = self.init_app(app, distributed_storage)
     else:
       self.state = None
 
-  def init_app(self, app):
-    storage_type = app.config.get('USERFILES_TYPE', 'LocalUserfiles')
-    path = app.config.get('USERFILES_PATH', '')
-
-    if storage_type == 'LocalUserfiles':
-      userfiles = LocalUserfiles(app, path)
-      app.add_url_rule('/userfiles/<file_id>',
-                       view_func=UserfilesHandlers.as_view('userfiles_handlers',
-                                                           local_userfiles=userfiles))
-
-    elif storage_type == 'S3Userfiles':
-      access_key = app.config.get('USERFILES_AWS_ACCESS_KEY', '')
-      secret_key = app.config.get('USERFILES_AWS_SECRET_KEY', '')
-      bucket = app.config.get('USERFILES_S3_BUCKET', '')
-      userfiles = S3Userfiles(path, access_key, secret_key, bucket)
-
-    elif storage_type == 'FakeUserfiles':
-      userfiles = FakeUserfiles()
-
-    else:
-      raise RuntimeError('Unknown userfiles type: %s' % storage_type)
+  def init_app(self, app, distributed_storage):
+    location = app.config.get('USERFILES_LOCATION')
+    path = app.config.get('USERFILES_PATH', None)
+    handler_name = 'userfiles_handlers'
+
+    userfiles = DelegateUserfiles(app, distributed_storage, location, path, handler_name)
+
+    app.add_url_rule('/userfiles/<file_id>',
+                     view_func=UserfilesHandlers.as_view(handler_name,
+                                                         distributed_storage=distributed_storage,
+                                                         location=location,
+                                                         files=userfiles))
 
     # register extension with app
     app.extensions = getattr(app, 'extensions', {})

View file

@@ -80,7 +80,7 @@ def build_status_view(build_obj, can_write=False):
   }
 
   if can_write:
-    resp['archive_url'] = user_files.get_file_url(build_obj.resource_key)
+    resp['archive_url'] = user_files.get_file_url(build_obj.resource_key, requires_cors=True)
 
   return resp

@@ -257,7 +257,7 @@ class FileDropResource(ApiResource):
   def post(self):
     """ Request a URL to which a file may be uploaded. """
     mime_type = request.get_json()['mimeType']
-    (url, file_id) = user_files.prepare_for_drop(mime_type)
+    (url, file_id) = user_files.prepare_for_drop(mime_type, requires_cors=True)
     return {
       'url': url,
      'file_id': str(file_id),

View file

@@ -110,10 +110,10 @@ def head_image_layer(namespace, repository, image_id, headers):
     extra_headers = {}
 
-    # Add the Accept-Ranges header if the storage engine supports resumeable
+    # Add the Accept-Ranges header if the storage engine supports resumable
     # downloads.
-    if store.get_supports_resumeable_downloads(repo_image.storage.locations):
-      profile.debug('Storage supports resumeable downloads')
+    if store.get_supports_resumable_downloads(repo_image.storage.locations):
+      profile.debug('Storage supports resumable downloads')
       extra_headers['Accept-Ranges'] = 'bytes'
 
     resp = make_response('')

View file

@@ -1,5 +1,5 @@
 from storage.local import LocalStorage
-from storage.cloud import S3Storage, GoogleCloudStorage
+from storage.cloud import S3Storage, GoogleCloudStorage, RadosGWStorage
 from storage.fakestorage import FakeStorage
 from storage.distributedstorage import DistributedStorage

@@ -8,6 +8,7 @@ STORAGE_DRIVER_CLASSES = {
   'LocalStorage': LocalStorage,
   'S3Storage': S3Storage,
   'GoogleCloudStorage': GoogleCloudStorage,
+  'RadosGWStorage': RadosGWStorage,
 }

View file

@@ -54,10 +54,13 @@ class BaseStorage(StoragePaths):
   # Set the IO buffer to 64kB
   buffer_size = 64 * 1024
 
-  def get_direct_download_url(self, path, expires_in=60):
+  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
     return None
 
-  def get_supports_resumeable_downloads(self):
+  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
+    return None
+
+  def get_supports_resumable_downloads(self):
     return False
 
   def get_content(self, path):

@@ -83,3 +86,6 @@ class BaseStorage(StoragePaths):
 
   def remove(self, path):
     raise NotImplementedError
+
+  def get_checksum(self, path):
+    raise NotImplementedError

View file

@@ -35,8 +35,8 @@ class StreamReadKeyAsFile(object):
 
 class _CloudStorage(BaseStorage):
-  def __init__(self, connection_class, key_class, upload_params, storage_path, access_key,
-               secret_key, bucket_name):
+  def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
+               access_key, secret_key, bucket_name):
     self._initialized = False
     self._bucket_name = bucket_name
     self._access_key = access_key
@@ -45,12 +45,14 @@ class _CloudStorage(BaseStorage):
     self._connection_class = connection_class
     self._key_class = key_class
     self._upload_params = upload_params
+    self._connect_kwargs = connect_kwargs
     self._cloud_conn = None
     self._cloud_bucket = None
 
   def _initialize_cloud_conn(self):
     if not self._initialized:
-      self._cloud_conn = self._connection_class(self._access_key, self._secret_key)
+      self._cloud_conn = self._connection_class(self._access_key, self._secret_key,
+                                                **self._connect_kwargs)
       self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name)
       self._initialized = True
@@ -87,15 +89,22 @@ class _CloudStorage(BaseStorage):
     key.set_contents_from_string(content, **self._upload_params)
     return path
 
-  def get_supports_resumeable_downloads(self):
+  def get_supports_resumable_downloads(self):
     return True
 
-  def get_direct_download_url(self, path, expires_in=60):
+  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
     self._initialize_cloud_conn()
     path = self._init_path(path)
     k = self._key_class(self._cloud_bucket, path)
     return k.generate_url(expires_in)
 
+  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
+    self._initialize_cloud_conn()
+    path = self._init_path(path)
+    key = self._key_class(self._cloud_bucket, path)
+    url = key.generate_url(300, 'PUT', headers={'Content-Type': mime_type}, encrypt_key=True)
+    return url
+
   def stream_read(self, path):
     self._initialize_cloud_conn()
     path = self._init_path(path)
@@ -179,21 +188,32 @@ class _CloudStorage(BaseStorage):
     for key in self._cloud_bucket.list(prefix=path):
       key.delete()
 
+  def get_checksum(self, path):
+    self._initialize_cloud_conn()
+    path = self._init_path(path)
+    key = self._key_class(self._cloud_bucket, path)
+    k = self._cloud_bucket.lookup(key)
+    return k.etag[1:-1][:7]
+
 
 class S3Storage(_CloudStorage):
   def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
     upload_params = {
       'encrypt_key': True,
     }
+    connect_kwargs = {}
     super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
-                                    upload_params, storage_path, s3_access_key, s3_secret_key,
-                                    s3_bucket)
+                                    connect_kwargs, upload_params, storage_path, s3_access_key,
+                                    s3_secret_key, s3_bucket)
 
 
 class GoogleCloudStorage(_CloudStorage):
   def __init__(self, storage_path, access_key, secret_key, bucket_name):
-    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key, {},
-                                             storage_path, access_key, secret_key, bucket_name)
+    upload_params = {}
+    connect_kwargs = {}
+    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key,
+                                             connect_kwargs, upload_params, storage_path,
+                                             access_key, secret_key, bucket_name)
 
   def stream_write(self, path, fp):
     # Minimum size of upload part size on S3 is 5MB
@@ -201,3 +221,30 @@ class GoogleCloudStorage(_CloudStorage):
     path = self._init_path(path)
     key = self._key_class(self._cloud_bucket, path)
     key.set_contents_from_stream(fp)
+
+
+class RadosGWStorage(_CloudStorage):
+  def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name):
+    upload_params = {}
+    connect_kwargs = {
+      'host': hostname,
+      'is_secure': is_secure,
+      'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
+    }
+    super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
+                                         connect_kwargs, upload_params, storage_path, access_key,
+                                         secret_key, bucket_name)
+
+  # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
+  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
+    if requires_cors:
+      return None
+
+    return super(RadosGWStorage, self).get_direct_download_url(path, expires_in, requires_cors)
+
+  # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
+  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
+    if requires_cors:
+      return None
+
+    return super(RadosGWStorage, self).get_direct_upload_url(path, mime_type, requires_cors)

View file

@@ -31,6 +31,7 @@ class DistributedStorage(StoragePaths):
     self.preferred_locations = list(preferred_locations)
 
   get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url)
+  get_direct_upload_url = _location_aware(BaseStorage.get_direct_upload_url)
   get_content = _location_aware(BaseStorage.get_content)
   put_content = _location_aware(BaseStorage.put_content)
   stream_read = _location_aware(BaseStorage.stream_read)

@@ -39,4 +40,5 @@ class DistributedStorage(StoragePaths):
   list_directory = _location_aware(BaseStorage.list_directory)
   exists = _location_aware(BaseStorage.exists)
   remove = _location_aware(BaseStorage.remove)
-  get_supports_resumeable_downloads = _location_aware(BaseStorage.get_supports_resumeable_downloads)
+  get_checksum = _location_aware(BaseStorage.get_checksum)
+  get_supports_resumable_downloads = _location_aware(BaseStorage.get_supports_resumable_downloads)

View file

@@ -22,3 +22,6 @@ class FakeStorage(BaseStorage):
 
   def exists(self, path):
     return False
+
+  def get_checksum(self, path):
+    return 'abcdefg'

View file

@@ -1,4 +1,3 @@
 import os
 import shutil

@@ -80,3 +79,14 @@ class LocalStorage(BaseStorage):
       os.remove(path)
     except OSError:
       pass
+
+  def get_checksum(self, path):
+    path = self._init_path(path)
+    sha_hash = hashlib.sha256()
+    with open(path, 'r') as to_hash:
+      while True:
+        buf = to_hash.read(self.buffer_size)
+        if not buf:
+          break
+        sha_hash.update(buf)
+    return sha_hash.hexdigest()[:7]

View file

@@ -30,7 +30,7 @@ class TestConfig(DefaultConfig):
   BUILDLOGS_MODULE_AND_CLASS = ('test.testlogs', 'testlogs.TestBuildLogs')
   BUILDLOGS_OPTIONS = ['devtable', 'building', 'deadbeef-dead-beef-dead-beefdeadbeef', False]
 
-  USERFILES_TYPE = 'FakeUserfiles'
+  USERFILES_LOCATION = 'local_us'
 
   FEATURE_SUPER_USERS = True
   FEATURE_BILLING = True

View file

@@ -495,7 +495,7 @@ class DockerfileBuildWorker(Worker):
       job_config = json.loads(repository_build.job_config)
 
-      resource_url = user_files.get_file_url(repository_build.resource_key)
+      resource_url = user_files.get_file_url(repository_build.resource_key, requires_cors=False)
       tag_names = job_config['docker_tags']
       build_subdir = job_config['build_subdir']
       repo = job_config['repository']