# Provenance: util/registry/gzipinputstream.py as introduced by commit
# c0c1da323237f5e807e9b052d7d052905854bb05 ("Change build logs load to using
# streaming Gzip", Joseph Schorr, 2015-08-28).  The same commit changed
# LogArchiveHandlers.get in data/archivedlogs.py to return
#     send_file(GzipInputStream(data_stream), mimetype=JSON_MIMETYPE)
# instead of decompressing the whole archived log into a StringIO first.

import zlib
import string  # noqa: F401 -- no longer used here; kept so the module's imports are unchanged

# Number of compressed bytes requested from the underlying file per read.
BLOCK_SIZE = 16384

# zlib window size; adding 16 to MAX_WBITS makes zlib expect the gzip
# header and trailer instead of a raw zlib stream.
WINDOW_BUFFER_SIZE = 16 + zlib.MAX_WBITS


class GzipInputStream(object):
  """
  Simple class that allows streaming reads from gzip files.

  Python 2.x gzip.GzipFile relies on .seek() and .tell(), so it
  doesn't support this (@see: http://bo4.me/YKWSsL).

  All data is handled as bytes, so the class behaves the same on
  Python 2 and Python 3.

  NOTE(review): the decompressor is never restarted, so only the first
  member of a concatenated (multi-member) gzip file appears to be decoded;
  confirm that archives are always single-member.

  Adapted from: https://gist.github.com/beaufour/4205533
  """

  def __init__(self, fileobj):
    """
    Initialize with the given file-like object.

    @param fileobj: file-like object, must provide read(n) returning bytes
                    (and close() if close() is going to be called)
    """
    self._file = fileobj
    self._zip = zlib.decompressobj(WINDOW_BUFFER_SIZE)
    self._offset = 0  # position in unzipped stream
    self._data = b""  # decompressed bytes not yet handed to the caller

  def __fill(self, num_bytes):
    """
    Fill the internal buffer until it holds at least 'num_bytes' of data.

    @param num_bytes: int, number of bytes to buffer (0 = everything)
    """
    if not self._zip:
      # The compressed input has already been exhausted.
      return

    # Collect the pieces and join once: repeated `buffer + chunk` copies
    # the whole buffer for every block that is read.
    chunks = [self._data]
    buffered = len(self._data)
    try:
      while not num_bytes or buffered < num_bytes:
        block = self._file.read(BLOCK_SIZE)
        if not block:
          chunks.append(self._zip.flush())
          self._zip = None  # no more data
          break

        chunk = self._zip.decompress(block)
        chunks.append(chunk)
        buffered += len(chunk)
    finally:
      # Keep whatever was decoded even if read()/decompress() raised.
      self._data = b"".join(chunks)

  def __enter__(self):
    """Allow use in a `with` block; returns the stream itself."""
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    """Close the underlying file on leaving a `with` block."""
    self.close()

  def __iter__(self):
    """Iterate over the decompressed data line by line."""
    return self

  def seek(self, offset, whence=0):
    """
    Move forward in the decompressed stream by reading and discarding data.

    @param offset: int, target position (whence=0) or delta (whence=1)
    @param whence: int, 0 = absolute, 1 = relative to the current position
    @raise IOError: for any other whence value, or when the target lies
                    before the current position
    """
    if whence == 0:
      position = offset
    elif whence == 1:
      position = self._offset + offset
    else:
      raise IOError("Illegal argument")

    if position < self._offset:
      raise IOError("Cannot seek backwards")

    # skip forward, in blocks
    while position > self._offset:
      if not self.read(min(position - self._offset, BLOCK_SIZE)):
        break  # hit end of stream before reaching the target

  def tell(self):
    """Return the current position in the decompressed stream."""
    return self._offset

  def read(self, size=0):
    """
    Read up to 'size' decompressed bytes.

    @param size: int, maximum number of bytes to return; 0, None or a
                 negative value means "everything that is left"
    @return: bytes, empty once the stream is exhausted
    """
    if size is None or size < 0:
      # Standard file-object spelling of "read to EOF".
      size = 0

    self.__fill(size)
    if size:
      data, self._data = self._data[:size], self._data[size:]
    else:
      data, self._data = self._data, b""

    self._offset += len(data)
    return data

  def next(self):
    """Return the next line; raise StopIteration at end of stream."""
    line = self.readline()
    if not line:
      raise StopIteration()
    return line

  __next__ = next  # Python 3 iterator protocol

  def readline(self):
    """
    Read one line, including its trailing newline if present.

    @return: bytes, empty at end of stream; the last line is returned
             without a newline if the data does not end with one
    """
    scanned = 0  # bytes already known to contain no newline
    while True:
      newline = self._data.find(b"\n", scanned)
      if newline >= 0:
        return self.read(newline + 1)

      if not self._zip:
        # Input exhausted: hand back whatever is left (possibly empty).
        return self.read()

      scanned = len(self._data)
      self.__fill(scanned + 512)

  def readlines(self):
    """Read all remaining lines and return them as a list of bytes."""
    lines = []
    while True:
      line = self.readline()
      if not line:
        break

      lines.append(line)
    return lines

  def close(self):
    """Close the underlying file object."""
    self._file.close()