This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/storage/fakestorage.py
Joseph Schorr 68c9d5e432 Add torrent tests
Fixes #1128
2016-01-19 17:40:11 -05:00

82 lines
2.3 KiB
Python

import cStringIO as StringIO
import hashlib
from collections import defaultdict
from uuid import uuid4
from storage.basestorage import BaseStorageV2
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
class FakeStorage(BaseStorageV2):
def __init__(self, metric_queue):
super(FakeStorage, self).__init__()
def _init_path(self, path=None, create=False):
return path
def get_direct_download_url(self, path, expires_in=60, requires_cors=False):
try:
if self.get_content('supports_direct_download') == 'true':
return 'http://somefakeurl'
except:
pass
return None
def get_content(self, path):
if not path in _FAKE_STORAGE_MAP:
raise IOError('Fake file %s not found' % path)
_FAKE_STORAGE_MAP.get(path).seek(0)
return _FAKE_STORAGE_MAP.get(path).read()
def put_content(self, path, content):
_FAKE_STORAGE_MAP.pop(path, None)
_FAKE_STORAGE_MAP[path].write(content)
def stream_read(self, path):
io_obj = _FAKE_STORAGE_MAP[path]
io_obj.seek(0)
while True:
buf = io_obj.read(self.buffer_size)
if not buf:
break
yield buf
def stream_read_file(self, path):
return StringIO.StringIO(self.get_content(path))
def stream_write(self, path, fp, content_type=None, content_encoding=None):
out_fp = _FAKE_STORAGE_MAP[path]
out_fp.seek(0)
self.stream_write_to_fp(fp, out_fp)
def remove(self, path):
_FAKE_STORAGE_MAP.pop(path, None)
def exists(self, path):
return path in _FAKE_STORAGE_MAP
def get_checksum(self, path):
return hashlib.sha256(_FAKE_STORAGE_MAP[path].read()).hexdigest()[:7]
def initiate_chunked_upload(self):
new_uuid = str(uuid4())
_FAKE_STORAGE_MAP[new_uuid].seek(0)
return new_uuid, {}
def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
upload_storage = _FAKE_STORAGE_MAP[uuid]
upload_storage.seek(offset)
try:
return self.stream_write_to_fp(in_fp, upload_storage, length), {}, None
except IOError as ex:
return 0, {}, ex
def complete_chunked_upload(self, uuid, final_path, _):
_FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid]
_FAKE_STORAGE_MAP.pop(uuid, None)
def cancel_chunked_upload(self, uuid, _):
_FAKE_STORAGE_MAP.pop(uuid, None)