This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_streamlayerformat.py

144 lines
4.3 KiB
Python
Raw Normal View History

2014-09-17 15:50:52 +00:00
import unittest
import tarfile
from StringIO import StringIO
from util.streamlayerformat import StreamLayerMerger, AUFS_WHITEOUT
class TestStreamLayerMerger(unittest.TestCase):
    """Tests for squashing stacked tar layers with ``StreamLayerMerger``.

    Layers are handed to the merger top-most first: in every test the
    first entry of the list is the newest layer, and later entries are the
    older layers beneath it.
    """

    def create_layer(self, **kwargs):
        """Build a gzip-compressed tar archive representing a single layer.

        Each keyword argument maps a filename to its contents. A value of
        ``None`` marks the file as deleted: an empty entry whose name carries
        the ``AUFS_WHITEOUT`` prefix is written in its place.

        Returns:
            The raw bytes of the compressed archive.
        """
        output = StringIO()
        with tarfile.open(fileobj=output, mode='w:gz') as tar:
            for filename, contents in kwargs.items():
                if contents is None:
                    # This is a deleted file: emit a whiteout marker instead.
                    filename = AUFS_WHITEOUT + filename
                    contents = ''

                info = tarfile.TarInfo(name=filename)
                info.size = len(contents)
                tar.addfile(info, fileobj=StringIO(contents))

        # Read the buffer only after the archive is closed so that the gzip
        # stream has been fully flushed.
        return output.getvalue()

    def squash_layers(self, layers):
        """Merge ``layers`` with ``StreamLayerMerger`` and return the result.

        Args:
            layers: Archive byte strings as produced by ``create_layer``,
                ordered top-most layer first.

        Returns:
            The merged archive, joined into a single string from the chunks
            yielded by the merger's generator.
        """
        def get_layers():
            # Give the merger fresh file-like objects on every call.
            return [StringIO(layer) for layer in layers]

        merger = StreamLayerMerger(get_layers)
        return ''.join(merger.get_generator())

    def assertHasFile(self, squashed, filename, contents):
        """Assert that ``squashed`` contains ``filename`` holding ``contents``.

        Raises:
            KeyError: If the archive has no member named ``filename``.
        """
        with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
            member = tar.getmember(filename)
            # Compare the raw bytes. Re-joining readlines() with '\n' would
            # double every newline of multi-line content.
            self.assertEqual(contents, tar.extractfile(member).read())

    def assertDoesNotHaveFile(self, squashed, filename):
        """Assert that ``squashed`` has no member named ``filename``."""
        with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
            try:
                tar.getmember(filename)
            except KeyError:
                # getmember raises KeyError when the member is absent, which
                # is exactly the outcome this assertion expects.
                return

            # Fail outside the try block so that the AssertionError is not
            # swallowed by the exception handler.
            self.fail('Filename %s found' % filename)

    def test_single_layer(self):
        # A single layer must pass through with all of its files intact.
        tar_layer = self.create_layer(
            some_file='foo',
            another_file='bar',
            third_file='meh')

        squashed = self.squash_layers([tar_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'another_file', 'bar')
        self.assertHasFile(squashed, 'third_file', 'meh')

    def test_multiple_layers(self):
        # Files from layers with disjoint names must all be present.
        second_layer = self.create_layer(
            some_file='foo',
            another_file='bar',
            third_file='meh')

        first_layer = self.create_layer(
            top_file='top')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'another_file', 'bar')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'top_file', 'top')

    def test_multiple_layers_overwrite(self):
        # The top layer's copy of a file wins over the lower layer's copy.
        second_layer = self.create_layer(
            some_file='foo',
            another_file='bar',
            third_file='meh')

        first_layer = self.create_layer(
            another_file='top')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'another_file', 'top')

    def test_deleted_file(self):
        # A whiteout in the top layer removes the file from the lower layer.
        second_layer = self.create_layer(
            some_file='foo',
            another_file='bar',
            third_file='meh')

        first_layer = self.create_layer(
            another_file=None)

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertDoesNotHaveFile(squashed, 'another_file')

    def test_deleted_readded_file(self):
        # A file deleted in the middle layer but re-added on top is kept,
        # with the top layer's contents.
        third_layer = self.create_layer(
            another_file='bar')

        second_layer = self.create_layer(
            some_file='foo',
            another_file=None,
            third_file='meh')

        first_layer = self.create_layer(
            another_file='newagain')

        squashed = self.squash_layers(
            [first_layer, second_layer, third_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'another_file', 'newagain')

    def test_deleted_in_lower_layer(self):
        # A whiteout in the middle layer hides the bottom layer's file even
        # when the top layer does not mention it.
        third_layer = self.create_layer(
            another_file='bar')

        second_layer = self.create_layer(
            some_file='foo',
            another_file=None,
            third_file='meh')

        first_layer = self.create_layer(
            top_file='top')

        squashed = self.squash_layers(
            [first_layer, second_layer, third_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'top_file', 'top')
        self.assertDoesNotHaveFile(squashed, 'another_file')
# Run the test cases in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()