Merge remote-tracking branch 'origin/master' into waltermitty
Conflicts: app.py data/userfiles.py
This commit is contained in:
commit
2455c17f96
23 changed files with 232 additions and 241 deletions
|
@ -1,113 +1,35 @@
|
|||
import boto
|
||||
import os
|
||||
import logging
|
||||
import hashlib
|
||||
import magic
|
||||
|
||||
from boto.s3.key import Key
|
||||
from uuid import uuid4
|
||||
from flask import url_for, request, send_file, make_response, abort
|
||||
from flask.views import View
|
||||
from _pyio import BufferedReader
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FakeUserfiles(object):
|
||||
def prepare_for_drop(self, mime_type):
|
||||
return ('http://fake/url', uuid4())
|
||||
|
||||
def store_file(self, file_like_obj, content_type):
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_file_url(self, file_id, expires_in=300):
|
||||
return ('http://fake/url')
|
||||
|
||||
def get_file_checksum(self, file_id):
|
||||
return 'abcdefg'
|
||||
|
||||
|
||||
class S3FileWriteException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class S3Userfiles(object):
|
||||
def __init__(self, path, s3_access_key, s3_secret_key, bucket_name):
|
||||
self._initialized = False
|
||||
self._bucket_name = bucket_name
|
||||
self._access_key = s3_access_key
|
||||
self._secret_key = s3_secret_key
|
||||
self._prefix = path
|
||||
self._s3_conn = None
|
||||
self._bucket = None
|
||||
|
||||
def _initialize_s3(self):
|
||||
if not self._initialized:
|
||||
self._s3_conn = boto.connect_s3(self._access_key, self._secret_key)
|
||||
self._bucket = self._s3_conn.get_bucket(self._bucket_name)
|
||||
self._initialized = True
|
||||
|
||||
def prepare_for_drop(self, mime_type):
|
||||
""" Returns a signed URL to upload a file to our bucket. """
|
||||
self._initialize_s3()
|
||||
logger.debug('Requested upload url with content type: %s' % mime_type)
|
||||
file_id = str(uuid4())
|
||||
full_key = os.path.join(self._prefix, file_id)
|
||||
k = Key(self._bucket, full_key)
|
||||
url = k.generate_url(300, 'PUT', headers={'Content-Type': mime_type},
|
||||
encrypt_key=True)
|
||||
return (url, file_id)
|
||||
|
||||
def store_file(self, file_like_obj, content_type, file_id=None):
|
||||
self._initialize_s3()
|
||||
|
||||
if file_id is None:
|
||||
file_id = str(uuid4())
|
||||
|
||||
full_key = os.path.join(self._prefix, file_id)
|
||||
k = Key(self._bucket, full_key)
|
||||
logger.debug('Setting s3 content type to: %s' % content_type)
|
||||
k.set_metadata('Content-Type', content_type)
|
||||
bytes_written = k.set_contents_from_file(file_like_obj, encrypt_key=True,
|
||||
rewind=True)
|
||||
|
||||
if bytes_written == 0:
|
||||
raise S3FileWriteException('Unable to write file to S3')
|
||||
|
||||
return file_id
|
||||
|
||||
def get_file_url(self, file_id, expires_in=300, mime_type=None):
|
||||
self._initialize_s3()
|
||||
full_key = os.path.join(self._prefix, file_id)
|
||||
k = Key(self._bucket, full_key)
|
||||
headers = None
|
||||
if mime_type:
|
||||
headers={'Content-Type': mime_type}
|
||||
|
||||
return k.generate_url(expires_in, headers=headers)
|
||||
|
||||
def get_file_checksum(self, file_id):
|
||||
self._initialize_s3()
|
||||
full_key = os.path.join(self._prefix, file_id)
|
||||
k = self._bucket.lookup(full_key)
|
||||
return k.etag[1:-1][:7]
|
||||
|
||||
|
||||
class UserfilesHandlers(View):
|
||||
methods = ['GET', 'PUT']
|
||||
|
||||
def __init__(self, local_userfiles):
|
||||
self._userfiles = local_userfiles
|
||||
def __init__(self, distributed_storage, location, files):
|
||||
self._storage = distributed_storage
|
||||
self._files = files
|
||||
self._locations = {location}
|
||||
self._magic = magic.Magic(mime=True)
|
||||
|
||||
def get(self, file_id):
|
||||
path = self._userfiles.file_path(file_id)
|
||||
if not os.path.exists(path):
|
||||
path = self._files.get_file_id_path(file_id)
|
||||
try:
|
||||
file_stream = self._storage.stream_read_file(self._locations, path)
|
||||
buffered = BufferedReader(file_stream)
|
||||
file_header_bytes = buffered.peek(1024)
|
||||
return send_file(buffered, mimetype=self._magic.from_buffer(file_header_bytes))
|
||||
except IOError:
|
||||
abort(404)
|
||||
|
||||
logger.debug('Sending path: %s' % path)
|
||||
return send_file(path, mimetype=self._magic.from_file(path))
|
||||
|
||||
def put(self, file_id):
|
||||
input_stream = request.stream
|
||||
if request.headers.get('transfer-encoding') == 'chunked':
|
||||
|
@ -115,7 +37,10 @@ class UserfilesHandlers(View):
|
|||
# encoding (Gunicorn)
|
||||
input_stream = request.environ['wsgi.input']
|
||||
|
||||
self._userfiles.store_stream(input_stream, file_id)
|
||||
c_type = request.headers.get('Content-Type', None)
|
||||
|
||||
path = self._files.get_file_id_path(file_id)
|
||||
self._storage.stream_write(self._locations, path, input_stream, c_type)
|
||||
|
||||
return make_response('Okay')
|
||||
|
||||
|
@ -126,100 +51,81 @@ class UserfilesHandlers(View):
|
|||
return self.put(file_id)
|
||||
|
||||
|
||||
class LocalUserfiles(object):
|
||||
def __init__(self, app, path):
|
||||
self._root_path = path
|
||||
self._buffer_size = 64 * 1024 # 64 KB
|
||||
class DelegateUserfiles(object):
|
||||
def __init__(self, app, distributed_storage, location, path, handler_name):
|
||||
self._app = app
|
||||
self._storage = distributed_storage
|
||||
self._locations = {location}
|
||||
self._prefix = path
|
||||
self._handler_name = handler_name
|
||||
|
||||
def _build_url_adapter(self):
|
||||
return self._app.url_map.bind(self._app.config['SERVER_HOSTNAME'],
|
||||
script_name=self._app.config['APPLICATION_ROOT'] or '/',
|
||||
url_scheme=self._app.config['PREFERRED_URL_SCHEME'])
|
||||
|
||||
def prepare_for_drop(self, mime_type):
|
||||
def get_file_id_path(self, file_id):
|
||||
return os.path.join(self._prefix, file_id)
|
||||
|
||||
def prepare_for_drop(self, mime_type, requires_cors=True):
|
||||
""" Returns a signed URL to upload a file to our bucket. """
|
||||
logger.debug('Requested upload url with content type: %s' % mime_type)
|
||||
file_id = str(uuid4())
|
||||
with self._app.app_context() as ctx:
|
||||
ctx.url_adapter = self._build_url_adapter()
|
||||
return (url_for('userfiles_handlers', file_id=file_id, _external=True), file_id)
|
||||
path = self.get_file_id_path(file_id)
|
||||
url = self._storage.get_direct_upload_url(self._locations, path, mime_type, requires_cors)
|
||||
|
||||
def file_path(self, file_id):
|
||||
if '..' in file_id or file_id.startswith('/'):
|
||||
raise RuntimeError('Invalid Filename')
|
||||
return os.path.join(self._root_path, file_id)
|
||||
if url is None:
|
||||
with self._app.app_context() as ctx:
|
||||
ctx.url_adapter = self._build_url_adapter()
|
||||
return (url_for(self._handler_name, file_id=file_id, _external=True), file_id)
|
||||
|
||||
def store_stream(self, stream, file_id):
|
||||
path = self.file_path(file_id)
|
||||
dirname = os.path.dirname(path)
|
||||
if not os.path.exists(dirname):
|
||||
os.makedirs(dirname)
|
||||
|
||||
with open(path, 'w') as to_write:
|
||||
while True:
|
||||
try:
|
||||
buf = stream.read(self._buffer_size)
|
||||
if not buf:
|
||||
break
|
||||
to_write.write(buf)
|
||||
except IOError:
|
||||
break
|
||||
return (url, file_id)
|
||||
|
||||
def store_file(self, file_like_obj, content_type, file_id=None):
|
||||
if file_id is None:
|
||||
file_id = str(uuid4())
|
||||
|
||||
# Rewind the file to match what s3 does
|
||||
file_like_obj.seek(0, os.SEEK_SET)
|
||||
|
||||
self.store_stream(file_like_obj, file_id)
|
||||
path = self.get_file_id_path(file_id)
|
||||
self._storage.stream_write(self._locations, path, file_like_obj, content_type)
|
||||
return file_id
|
||||
|
||||
def get_file_url(self, file_id, expires_in=300):
|
||||
with self._app.app_context() as ctx:
|
||||
ctx.url_adapter = self._build_url_adapter()
|
||||
return url_for('userfiles_handlers', file_id=file_id, _external=True)
|
||||
def get_file_url(self, file_id, expires_in=300, requires_cors=False):
|
||||
path = self.get_file_id_path(file_id)
|
||||
url = self._storage.get_direct_download_url(self._locations, path, expires_in, requires_cors)
|
||||
|
||||
if url is None:
|
||||
with self._app.app_context() as ctx:
|
||||
ctx.url_adapter = self._build_url_adapter()
|
||||
return url_for(self._handler_name, file_id=file_id, _external=True)
|
||||
|
||||
return url
|
||||
|
||||
def get_file_checksum(self, file_id):
|
||||
path = self.file_path(file_id)
|
||||
sha_hash = hashlib.sha256()
|
||||
with open(path, 'r') as to_hash:
|
||||
while True:
|
||||
buf = to_hash.read(self._buffer_size)
|
||||
if not buf:
|
||||
break
|
||||
sha_hash.update(buf)
|
||||
return sha_hash.hexdigest()[:7]
|
||||
path = self.get_file_id_path(file_id)
|
||||
return self._storage.get_checksum(self._locations, path)
|
||||
|
||||
|
||||
class Userfiles(object):
|
||||
def __init__(self, app=None):
|
||||
def __init__(self, app=None, distributed_storage=None):
|
||||
self.app = app
|
||||
if app is not None:
|
||||
self.state = self.init_app(app)
|
||||
self.state = self.init_app(app, distributed_storage)
|
||||
else:
|
||||
self.state = None
|
||||
|
||||
def init_app(self, app):
|
||||
storage_type = app.config.get('USERFILES_TYPE', 'LocalUserfiles')
|
||||
path = app.config.get('USERFILES_PATH', '')
|
||||
def init_app(self, app, distributed_storage):
|
||||
location = app.config.get('USERFILES_LOCATION')
|
||||
path = app.config.get('USERFILES_PATH', None)
|
||||
|
||||
if storage_type == 'LocalUserfiles':
|
||||
userfiles = LocalUserfiles(app, path)
|
||||
app.add_url_rule('/userfiles/<file_id>',
|
||||
view_func=UserfilesHandlers.as_view('userfiles_handlers',
|
||||
local_userfiles=userfiles))
|
||||
handler_name = 'userfiles_handlers'
|
||||
|
||||
elif storage_type == 'S3Userfiles':
|
||||
access_key = app.config.get('USERFILES_AWS_ACCESS_KEY', '')
|
||||
secret_key = app.config.get('USERFILES_AWS_SECRET_KEY', '')
|
||||
bucket = app.config.get('USERFILES_S3_BUCKET', '')
|
||||
userfiles = S3Userfiles(path, access_key, secret_key, bucket)
|
||||
userfiles = DelegateUserfiles(app, distributed_storage, location, path, handler_name)
|
||||
|
||||
elif storage_type == 'FakeUserfiles':
|
||||
userfiles = FakeUserfiles()
|
||||
|
||||
else:
|
||||
raise RuntimeError('Unknown userfiles type: %s' % storage_type)
|
||||
app.add_url_rule('/userfiles/<file_id>',
|
||||
view_func=UserfilesHandlers.as_view(handler_name,
|
||||
distributed_storage=distributed_storage,
|
||||
location=location,
|
||||
files=userfiles))
|
||||
|
||||
# register extension with app
|
||||
app.extensions = getattr(app, 'extensions', {})
|
||||
|
|
Reference in a new issue