This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/storage/basestorage.py
2016-01-15 15:35:04 -05:00

136 lines
3.8 KiB
Python

import logging
import tempfile
from digest.digest_tools import content_path
from util.registry.filelike import READ_UNTIL_END
logger = logging.getLogger(__name__)
class StoragePaths(object):
    """ Builds the storage paths used for images and blobs, and provides a helper for
    buffering data into a temporary file.
    """

    # Top-level directory name under which every per-image path is built.
    shared_images = 'sharedimages'

    @staticmethod
    def temp_store_handler():
        """ Create a temporary file together with a best-effort writer for it.

        Returns a (tmpf, fn) tuple, where tmpf is a new tempfile.TemporaryFile and fn(buf)
        writes buf to that file. An IOError raised while writing is logged and suppressed,
        so the caller feeding data through fn is never interrupted by a failing temp store.
        """
        tmpf = tempfile.TemporaryFile()

        def fn(buf):
            try:
                tmpf.write(buf)
            except IOError as ioe:
                # Writing here stays best-effort, but leave a trace so an incomplete
                # temporary file can be diagnosed instead of failing silently.
                logger.warning('Could not write to temporary store: %s', ioe)

        return tmpf, fn

    def _image_path(self, storage_uuid):
        """ Return the directory prefix (with trailing slash) for the given storage uuid. """
        return '{0}/{1}/'.format(self.shared_images, storage_uuid)

    def v1_image_layer_path(self, storage_uuid):
        """ Return the path of the 'layer' entry under the image's directory. """
        base_path = self._image_path(storage_uuid)
        return '{0}layer'.format(base_path)

    def blob_path(self, digest_str):
        """ Return the path for the given digest string, as computed by content_path. """
        return content_path(digest_str)

    def image_file_trie_path(self, storage_uuid):
        """ Return the path of the 'files.trie' entry under the image's directory. """
        base_path = self._image_path(storage_uuid)
        return '{0}files.trie'.format(base_path)
class BaseStorage(StoragePaths):
    """ Base class for storage engines.

    Most operations raise NotImplementedError here and are meant to be supplied by
    subclasses. Only the optional hooks (setup, validate, the direct URL getters and
    get_supports_resumable_downloads) and stream_write_to_fp have working defaults.
    """

    def __init__(self):
        # Set the IO buffer to 64kB
        self.buffer_size = 64 * 1024

    def setup(self):
        """ Called to perform any storage system setup. """
        pass

    def validate(self, client):
        """ Called to perform any custom storage system validation. The client is an HTTP
        client to use for any external calls. """
        pass

    def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
        """ Return a direct download URL for the path. The base implementation returns None. """
        return None

    def get_direct_upload_url(self, path, mime_type, requires_cors=True):
        """ Return a direct upload URL for the path. The base implementation returns None. """
        return None

    def get_supports_resumable_downloads(self):
        """ Return whether resumable downloads are supported. The base implementation
        returns False. """
        return False

    def get_content(self, path):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError

    def put_content(self, path, content):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError

    def stream_read(self, path):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError

    def stream_read_file(self, path):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError

    def stream_write(self, path, fp, content_type=None, content_encoding=None):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError

    def exists(self, path):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError

    def remove(self, path):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError

    def get_checksum(self, path):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError

    def stream_write_to_fp(self, in_fp, out_fp, num_bytes=READ_UNTIL_END):
        """ Copy the specified number of bytes from the input file stream to the output stream. If
        num_bytes < 0 copy until the stream ends.

        Data is moved in reads of at most self.buffer_size bytes. Copying stops early if
        in_fp.read returns an empty result. Returns the number of bytes actually copied.
        """
        # Treat every negative count as "copy until the stream ends", matching the
        # documented contract, rather than only the exact READ_UNTIL_END sentinel.
        copy_all = num_bytes < 0 or num_bytes == READ_UNTIL_END

        bytes_copied = 0
        while copy_all or bytes_copied < num_bytes:
            if copy_all:
                size_to_read = self.buffer_size
            else:
                # Never read past the requested amount.
                size_to_read = min(num_bytes - bytes_copied, self.buffer_size)

            buf = in_fp.read(size_to_read)
            if not buf:
                # The input is exhausted before the requested amount was reached.
                break

            out_fp.write(buf)
            bytes_copied += len(buf)

        return bytes_copied

    def copy_to(self, destination, path):
        """ Not implemented in the base class; subclasses must override. """
        raise NotImplementedError
class InvalidChunkException(RuntimeError):
    """ Error documented by stream_upload_chunk as raised when a chunk's offset or length
    can not be accepted.
    """
class BaseStorageV2(BaseStorage):
    """ Extends BaseStorage with the chunked upload operations. Every method here raises
    NotImplementedError and must be provided by a subclass.
    """

    def initiate_chunked_upload(self):
        """ Begin a new chunked upload.

        Returns the uuid of the upload and any storage metadata associated with it.
        """
        raise NotImplementedError()

    def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
        """ Upload data read from in_fp into the chunked destination, starting at offset.

        length is the amount of data to upload; pass -1 to upload as much data as in_fp
        can provide. Returns the number of bytes uploaded together with a new version of
        storage_metadata. Raises InvalidChunkException if the offset or length can not
        be accepted.
        """
        raise NotImplementedError()

    def complete_chunked_upload(self, uuid, final_path, storage_metadata):
        """ Finish the chunked upload, storing the final results at final_path.

        Returns nothing.
        """
        raise NotImplementedError()

    def cancel_chunked_upload(self, uuid, storage_metadata):
        """ Abort the chunked upload and clean up any outstanding partially uploaded data.

        Returns nothing.
        """
        raise NotImplementedError()