This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/storage/local.py

94 lines
2.4 KiB
Python

import os
import shutil
import hashlib
import io
from storage.basestorage import BaseStorage
class LocalStorage(BaseStorage):
def __init__(self, storage_path):
self._root_path = storage_path
def _init_path(self, path=None, create=False):
path = os.path.join(self._root_path, path) if path else self._root_path
if create is True:
dirname = os.path.dirname(path)
if not os.path.exists(dirname):
os.makedirs(dirname)
return path
def get_content(self, path):
path = self._init_path(path)
with open(path, mode='r') as f:
return f.read()
def put_content(self, path, content):
path = self._init_path(path, create=True)
with open(path, mode='w') as f:
f.write(content)
return path
def stream_read(self, path):
path = self._init_path(path)
with open(path, mode='rb') as f:
while True:
buf = f.read(self.buffer_size)
if not buf:
break
yield buf
def stream_read_file(self, path):
path = self._init_path(path)
return io.open(path, mode='rb')
def stream_write(self, path, fp, content_type=None, content_encoding=None):
# Size is mandatory
path = self._init_path(path, create=True)
with open(path, mode='wb') as f:
while True:
try:
buf = fp.read(self.buffer_size)
if not buf:
break
f.write(buf)
except IOError:
break
def list_directory(self, path=None):
path = self._init_path(path)
prefix = path[len(self._root_path) + 1:] + '/'
exists = False
for d in os.listdir(path):
exists = True
yield prefix + d
if exists is False:
# Raises OSError even when the directory is empty
# (to be consistent with S3)
raise OSError('No such directory: \'{0}\''.format(path))
def exists(self, path):
path = self._init_path(path)
return os.path.exists(path)
def remove(self, path):
path = self._init_path(path)
if os.path.isdir(path):
shutil.rmtree(path)
return
try:
os.remove(path)
except OSError:
pass
def get_checksum(self, path):
path = self._init_path(path)
sha_hash = hashlib.sha256()
with open(path, 'r') as to_hash:
while True:
buf = to_hash.read(self.buffer_size)
if not buf:
break
sha_hash.update(buf)
return sha_hash.hexdigest()[:7]