This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/queuefile.py

59 lines
1.3 KiB
Python
Raw Normal View History

from multiprocessing import Queue
import os
class QueueFile(object):
    """File-like reader that pulls its data from a blocking queue.

    Items taken from the queue are interpreted as follows:

    * ``None`` marks the end of the stream;
    * an ``Exception`` instance reports a failure: it is passed to every
      registered exception handler, or raised from ``read`` when no handler
      has been registered;
    * anything else is appended to the internal buffer as data.
    """

    def __init__(self, queue, name=None):
        """Create a reader over *queue*.

        Args:
            queue: Object exposing ``get(block=True)``, e.g. a
                ``multiprocessing.Queue``.
            name: Optional label stored on the instance; it is not used by
                any method of this class.
        """
        self._queue = queue
        self._closed = False  # set by close() or when an exception item arrives
        self._done = False  # set once the ``None`` end sentinel has been read
        self._buffer = ''  # data received from the queue but not yet returned
        self._total_size = 0  # running length of all data items received
        self._name = name
        self.raised_exception = False  # public flag: an exception item was seen
        self._exception_handlers = []

    def add_exception_handler(self, handler):
        """Register *handler*, called with the exception found on the queue."""
        self._exception_handlers.append(handler)

    def read(self, size=8192):
        """Return up to *size* characters of data, blocking on the queue.

        Args:
            size: Maximum amount to return. ``None`` or a negative value
                means "read everything up to the end of the stream".

        Returns:
            The next chunk of data. It is shorter than *size* (possibly
            empty) once the stream has ended or the file has been closed.
            An empty string is also returned when an exception item was
            consumed by the registered handlers, so callers always get a
            string back.

        Raises:
            Exception: the exception instance taken from the queue, when no
                handler has been registered.
        """
        read_all = size is None or size < 0

        # After close() or end-of-stream the queue is never touched again;
        # only data that is already buffered is handed out.
        if self._closed or self._done:
            return self._take(size, read_all)

        while read_all or len(self._buffer) < size:
            item = self._queue.get(block=True)
            if item is None:
                self._done = True
                break

            if isinstance(item, Exception):
                # Flag the failure before running handlers so they can
                # already observe it on this object.
                self._closed = True
                self.raised_exception = True

                if not self._exception_handlers:
                    raise item

                for handler in self._exception_handlers:
                    handler(item)

                # Handled: report end-of-data as an empty string, not None.
                return ''

            self._buffer += item
            self._total_size += len(item)

        return self._take(size, read_all)

    def _take(self, size, read_all):
        """Remove and return the next chunk (all of it if *read_all*)."""
        cut = len(self._buffer) if read_all else size
        chunk = self._buffer[:cut]
        self._buffer = self._buffer[cut:]
        return chunk

    def flush(self):
        """Do nothing; present only to satisfy the file-like interface."""
        pass

    def close(self):
        """Mark the file closed; later reads only return buffered data."""
        self._closed = True