Fix GeneratorFile for working with BufferedReader

The user files system uses a BufferedReader along with the magic library to determine the mime type of the user file being served. Currently, BufferedReader fails with an exception on Swift storage, because Swift storage returns a GeneratorFile, which is missing the `readable()` method.
This commit is contained in:
Joseph Schorr 2016-06-23 13:40:57 -04:00
parent 1173192739
commit 30ede029d5
2 changed files with 31 additions and 0 deletions

View file

@ -1,4 +1,5 @@
import unittest
import magic
from itertools import islice
from semantic_version import Version, Spec
@ -7,6 +8,7 @@ from util.validation import generate_valid_usernames
from util.registry.generatorfile import GeneratorFile
from util.registry.dockerver import docker_version
from util import slash_join
from _pyio import BufferedReader
class TestGeneratorFile(unittest.TestCase):
def sample_generator(self):
@ -76,6 +78,32 @@ class TestGeneratorFile(unittest.TestCase):
self.assertEquals("thisisatest", f.read(60))
self.assertEquals(11, f.tell())
def test_with_bufferedreader(self):
with GeneratorFile(self.sample_generator()) as f:
buffered = BufferedReader(f)
self.assertEquals("thisisatest", buffered.peek(10))
self.assertEquals("thisisates", buffered.read(10))
def mimed_html_generator(self):
yield '<html>'
yield '<body>'
yield 'sometext' * 1024
yield '</body>'
yield '</html>'
def test_magic(self):
mgc = magic.Magic(mime=True)
with GeneratorFile(self.mimed_html_generator()) as f:
buffered = BufferedReader(f)
file_header_bytes = buffered.peek(1024)
self.assertEquals("text/html", mgc.from_buffer(file_header_bytes))
with GeneratorFile(self.sample_generator()) as f:
buffered = BufferedReader(f)
file_header_bytes = buffered.peek(1024)
self.assertEquals("text/plain", mgc.from_buffer(file_header_bytes))
class TestUsernameGenerator(unittest.TestCase):
def assert_generated_output(self, input_username, expected_output):

View file

@ -33,6 +33,9 @@ class GeneratorFile(object):
raise StopIteration
return r
def readable(self):
return not self._closed
def readline(self):
buf = []
while True: