diff --git a/app.py b/app.py
index 81c59a30c..bcc4e86d7 100644
--- a/app.py
+++ b/app.py
@@ -88,7 +88,7 @@ Principal(app, use_sessions=False)
 login_manager = LoginManager(app)
 mail = Mail(app)
 storage = Storage(app)
-userfiles = Userfiles(app)
+userfiles = Userfiles(app, storage)
 analytics = Analytics(app)
 billing = Billing(app)
 sentry = Sentry(app)
diff --git a/data/userfiles.py b/data/userfiles.py
index 79fbcb507..e6d21c1c1 100644
--- a/data/userfiles.py
+++ b/data/userfiles.py
@@ -1,110 +1,31 @@
-import boto
 import os
 import logging
-import hashlib
 import magic
 
-from boto.s3.key import Key
 from uuid import uuid4
 from flask import url_for, request, send_file, make_response, abort
 from flask.views import View
 
-
 logger = logging.getLogger(__name__)
 
 
-class FakeUserfiles(object):
-  def prepare_for_drop(self, mime_type):
-    return ('http://fake/url', uuid4())
-
-  def store_file(self, file_like_obj, content_type):
-    raise NotImplementedError()
-
-  def get_file_url(self, file_id, expires_in=300):
-    return ('http://fake/url')
-
-  def get_file_checksum(self, file_id):
-    return 'abcdefg'
-
-
-class S3FileWriteException(Exception):
-  pass
-
-
-class S3Userfiles(object):
-  def __init__(self, path, s3_access_key, s3_secret_key, bucket_name):
-    self._initialized = False
-    self._bucket_name = bucket_name
-    self._access_key = s3_access_key
-    self._secret_key = s3_secret_key
-    self._prefix = path
-    self._s3_conn = None
-    self._bucket = None
-
-  def _initialize_s3(self):
-    if not self._initialized:
-      self._s3_conn = boto.connect_s3(self._access_key, self._secret_key)
-      self._bucket = self._s3_conn.get_bucket(self._bucket_name)
-      self._initialized = True
-
-  def prepare_for_drop(self, mime_type):
-    """ Returns a signed URL to upload a file to our bucket. """
-    self._initialize_s3()
-    logger.debug('Requested upload url with content type: %s' % mime_type)
-    file_id = str(uuid4())
-    full_key = os.path.join(self._prefix, file_id)
-    k = Key(self._bucket, full_key)
-    url = k.generate_url(300, 'PUT', headers={'Content-Type': mime_type},
-                         encrypt_key=True)
-    return (url, file_id)
-
-  def store_file(self, file_like_obj, content_type):
-    self._initialize_s3()
-    file_id = str(uuid4())
-    full_key = os.path.join(self._prefix, file_id)
-    k = Key(self._bucket, full_key)
-    logger.debug('Setting s3 content type to: %s' % content_type)
-    k.set_metadata('Content-Type', content_type)
-    bytes_written = k.set_contents_from_file(file_like_obj, encrypt_key=True,
-                                             rewind=True)
-
-    if bytes_written == 0:
-      raise S3FileWriteException('Unable to write file to S3')
-
-    return file_id
-
-  def get_file_url(self, file_id, expires_in=300, mime_type=None):
-    self._initialize_s3()
-    full_key = os.path.join(self._prefix, file_id)
-    k = Key(self._bucket, full_key)
-    headers = None
-    if mime_type:
-      headers={'Content-Type': mime_type}
-
-    return k.generate_url(expires_in, headers=headers)
-
-  def get_file_checksum(self, file_id):
-    self._initialize_s3()
-    full_key = os.path.join(self._prefix, file_id)
-    k = self._bucket.lookup(full_key)
-    return k.etag[1:-1][:7]
-
-
 class UserfilesHandlers(View):
   methods = ['GET', 'PUT']
 
-  def __init__(self, local_userfiles):
-    self._userfiles = local_userfiles
+  def __init__(self, distributed_storage, location, files):
+    self._storage = distributed_storage
+    self._files = files
+    self._locations = {location}
     self._magic = magic.Magic(mime=True)
 
   def get(self, file_id):
-    path = self._userfiles.file_path(file_id)
-    if not os.path.exists(path):
+    path = self._files.get_file_id_path(file_id)
+    try:
+      file_stream = self._storage.stream_read_file(self._locations, path)
+      return send_file(file_stream)
+    except IOError:
       abort(404)
 
-    logger.debug('Sending path: %s' % path)
-    return send_file(path, mimetype=self._magic.from_file(path))
-
   def put(self, file_id):
     input_stream = request.stream
     if request.headers.get('transfer-encoding') == 'chunked':
@@ -112,7 +33,8 @@ class UserfilesHandlers(View):
       # encoding (Gunicorn)
       input_stream = request.environ['wsgi.input']
 
-    self._userfiles.store_stream(input_stream, file_id)
+    path = self._files.get_file_id_path(file_id)
+    self._storage.stream_write(self._locations, path, input_stream)
 
     return make_response('Okay')
 
@@ -123,99 +45,79 @@ class UserfilesHandlers(View):
       return self.put(file_id)
 
 
-class LocalUserfiles(object):
-  def __init__(self, app, path):
-    self._root_path = path
-    self._buffer_size = 64 * 1024 # 64 KB
+class DelegateUserfiles(object):
+  def __init__(self, app, distributed_storage, location, path, handler_name):
     self._app = app
+    self._storage = distributed_storage
+    self._locations = {location}
+    self._prefix = path
+    self._handler_name = handler_name
 
   def _build_url_adapter(self):
     return self._app.url_map.bind(self._app.config['SERVER_HOSTNAME'],
                                   script_name=self._app.config['APPLICATION_ROOT'] or '/',
                                   url_scheme=self._app.config['PREFERRED_URL_SCHEME'])
 
-  def prepare_for_drop(self, mime_type):
+  def get_file_id_path(self, file_id):
+    return os.path.join(self._prefix, file_id)
+
+  def prepare_for_drop(self, mime_type, requires_cors=True):
+    """ Returns a signed URL to upload a file to our bucket. """
+    logger.debug('Requested upload url with content type: %s' % mime_type)
     file_id = str(uuid4())
-    with self._app.app_context() as ctx:
-      ctx.url_adapter = self._build_url_adapter()
-      return (url_for('userfiles_handlers', file_id=file_id, _external=True), file_id)
+    path = self.get_file_id_path(file_id)
+    url = self._storage.get_direct_upload_url(self._locations, path, mime_type, requires_cors)
 
-  def file_path(self, file_id):
-    if '..' in file_id or file_id.startswith('/'):
-      raise RuntimeError('Invalid Filename')
-    return os.path.join(self._root_path, file_id)
+    if url is None:
+      with self._app.app_context() as ctx:
+        ctx.url_adapter = self._build_url_adapter()
+        return (url_for(self._handler_name, file_id=file_id, _external=True), file_id)
 
-  def store_stream(self, stream, file_id):
-    path = self.file_path(file_id)
-    dirname = os.path.dirname(path)
-    if not os.path.exists(dirname):
-      os.makedirs(dirname)
-
-    with open(path, 'w') as to_write:
-      while True:
-        try:
-          buf = stream.read(self._buffer_size)
-          if not buf:
-            break
-          to_write.write(buf)
-        except IOError:
-          break
+    return (url, file_id)
 
   def store_file(self, file_like_obj, content_type):
     file_id = str(uuid4())
-
-    # Rewind the file to match what s3 does
-    file_like_obj.seek(0, os.SEEK_SET)
-
-    self.store_stream(file_like_obj, file_id)
+    path = self.get_file_id_path(file_id)
+    self._storage.stream_write(self._locations, path, file_like_obj)
     return file_id
 
-  def get_file_url(self, file_id, expires_in=300):
-    with self._app.app_context() as ctx:
-      ctx.url_adapter = self._build_url_adapter()
-      return url_for('userfiles_handlers', file_id=file_id, _external=True)
+  def get_file_url(self, file_id, expires_in=300, requires_cors=False):
+    path = self.get_file_id_path(file_id)
+    url = self._storage.get_direct_download_url(self._locations, path, expires_in, requires_cors)
+
+    if url is None:
+      with self._app.app_context() as ctx:
+        ctx.url_adapter = self._build_url_adapter()
+        return url_for(self._handler_name, file_id=file_id, _external=True)
+
+    return url
 
   def get_file_checksum(self, file_id):
-    path = self.file_path(file_id)
-    sha_hash = hashlib.sha256()
-    with open(path, 'r') as to_hash:
-      while True:
-        buf = to_hash.read(self._buffer_size)
-        if not buf:
-          break
-        sha_hash.update(buf)
-    return sha_hash.hexdigest()[:7]
+    path = self.get_file_id_path(file_id)
+    return self._storage.get_checksum(self._locations, path)
 
 
 class Userfiles(object):
-  def __init__(self, app=None):
+  def __init__(self, app=None, distributed_storage=None):
     self.app = app
     if app is not None:
-      self.state = self.init_app(app)
+      self.state = self.init_app(app, distributed_storage)
     else:
       self.state = None
 
-  def init_app(self, app):
-    storage_type = app.config.get('USERFILES_TYPE', 'LocalUserfiles')
-    path = app.config.get('USERFILES_PATH', '')
+  def init_app(self, app, distributed_storage):
+    location = app.config.get('USERFILES_LOCATION')
+    path = app.config.get('USERFILES_PATH', None)
 
-    if storage_type == 'LocalUserfiles':
-      userfiles = LocalUserfiles(app, path)
-      app.add_url_rule('/userfiles/<file_id>',
-                       view_func=UserfilesHandlers.as_view('userfiles_handlers',
-                                                           local_userfiles=userfiles))
+    handler_name = 'userfiles_handlers'
 
-    elif storage_type == 'S3Userfiles':
-      access_key = app.config.get('USERFILES_AWS_ACCESS_KEY', '')
-      secret_key = app.config.get('USERFILES_AWS_SECRET_KEY', '')
-      bucket = app.config.get('USERFILES_S3_BUCKET', '')
-      userfiles = S3Userfiles(path, access_key, secret_key, bucket)
+    userfiles = DelegateUserfiles(app, distributed_storage, location, path, handler_name)
 
-    elif storage_type == 'FakeUserfiles':
-      userfiles = FakeUserfiles()
-
-    else:
-      raise RuntimeError('Unknown userfiles type: %s' % storage_type)
+    app.add_url_rule('/userfiles/<file_id>',
+                     view_func=UserfilesHandlers.as_view(handler_name,
+                                                         distributed_storage=distributed_storage,
+                                                         location=location,
+                                                         files=userfiles))
 
     # register extension with app
     app.extensions = getattr(app, 'extensions', {})
diff --git a/endpoints/api/build.py b/endpoints/api/build.py
index 21d554069..74677fadb 100644
--- a/endpoints/api/build.py
+++ b/endpoints/api/build.py
@@ -80,7 +80,7 @@ def build_status_view(build_obj, can_write=False):
   }
 
   if can_write:
-    resp['archive_url'] = user_files.get_file_url(build_obj.resource_key)
+    resp['archive_url'] = user_files.get_file_url(build_obj.resource_key, requires_cors=True)
 
   return resp
 
@@ -257,7 +257,7 @@ class FileDropResource(ApiResource):
   def post(self):
     """ Request a URL to which a file may be uploaded. """
     mime_type = request.get_json()['mimeType']
-    (url, file_id) = user_files.prepare_for_drop(mime_type)
+    (url, file_id) = user_files.prepare_for_drop(mime_type, requires_cors=True)
     return {
       'url': url,
       'file_id': str(file_id),
diff --git a/endpoints/registry.py b/endpoints/registry.py
index 72633939e..94719905a 100644
--- a/endpoints/registry.py
+++ b/endpoints/registry.py
@@ -110,10 +110,10 @@ def head_image_layer(namespace, repository, image_id, headers):
 
     extra_headers = {}
 
-    # Add the Accept-Ranges header if the storage engine supports resumeable
+    # Add the Accept-Ranges header if the storage engine supports resumable
     # downloads.
-    if store.get_supports_resumeable_downloads(repo_image.storage.locations):
-      profile.debug('Storage supports resumeable downloads')
+    if store.get_supports_resumable_downloads(repo_image.storage.locations):
+      profile.debug('Storage supports resumable downloads')
       extra_headers['Accept-Ranges'] = 'bytes'
 
     resp = make_response('')
diff --git a/storage/__init__.py b/storage/__init__.py
index 6700dab0b..4d1134d4b 100644
--- a/storage/__init__.py
+++ b/storage/__init__.py
@@ -1,5 +1,5 @@
 from storage.local import LocalStorage
-from storage.cloud import S3Storage, GoogleCloudStorage
+from storage.cloud import S3Storage, GoogleCloudStorage, RadosGWStorage
 from storage.fakestorage import FakeStorage
 from storage.distributedstorage import DistributedStorage
 
@@ -8,6 +8,7 @@ STORAGE_DRIVER_CLASSES = {
   'LocalStorage': LocalStorage,
   'S3Storage': S3Storage,
   'GoogleCloudStorage': GoogleCloudStorage,
+  'RadosGWStorage': RadosGWStorage,
 }
 
 
diff --git a/storage/basestorage.py b/storage/basestorage.py
index 2d3727a5b..aa6434b8e 100644
--- a/storage/basestorage.py
+++ b/storage/basestorage.py
@@ -54,10 +54,13 @@ class BaseStorage(StoragePaths):
   # Set the IO buffer to 64kB
   buffer_size = 64 * 1024
 
-  def get_direct_download_url(self, path, expires_in=60):
+  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
     return None
 
-  def get_supports_resumeable_downloads(self):
+  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
+    return None
+
+  def get_supports_resumable_downloads(self):
     return False
 
   def get_content(self, path):
@@ -83,3 +86,6 @@ class BaseStorage(StoragePaths):
 
   def remove(self, path):
     raise NotImplementedError
+
+  def get_checksum(self, path):
+    raise NotImplementedError
\ No newline at end of file
diff --git a/storage/cloud.py b/storage/cloud.py
index d64415410..a576a6401 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -35,8 +35,8 @@ class StreamReadKeyAsFile(object):
 
 
 class _CloudStorage(BaseStorage):
-  def __init__(self, connection_class, key_class, upload_params, storage_path, access_key,
-               secret_key, bucket_name):
+  def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
+               access_key, secret_key, bucket_name):
     self._initialized = False
     self._bucket_name = bucket_name
     self._access_key = access_key
@@ -45,12 +45,14 @@
     self._connection_class = connection_class
     self._key_class = key_class
     self._upload_params = upload_params
+    self._connect_kwargs = connect_kwargs
     self._cloud_conn = None
     self._cloud_bucket = None
 
   def _initialize_cloud_conn(self):
     if not self._initialized:
-      self._cloud_conn = self._connection_class(self._access_key, self._secret_key)
+      self._cloud_conn = self._connection_class(self._access_key, self._secret_key,
+                                                **self._connect_kwargs)
       self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name)
       self._initialized = True
 
@@ -87,15 +89,22 @@ class _CloudStorage(BaseStorage):
     key.set_contents_from_string(content, **self._upload_params)
     return path
 
-  def get_supports_resumeable_downloads(self):
+  def get_supports_resumable_downloads(self):
     return True
 
-  def get_direct_download_url(self, path, expires_in=60):
+  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
     self._initialize_cloud_conn()
     path = self._init_path(path)
     k = self._key_class(self._cloud_bucket, path)
     return k.generate_url(expires_in)
 
+  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
+    self._initialize_cloud_conn()
+    path = self._init_path(path)
+    key = self._key_class(self._cloud_bucket, path)
+    url = key.generate_url(300, 'PUT', headers={'Content-Type': mime_type}, encrypt_key=True)
+    return url
+
   def stream_read(self, path):
     self._initialize_cloud_conn()
     path = self._init_path(path)
@@ -179,21 +188,32 @@ class _CloudStorage(BaseStorage):
     for key in self._cloud_bucket.list(prefix=path):
       key.delete()
 
+  def get_checksum(self, path):
+    self._initialize_cloud_conn()
+    path = self._init_path(path)
+    key = self._key_class(self._cloud_bucket, path)
+    k = self._cloud_bucket.lookup(key)
+    return k.etag[1:-1][:7]
+
 
 class S3Storage(_CloudStorage):
   def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
     upload_params = {
       'encrypt_key': True,
     }
+    connect_kwargs = {}
     super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
-                                    upload_params, storage_path, s3_access_key, s3_secret_key,
-                                    s3_bucket)
+                                    connect_kwargs, upload_params, storage_path, s3_access_key,
+                                    s3_secret_key, s3_bucket)
 
 
 class GoogleCloudStorage(_CloudStorage):
   def __init__(self, storage_path, access_key, secret_key, bucket_name):
-    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key, {},
-                                             storage_path, access_key, secret_key, bucket_name)
+    upload_params = {}
+    connect_kwargs = {}
+    super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection, boto.gs.key.Key,
+                                             connect_kwargs, upload_params, storage_path,
+                                             access_key, secret_key, bucket_name)
 
   def stream_write(self, path, fp):
     # Minimum size of upload part size on S3 is 5MB
@@ -201,3 +221,30 @@ class GoogleCloudStorage(_CloudStorage):
     path = self._init_path(path)
     key = self._key_class(self._cloud_bucket, path)
     key.set_contents_from_stream(fp)
+
+
+class RadosGWStorage(_CloudStorage):
+  def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name):
+    upload_params = {}
+    connect_kwargs = {
+      'host': hostname,
+      'is_secure': is_secure,
+      'calling_format': boto.s3.connection.OrdinaryCallingFormat(),
+    }
+    super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
+                                         connect_kwargs, upload_params, storage_path, access_key,
+                                         secret_key, bucket_name)
+
+  # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
+  def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
+    if requires_cors:
+      return None
+
+    return super(RadosGWStorage, self).get_direct_download_url(path, expires_in, requires_cors)
+
+  # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
+  def get_direct_upload_url(self, path, mime_type, requires_cors=True):
+    if requires_cors:
+      return None
+
+    return super(RadosGWStorage, self).get_direct_upload_url(path, mime_type, requires_cors)
diff --git a/storage/distributedstorage.py b/storage/distributedstorage.py
index 9941f0fa5..1544d9725 100644
--- a/storage/distributedstorage.py
+++ b/storage/distributedstorage.py
@@ -31,6 +31,7 @@ class DistributedStorage(StoragePaths):
     self.preferred_locations = list(preferred_locations)
 
   get_direct_download_url = _location_aware(BaseStorage.get_direct_download_url)
+  get_direct_upload_url = _location_aware(BaseStorage.get_direct_upload_url)
   get_content = _location_aware(BaseStorage.get_content)
   put_content = _location_aware(BaseStorage.put_content)
   stream_read = _location_aware(BaseStorage.stream_read)
@@ -39,4 +40,5 @@ class DistributedStorage(StoragePaths):
   list_directory = _location_aware(BaseStorage.list_directory)
   exists = _location_aware(BaseStorage.exists)
   remove = _location_aware(BaseStorage.remove)
-  get_supports_resumeable_downloads = _location_aware(BaseStorage.get_supports_resumeable_downloads)
+  get_checksum = _location_aware(BaseStorage.get_checksum)
+  get_supports_resumable_downloads = _location_aware(BaseStorage.get_supports_resumable_downloads)
diff --git a/storage/fakestorage.py b/storage/fakestorage.py
index 597d22af4..5761acf2f 100644
--- a/storage/fakestorage.py
+++ b/storage/fakestorage.py
@@ -22,3 +22,6 @@ class FakeStorage(BaseStorage):
 
   def exists(self, path):
     return False
+
+  def get_checksum(self, path):
+    return 'abcdefg'
\ No newline at end of file
diff --git a/storage/local.py b/storage/local.py
index 361d76403..55e79077b 100644
--- a/storage/local.py
+++ b/storage/local.py
@@ -1,4 +1,3 @@
-
 import os
 import shutil
 
@@ -80,3 +79,14 @@ class LocalStorage(BaseStorage):
       os.remove(path)
     except OSError:
       pass
+
+  def get_checksum(self, path):
+    path = self._init_path(path)
+    sha_hash = hashlib.sha256()
+    with open(path, 'r') as to_hash:
+      while True:
+        buf = to_hash.read(self.buffer_size)
+        if not buf:
+          break
+        sha_hash.update(buf)
+    return sha_hash.hexdigest()[:7]
diff --git a/test/testconfig.py b/test/testconfig.py
index c74e5712a..35c96a803 100644
--- a/test/testconfig.py
+++ b/test/testconfig.py
@@ -30,7 +30,7 @@ class TestConfig(DefaultConfig):
   BUILDLOGS_MODULE_AND_CLASS = ('test.testlogs', 'testlogs.TestBuildLogs')
   BUILDLOGS_OPTIONS = ['devtable', 'building', 'deadbeef-dead-beef-dead-beefdeadbeef', False]
 
-  USERFILES_TYPE = 'FakeUserfiles'
+  USERFILES_LOCATION = 'local_us'
 
   FEATURE_SUPER_USERS = True
   FEATURE_BILLING = True
diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py
index b373a00a9..143a27103 100644
--- a/workers/dockerfilebuild.py
+++ b/workers/dockerfilebuild.py
@@ -495,7 +495,7 @@ class DockerfileBuildWorker(Worker):
 
     job_config = json.loads(repository_build.job_config)
 
-    resource_url = user_files.get_file_url(repository_build.resource_key)
+    resource_url = user_files.get_file_url(repository_build.resource_key, requires_cors=False)
     tag_names = job_config['docker_tags']
     build_subdir = job_config['build_subdir']
     repo = job_config['repository']
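
Note on the contract introduced above: DelegateUserfiles treats a None return from get_direct_upload_url / get_direct_download_url as "this backend cannot hand out a usable signed URL for this request" and falls back to minting a URL for the app's own /userfiles/<file_id> view. A minimal sketch of that contract follows; the StubStorage class and the example values are hypothetical and exist only to illustrate the behaviour RadosGWStorage exhibits while radosgw lacks CORS support.

    # Hypothetical stand-in for a DistributedStorage backed by RadosGWStorage.
    class StubStorage(object):
      def get_direct_upload_url(self, locations, path, mime_type, requires_cors=True):
        # Mirror RadosGWStorage: refuse whenever the caller needs CORS (browser uploads).
        if requires_cors:
          return None
        return 'https://radosgw.example.com/bucket/%s?Signature=...' % path

    storage = StubStorage()

    # Browser file drop (requires_cors=True): no direct URL, so DelegateUserfiles
    # would fall back to url_for('userfiles_handlers', ...).
    assert storage.get_direct_upload_url({'local_ceph'}, 'userfiles/abc', 'text/plain') is None

    # Server-side caller such as the build worker (requires_cors=False): a signed URL comes back.
    print(storage.get_direct_upload_url({'local_ceph'}, 'userfiles/abc', 'text/plain',
                                        requires_cors=False))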
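
For deployment configuration, USERFILES_LOCATION names the storage location userfiles are written to and USERFILES_PATH is the key prefix inside that location (both read in Userfiles.init_app above). A rough sketch of how a RadosGW-backed location might be declared is below; the DISTRIBUTED_STORAGE_CONFIG key name and its value shape are assumptions for illustration and are not taken from this diff.

    DISTRIBUTED_STORAGE_CONFIG = {
      'local_us': ['LocalStorage', {'storage_path': '/datastorage/registry'}],
      'ceph_eu': ['RadosGWStorage', {'hostname': 'radosgw.example.com',
                                     'is_secure': True,
                                     'storage_path': '/userfiles',
                                     'access_key': 'ACCESS',
                                     'secret_key': 'SECRET',
                                     'bucket_name': 'quay-userfiles'}],
    }

    USERFILES_LOCATION = 'ceph_eu'
    USERFILES_PATH = 'userfiles/'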