a821ad2b01
The previous change to this file did not propagate the error out of stream_write: the loop was only broken out of, so complete_upload still ran and then errored because the upload had already been canceled. That was still better than the earlier behavior, which was to fail silently but report success (even internally to ourselves!) on a bad image upload. It also means we discovered a bug: a user's image upload could fail partway, yet Quay would still write that image to the repository, potentially writing broken images to S3.
343 lines
11 KiB
Python
343 lines
11 KiB
Python
import cStringIO as StringIO
|
|
import os
|
|
import logging
|
|
|
|
import boto.s3.connection
|
|
import boto.gs.connection
|
|
import boto.s3.key
|
|
import boto.gs.key
|
|
|
|
from io import BufferedIOBase
|
|
|
|
import app
|
|
from storage.basestorage import BaseStorage
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class StreamReadKeyAsFile(BufferedIOBase):
    """File-like, read-only adapter around a storage key object.

    The wrapped key must provide ``read(amt)``, ``close(fast=...)`` and a
    ``closed`` attribute; every operation here is delegated to it.
    """

    def __init__(self, key):
        # The key is the single source of truth for data and open/closed state.
        self._key = key

    @property
    def closed(self):
        """Mirror the closed state of the underlying key."""
        return self._key.closed

    def readable(self):
        """Always report the stream as readable."""
        return True

    def read(self, amt=None):
        """Read up to ``amt`` bytes from the key.

        Returns None (rather than raising) once the underlying key is closed.
        """
        return None if self.closed else self._key.read(amt)

    def close(self):
        """Close the underlying key, passing ``fast=True`` through to it."""
        self._key.close(fast=True)
|
|
|
|
|
|
class _CloudStorage(BaseStorage):
    """Storage engine backed by a bucket reached through a boto-style connection.

    Subclasses supply the connection class, key class and default connection /
    upload parameters. The connection itself is created lazily, on the first
    call that needs it (see ``_initialize_cloud_conn``).
    """

    def __init__(self, connection_class, key_class, connect_kwargs, upload_params, storage_path,
                 access_key, secret_key, bucket_name):
        """Record configuration only; no network connection is made here.

        connection_class: callable invoked as
            ``connection_class(access_key, secret_key, **connect_kwargs)``.
        key_class: callable invoked as ``key_class(bucket, path)``.
        connect_kwargs: extra keyword arguments for ``connection_class``.
        upload_params: extra keyword arguments passed to upload calls.
        storage_path: root path under which all keys are stored.
        access_key, secret_key: credentials handed to ``connection_class``.
        bucket_name: name of the bucket to operate on.
        """
        self._initialized = False
        self._bucket_name = bucket_name
        self._access_key = access_key
        self._secret_key = secret_key
        self._root_path = storage_path
        self._connection_class = connection_class
        self._key_class = key_class
        self._upload_params = upload_params
        self._connect_kwargs = connect_kwargs
        self._cloud_conn = None
        self._cloud_bucket = None

    def _initialize_cloud_conn(self):
        """Create the connection and look up the bucket on first use (idempotent)."""
        if not self._initialized:
            self._cloud_conn = self._connection_class(self._access_key, self._secret_key,
                                                      **self._connect_kwargs)
            self._cloud_bucket = self._cloud_conn.get_bucket(self._bucket_name)
            self._initialized = True

    def _debug_key(self, key):
        """Used for debugging only.

        Wraps ``key.bucket.connection.make_request`` so that every call prints
        its positional and keyword arguments before being forwarded.
        """
        orig_meth = key.bucket.connection.make_request

        def new_meth(*args, **kwargs):
            # Parenthesized single-argument prints produce the same output as
            # the Python 2 print statement and also parse on Python 3.
            print('#' * 16)
            print(args)
            print(kwargs)
            print('#' * 16)
            return orig_meth(*args, **kwargs)

        key.bucket.connection.make_request = new_meth

    def _init_path(self, path=None):
        """Join ``path`` onto the root path and strip a single leading '/'."""
        path = os.path.join(self._root_path, path) if path else self._root_path
        if path and path[0] == '/':
            return path[1:]
        return path

    def get_cloud_conn(self):
        """Return the connection, creating it first if necessary."""
        self._initialize_cloud_conn()
        return self._cloud_conn

    def get_cloud_bucket(self):
        """Return the bucket, creating the connection first if necessary.

        Initializing here (as get_cloud_conn does) keeps callers from receiving
        None when this is the first method invoked on the storage object.
        """
        self._initialize_cloud_conn()
        return self._cloud_bucket

    def get_content(self, path):
        """Return the full contents stored at ``path`` as a string.

        Raises IOError if no key exists at the resolved path.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        return key.get_contents_as_string()

    def put_content(self, path, content):
        """Store ``content`` at ``path`` and return the resolved key path."""
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        key.set_contents_from_string(content, **self._upload_params)
        return path

    def get_supports_resumable_downloads(self):
        """Report that this engine supports resumable downloads."""
        return True

    def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
        """Return a URL for ``path`` generated with an expiry of ``expires_in``.

        ``requires_cors`` is accepted for interface compatibility and is not
        consulted by this implementation.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        k = self._key_class(self._cloud_bucket, path)
        return k.generate_url(expires_in)

    def get_direct_upload_url(self, path, mime_type, requires_cors=True):
        """Return a PUT URL for ``path`` generated with an expiry value of 300.

        The URL is generated with a ``Content-Type`` header of ``mime_type`` and
        ``encrypt_key=True``. ``requires_cors`` is not consulted here.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        url = key.generate_url(300, 'PUT', headers={'Content-Type': mime_type}, encrypt_key=True)
        return url

    def stream_read(self, path):
        """Yield the contents of ``path`` in chunks of ``self.buffer_size``.

        Raises IOError (on first iteration) if no key exists at the path.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        while True:
            buf = key.read(self.buffer_size)
            if not buf:
                break
            yield buf

    def stream_read_file(self, path):
        """Return a file-like object (StreamReadKeyAsFile) over ``path``.

        Raises IOError if no key exists at the resolved path.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        if not key.exists():
            raise IOError('No such key: \'{0}\''.format(path))
        return StreamReadKeyAsFile(key)

    def stream_write(self, path, fp, content_type=None, content_encoding=None):
        """Upload everything readable from ``fp`` to ``path`` as a multipart upload.

        ``content_type`` / ``content_encoding``, when given, are attached as
        ``Content-Type`` / ``Content-Encoding`` metadata. On IOError the
        multipart upload is cancelled, a failure metric is recorded and the
        error is re-raised. The success metric is recorded only after the
        upload has actually been completed.
        """
        self._initialize_cloud_conn()

        # Minimum size of upload part size on S3 is 5MB
        buffer_size = max(self.buffer_size, 5 * 1024 * 1024)
        path = self._init_path(path)

        metadata = {}
        if content_type is not None:
            metadata['Content-Type'] = content_type

        if content_encoding is not None:
            metadata['Content-Encoding'] = content_encoding

        mp = self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                          **self._upload_params)
        app.metric_queue.put('MultipartUploadStart', 1)
        num_part = 1
        while True:
            try:
                buf = fp.read(buffer_size)
                if not buf:
                    break
                # Each chunk is wrapped in an in-memory file for the part upload;
                # the wrapper is released even when the upload raises.
                part = StringIO.StringIO(buf)
                try:
                    mp.upload_part_from_file(part, num_part)
                finally:
                    part.close()
                num_part += 1
            except IOError:
                app.metric_queue.put('MultipartUploadFailure', 1)
                mp.cancel_upload()
                raise

        # Complete first: if completion raises, success must not be reported.
        mp.complete_upload()
        app.metric_queue.put('MultipartUploadSuccess', 1)

    def list_directory(self, path=None):
        """Yield the names found directly under ``path``.

        Names are sliced by the length of the root path (unless the root path is
        '/'), and a trailing '/' is dropped from prefix entries. Raises OSError
        if nothing is listed under the path.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        if not path.endswith('/'):
            path += '/'
        ln = 0
        if self._root_path != '/':
            ln = len(self._root_path)
        exists = False
        for key in self._cloud_bucket.list(prefix=path, delimiter='/'):
            exists = True
            name = key.name
            if name.endswith('/'):
                yield name[ln:-1]
            else:
                yield name[ln:]
        if not exists:
            # In order to be compliant with the LocalStorage API. Even though
            # S3 does not have a concept of folders.
            raise OSError('No such directory: \'{0}\''.format(path))

    def exists(self, path):
        """Return whether a key exists at ``path``."""
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        return key.exists()

    def remove(self, path):
        """Delete the key at ``path``, or every key under it when no such key exists."""
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        if key.exists():
            # It's a file
            key.delete()
            return
        # We assume it's a directory
        if not path.endswith('/'):
            path += '/'
        for key in self._cloud_bucket.list(prefix=path):
            key.delete()

    def get_checksum(self, path):
        """Return a short checksum derived from the key's ETag.

        The ETag has its first and last characters removed (presumably the
        surrounding quotes) and is then truncated to seven characters. Raises
        IOError if the bucket lookup finds nothing.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)
        # NOTE(review): lookup() is handed the key object rather than the path
        # string; confirm the bucket API accepts that.
        k = self._cloud_bucket.lookup(key)
        if k is None:
            raise IOError('No such key: \'{0}\''.format(path))

        return k.etag[1:-1][:7]

    def copy_to(self, destination, path):
        """Copy the object at ``path`` into ``destination`` under the same path.

        A direct bucket-to-bucket copy is used when ``destination`` is the same
        class with the same credentials; otherwise the data is streamed through
        this process via stream_read_file / stream_write.
        """
        # The bucket is referenced below (including in log messages), so make
        # sure it has been looked up.
        self._initialize_cloud_conn()

        # First try to copy directly via boto, but only if the storages are the
        # same type, with the same access information.
        if (self.__class__ == destination.__class__ and
                self._access_key == destination._access_key and
                self._secret_key == destination._secret_key):
            # Same class, so the destination has the same lazy initializer; its
            # bucket would otherwise still be None on a fresh instance.
            destination._initialize_cloud_conn()
            logger.debug('Copying file from %s to %s via a direct boto copy', self._cloud_bucket,
                         destination._cloud_bucket)

            source_path = self._init_path(path)
            source_key = self._key_class(self._cloud_bucket, source_path)

            dest_path = destination._init_path(path)
            source_key.copy(destination._cloud_bucket, dest_path)
            return

        # Fallback to a slower, default copy.
        logger.debug('Copying file from %s to %s via a streamed copy', self._cloud_bucket,
                     destination)
        with self.stream_read_file(path) as fp:
            destination.stream_write(path, fp)
|
|
|
|
|
|
class S3Storage(_CloudStorage):
    """Cloud storage engine using boto's S3 connection and key classes."""

    def __init__(self, storage_path, s3_access_key, s3_secret_key, s3_bucket):
        """Configure the engine for ``s3_bucket``; uploads pass ``encrypt_key=True``."""
        upload_params = {
            'encrypt_key': True,
        }
        connect_kwargs = {}
        super(S3Storage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
                                        connect_kwargs, upload_params, storage_path,
                                        s3_access_key, s3_secret_key, s3_bucket)

    def setup(self):
        """Install the CORS configuration (GET and PUT rules) on the bucket."""
        # The bucket is looked up lazily; make sure that has happened so the
        # call below is never made on an uninitialized (None) bucket.
        self._initialize_cloud_conn()
        self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
<CORSConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<CORSRule>
<AllowedOrigin>*</AllowedOrigin>
<AllowedMethod>GET</AllowedMethod>
<MaxAgeSeconds>3000</MaxAgeSeconds>
<AllowedHeader>Authorization</AllowedHeader>
</CORSRule>
<CORSRule>
<AllowedOrigin>*</AllowedOrigin>
<AllowedMethod>PUT</AllowedMethod>
<MaxAgeSeconds>3000</MaxAgeSeconds>
<AllowedHeader>Content-Type</AllowedHeader>
<AllowedHeader>x-amz-acl</AllowedHeader>
<AllowedHeader>origin</AllowedHeader>
</CORSRule>
</CORSConfiguration>""")
|
|
|
|
class GoogleCloudStorage(_CloudStorage):
    """Cloud storage engine using boto's Google Storage connection and key classes."""

    def __init__(self, storage_path, access_key, secret_key, bucket_name):
        """Configure the engine for ``bucket_name`` with no extra upload or connect options."""
        upload_params = {}
        connect_kwargs = {}
        super(GoogleCloudStorage, self).__init__(boto.gs.connection.GSConnection,
                                                 boto.gs.key.Key, connect_kwargs, upload_params,
                                                 storage_path, access_key, secret_key,
                                                 bucket_name)

    def setup(self):
        """Install the CORS configuration (GET and PUT from any origin) on the bucket."""
        # The bucket is looked up lazily; make sure that has happened so the
        # call below is never made on an uninitialized (None) bucket.
        self._initialize_cloud_conn()
        self.get_cloud_bucket().set_cors_xml("""<?xml version="1.0" encoding="UTF-8"?>
<CorsConfig>
<Cors>
<Origins>
<Origin>*</Origin>
</Origins>
<Methods>
<Method>GET</Method>
<Method>PUT</Method>
</Methods>
<ResponseHeaders>
<ResponseHeader>Content-Type</ResponseHeader>
</ResponseHeaders>
<MaxAgeSec>3000</MaxAgeSec>
</Cors>
</CorsConfig>""")

    def stream_write(self, path, fp, content_type=None, content_encoding=None):
        """Upload the contents of ``fp`` to ``path`` in a single streamed write.

        Unlike the base class, no multipart upload is used here, so there is no
        part-size handling. ``content_type`` / ``content_encoding``, when given,
        are set as ``Content-Type`` / ``Content-Encoding`` metadata on the key.
        """
        self._initialize_cloud_conn()
        path = self._init_path(path)
        key = self._key_class(self._cloud_bucket, path)

        if content_type is not None:
            key.set_metadata('Content-Type', content_type)

        if content_encoding is not None:
            key.set_metadata('Content-Encoding', content_encoding)

        key.set_contents_from_stream(fp)
|
|
|
|
|
|
class RadosGWStorage(_CloudStorage):
    """Cloud storage engine that talks to a custom host through boto's S3 classes."""

    def __init__(self, hostname, is_secure, storage_path, access_key, secret_key, bucket_name):
        """Configure the S3 connection for ``hostname`` using OrdinaryCallingFormat."""
        connection_options = dict(
            host=hostname,
            is_secure=is_secure,
            calling_format=boto.s3.connection.OrdinaryCallingFormat(),
        )
        super(RadosGWStorage, self).__init__(boto.s3.connection.S3Connection, boto.s3.key.Key,
                                             connection_options, {}, storage_path, access_key,
                                             secret_key, bucket_name)

    # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
    def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
        """Return a direct download URL, or None when the caller requires CORS."""
        if not requires_cors:
            parent = super(RadosGWStorage, self)
            return parent.get_direct_download_url(path, expires_in, requires_cors)

        return None

    # TODO remove when radosgw supports cors: http://tracker.ceph.com/issues/8718#change-38624
    def get_direct_upload_url(self, path, mime_type, requires_cors=True):
        """Return a direct upload URL, or None when the caller requires CORS."""
        if not requires_cors:
            parent = super(RadosGWStorage, self)
            return parent.get_direct_upload_url(path, mime_type, requires_cors)

        return None
|