This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/storage/fakestorage.py
Joseph Schorr ee0eb80c8f Fix blob content types
Fixes #990
2015-12-04 16:13:58 -05:00

67 lines
1.9 KiB
Python

import cStringIO as StringIO
import hashlib
from collections import defaultdict
from uuid import uuid4
from storage.basestorage import BaseStorageV2
_FAKE_STORAGE_MAP = defaultdict(StringIO.StringIO)
class FakeStorage(BaseStorageV2):
def _init_path(self, path=None, create=False):
return path
def get_content(self, path):
if not path in _FAKE_STORAGE_MAP:
raise IOError('Fake file %s not found' % path)
_FAKE_STORAGE_MAP.get(path).seek(0)
return _FAKE_STORAGE_MAP.get(path).read()
def put_content(self, path, content):
_FAKE_STORAGE_MAP.pop(path, None)
_FAKE_STORAGE_MAP[path].write(content)
def stream_read(self, path):
io_obj = _FAKE_STORAGE_MAP[path]
io_obj.seek(0)
while True:
buf = io_obj.read(self.buffer_size)
if not buf:
break
yield buf
def stream_read_file(self, path):
return StringIO.StringIO(self.get_content(path))
def stream_write(self, path, fp, content_type=None, content_encoding=None):
out_fp = _FAKE_STORAGE_MAP[path]
out_fp.seek(0)
self.stream_write_to_fp(fp, out_fp)
def remove(self, path):
_FAKE_STORAGE_MAP.pop(path, None)
def exists(self, path):
return path in _FAKE_STORAGE_MAP
def get_checksum(self, path):
return hashlib.sha256(_FAKE_STORAGE_MAP[path].read()).hexdigest()[:7]
def initiate_chunked_upload(self):
new_uuid = str(uuid4())
_FAKE_STORAGE_MAP[new_uuid].seek(0)
return new_uuid, {}
def stream_upload_chunk(self, uuid, offset, length, in_fp, _, content_type=None):
upload_storage = _FAKE_STORAGE_MAP[uuid]
upload_storage.seek(offset)
return self.stream_write_to_fp(in_fp, upload_storage, length), {}
def complete_chunked_upload(self, uuid, final_path, _):
_FAKE_STORAGE_MAP[final_path] = _FAKE_STORAGE_MAP[uuid]
_FAKE_STORAGE_MAP.pop(uuid, None)
def cancel_chunked_upload(self, uuid, _):
_FAKE_STORAGE_MAP.pop(uuid, None)