Add basic layer merging tests
This commit is contained in:
parent
efc06b54f6
commit
62f1f5f583
1 changed files with 143 additions and 0 deletions
143
test/test_streamlayerformat.py
Normal file
143
test/test_streamlayerformat.py
Normal file
|
@ -0,0 +1,143 @@
|
|||
import unittest
|
||||
import tarfile
|
||||
|
||||
from StringIO import StringIO
|
||||
from util.streamlayerformat import StreamLayerMerger, AUFS_WHITEOUT
|
||||
|
||||
class TestStreamLayerMerger(unittest.TestCase):
|
||||
def create_layer(self, **kwargs):
|
||||
output = StringIO()
|
||||
with tarfile.open(fileobj=output, mode='w:gz') as tar:
|
||||
for filename in kwargs:
|
||||
current_filename = filename
|
||||
current_contents = kwargs[filename]
|
||||
|
||||
if current_contents is None:
|
||||
# This is a deleted file.
|
||||
current_filename = AUFS_WHITEOUT + current_filename
|
||||
current_contents = ''
|
||||
|
||||
info = tarfile.TarInfo(name=current_filename)
|
||||
info.size = len(current_contents)
|
||||
tar.addfile(info, fileobj=StringIO(current_contents))
|
||||
|
||||
return output.getvalue()
|
||||
|
||||
def squash_layers(self, layers):
|
||||
def get_layers():
|
||||
return [StringIO(layer) for layer in layers]
|
||||
|
||||
merger = StreamLayerMerger(get_layers)
|
||||
merged_data = ''.join(merger.get_generator())
|
||||
return merged_data
|
||||
|
||||
def assertHasFile(self, squashed, filename, contents):
|
||||
with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
|
||||
member = tar.getmember(filename)
|
||||
self.assertEquals(contents, '\n'.join(tar.extractfile(member).readlines()))
|
||||
|
||||
def assertDoesNotHaveFile(self, squashed, filename):
|
||||
with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
|
||||
try:
|
||||
member = tar.getmember(filename)
|
||||
self.fail('Filename %s found' % filename)
|
||||
except:
|
||||
pass
|
||||
|
||||
def test_single_layer(self):
|
||||
tar_layer = self.create_layer(
|
||||
some_file = 'foo',
|
||||
another_file = 'bar',
|
||||
third_file = 'meh')
|
||||
|
||||
squashed = self.squash_layers([tar_layer])
|
||||
|
||||
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||
self.assertHasFile(squashed, 'another_file', 'bar')
|
||||
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||
|
||||
def test_multiple_layers(self):
|
||||
second_layer = self.create_layer(
|
||||
some_file = 'foo',
|
||||
another_file = 'bar',
|
||||
third_file = 'meh')
|
||||
|
||||
first_layer = self.create_layer(
|
||||
top_file = 'top')
|
||||
|
||||
squashed = self.squash_layers([first_layer, second_layer])
|
||||
|
||||
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||
self.assertHasFile(squashed, 'another_file', 'bar')
|
||||
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||
self.assertHasFile(squashed, 'top_file', 'top')
|
||||
|
||||
def test_multiple_layers_overwrite(self):
|
||||
second_layer = self.create_layer(
|
||||
some_file = 'foo',
|
||||
another_file = 'bar',
|
||||
third_file = 'meh')
|
||||
|
||||
first_layer = self.create_layer(
|
||||
another_file = 'top')
|
||||
|
||||
squashed = self.squash_layers([first_layer, second_layer])
|
||||
|
||||
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||
self.assertHasFile(squashed, 'another_file', 'top')
|
||||
|
||||
def test_deleted_file(self):
|
||||
second_layer = self.create_layer(
|
||||
some_file = 'foo',
|
||||
another_file = 'bar',
|
||||
third_file = 'meh')
|
||||
|
||||
first_layer = self.create_layer(
|
||||
another_file = None)
|
||||
|
||||
squashed = self.squash_layers([first_layer, second_layer])
|
||||
|
||||
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||
self.assertDoesNotHaveFile(squashed, 'another_file')
|
||||
|
||||
def test_deleted_readded_file(self):
|
||||
third_layer = self.create_layer(
|
||||
another_file = 'bar')
|
||||
|
||||
second_layer = self.create_layer(
|
||||
some_file = 'foo',
|
||||
another_file = None,
|
||||
third_file = 'meh')
|
||||
|
||||
first_layer = self.create_layer(
|
||||
another_file = 'newagain')
|
||||
|
||||
squashed = self.squash_layers([first_layer, second_layer, third_layer])
|
||||
|
||||
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||
self.assertHasFile(squashed, 'another_file', 'newagain')
|
||||
|
||||
def test_deleted_in_lower_layer(self):
|
||||
third_layer = self.create_layer(
|
||||
another_file = 'bar')
|
||||
|
||||
second_layer = self.create_layer(
|
||||
some_file = 'foo',
|
||||
another_file = None,
|
||||
third_file = 'meh')
|
||||
|
||||
first_layer = self.create_layer(
|
||||
top_file = 'top')
|
||||
|
||||
squashed = self.squash_layers([first_layer, second_layer, third_layer])
|
||||
|
||||
self.assertHasFile(squashed, 'some_file', 'foo')
|
||||
self.assertHasFile(squashed, 'third_file', 'meh')
|
||||
self.assertHasFile(squashed, 'top_file', 'top')
|
||||
self.assertDoesNotHaveFile(squashed, 'another_file')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Reference in a new issue