import unittest
import tarfile

from StringIO import StringIO

from util.registry.streamlayerformat import StreamLayerMerger
from util.registry.aufs import AUFS_WHITEOUT
from util.registry.tarlayerformat import TarLayerReadException


class TestStreamLayerMerger(unittest.TestCase):
    """Tests for squashing a stack of tar layers with StreamLayerMerger.

    Layers are handed to `squash_layers` topmost first: as the overwrite and
    deletion tests below expect, an entry in an earlier layer replaces or
    removes the same path in any later layer of the list.
    """

    # ------------------------------------------------------------------
    # Fixture helpers
    # ------------------------------------------------------------------

    def create_layer(self, *file_pairs):
        """Build a gzip-compressed tar layer and return it as a string.

        Each element of `file_pairs` is a `(filename, contents)` tuple:

        * `contents` is `None`: the path is recorded as deleted, by emitting
          an empty entry whose basename is prefixed with `AUFS_WHITEOUT`. A
          single trailing '/' on the filename is dropped first.
        * `contents` starts with `'linkto:'`: a hard link entry is emitted
          whose target is the remainder of the string.
        * anything else: a regular file with those contents.
        """
        output = StringIO()
        with tarfile.open(fileobj=output, mode='w:gz') as tar:
            for current_filename, current_contents in file_pairs:
                if current_contents is None:
                    # This is a deleted file: rewrite "dir/name" (or "name")
                    # to "dir/<AUFS_WHITEOUT>name" and give it no contents.
                    if current_filename.endswith('/'):
                        current_filename = current_filename[:-1]

                    # rpartition yields ('', '', name) when there is no '/',
                    # so the same expression covers both the nested and the
                    # top-level case.
                    parent, separator, basename = current_filename.rpartition('/')
                    current_filename = parent + separator + AUFS_WHITEOUT + basename
                    current_contents = ''

                info = tarfile.TarInfo(name=current_filename)
                if current_contents.startswith('linkto:'):
                    info.linkname = current_contents[len('linkto:'):]
                    info.type = tarfile.LNKTYPE
                    tar.addfile(info)
                else:
                    info.size = len(current_contents)
                    tar.addfile(info, fileobj=StringIO(current_contents))

        return output.getvalue()

    def create_empty_layer(self):
        """Return a layer consisting of no data at all (an empty string)."""
        return ''

    def squash_layers(self, layers, path_prefix=None):
        """Merge `layers` with StreamLayerMerger and return the joined output.

        `layers` is a list of layer data strings, topmost layer first.
        `path_prefix` is forwarded unchanged to StreamLayerMerger.
        """
        def getter_for_layer(layer):
            # A dedicated factory binds `layer` per call, so each returned
            # callable opens a fresh stream over its own layer.
            return lambda: StringIO(layer)

        def layer_stream_getter():
            return [getter_for_layer(layer) for layer in layers]

        merger = StreamLayerMerger(layer_stream_getter, path_prefix=path_prefix)
        return ''.join(merger.get_generator())

    # ------------------------------------------------------------------
    # Custom assertions
    # ------------------------------------------------------------------

    def assertHasFile(self, squashed, filename, contents):
        """Assert the squashed tar has `filename` with exactly `contents`.

        A missing member surfaces as the KeyError raised by `getmember`.
        """
        with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
            member = tar.getmember(filename)
            # Compare the raw data: readlines() keeps line terminators, so
            # re-joining its result with '\n' would double every newline.
            self.assertEqual(contents, tar.extractfile(member).read())

    def assertDoesNotHaveFile(self, squashed, filename):
        """Assert the squashed tar has no member named `filename`."""
        with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
            try:
                tar.getmember(filename)
            except KeyError:
                # getmember raises KeyError for an unknown name; any other
                # error is a real problem and must not pass as "absent".
                return

            self.fail('Filename %s found' % filename)

    # ------------------------------------------------------------------
    # Basic merging
    # ------------------------------------------------------------------

    def test_single_layer(self):
        # A single layer passes through with all of its files intact.
        tar_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', 'bar'),
            ('third_file', 'meh'))

        squashed = self.squash_layers([tar_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'another_file', 'bar')
        self.assertHasFile(squashed, 'third_file', 'meh')

    def test_multiple_layers(self):
        # Files from disjoint layers all appear in the result.
        second_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', 'bar'),
            ('third_file', 'meh'))

        first_layer = self.create_layer(
            ('top_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'another_file', 'bar')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'top_file', 'top')

    def test_multiple_layers_dot(self):
        # Names written with a leading './' keep that spelling in the output.
        second_layer = self.create_layer(
            ('./some_file', 'foo'),
            ('another_file', 'bar'),
            ('./third_file', 'meh'))

        first_layer = self.create_layer(
            ('top_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, './some_file', 'foo')
        self.assertHasFile(squashed, 'another_file', 'bar')
        self.assertHasFile(squashed, './third_file', 'meh')
        self.assertHasFile(squashed, 'top_file', 'top')

    def test_multiple_layers_overwrite(self):
        # The upper layer's copy of a file wins over the lower layer's.
        second_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', 'bar'),
            ('third_file', 'meh'))

        first_layer = self.create_layer(
            ('another_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'another_file', 'top')

    def test_multiple_layers_overwrite_base_dot(self):
        # './another_file' below and 'another_file' above are the same path;
        # only the upper spelling survives.
        second_layer = self.create_layer(
            ('some_file', 'foo'),
            ('./another_file', 'bar'),
            ('third_file', 'meh'))

        first_layer = self.create_layer(
            ('another_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'another_file', 'top')
        self.assertDoesNotHaveFile(squashed, './another_file')

    def test_multiple_layers_overwrite_top_dot(self):
        # Same as above with the './' spelling in the upper layer instead.
        second_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', 'bar'),
            ('third_file', 'meh'))

        first_layer = self.create_layer(
            ('./another_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, './another_file', 'top')
        self.assertDoesNotHaveFile(squashed, 'another_file')

    # ------------------------------------------------------------------
    # File deletion
    # ------------------------------------------------------------------

    def test_deleted_file(self):
        # A deletion marker in the upper layer removes the lower file.
        second_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', 'bar'),
            ('third_file', 'meh'))

        first_layer = self.create_layer(
            ('another_file', None))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertDoesNotHaveFile(squashed, 'another_file')

    def test_deleted_readded_file(self):
        # A file deleted in the middle layer but re-added on top is present
        # with the topmost contents.
        third_layer = self.create_layer(
            ('another_file', 'bar'))

        second_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', None),
            ('third_file', 'meh'))

        first_layer = self.create_layer(
            ('another_file', 'newagain'))

        squashed = self.squash_layers([first_layer, second_layer, third_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'another_file', 'newagain')

    def test_deleted_in_lower_layer(self):
        # A deletion in a middle layer still hides the file from the layer
        # beneath it.
        third_layer = self.create_layer(
            ('deleted_file', 'bar'))

        second_layer = self.create_layer(
            ('some_file', 'foo'),
            ('deleted_file', None),
            ('third_file', 'meh'))

        first_layer = self.create_layer(
            ('top_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer, third_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'top_file', 'top')
        self.assertDoesNotHaveFile(squashed, 'deleted_file')

    def test_deleted_in_lower_layer_with_added_dot(self):
        # The file was added as './deleted_file' but deleted as 'deleted_file'.
        third_layer = self.create_layer(
            ('./deleted_file', 'something'))

        second_layer = self.create_layer(
            ('deleted_file', None))

        squashed = self.squash_layers([second_layer, third_layer])

        self.assertDoesNotHaveFile(squashed, 'deleted_file')

    def test_deleted_in_lower_layer_with_deleted_dot(self):
        # Both the addition and the deletion use the './' spelling.
        third_layer = self.create_layer(
            ('./deleted_file', 'something'))

        second_layer = self.create_layer(
            ('./deleted_file', None))

        squashed = self.squash_layers([second_layer, third_layer])

        self.assertDoesNotHaveFile(squashed, 'deleted_file')

    # ------------------------------------------------------------------
    # Directories
    # ------------------------------------------------------------------

    def test_directory(self):
        # Overwriting one file in a directory leaves its siblings alone.
        second_layer = self.create_layer(
            ('foo/some_file', 'foo'),
            ('foo/another_file', 'bar'))

        first_layer = self.create_layer(
            ('foo/some_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'foo/some_file', 'top')
        self.assertHasFile(squashed, 'foo/another_file', 'bar')

    def test_sub_directory(self):
        # Same as test_directory, with an untouched nested sub-directory.
        second_layer = self.create_layer(
            ('foo/some_file', 'foo'),
            ('foo/bar/another_file', 'bar'))

        first_layer = self.create_layer(
            ('foo/some_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'foo/some_file', 'top')
        self.assertHasFile(squashed, 'foo/bar/another_file', 'bar')

    def test_delete_directory(self):
        # Deleting a directory removes everything beneath it.
        second_layer = self.create_layer(
            ('foo/some_file', 'foo'),
            ('foo/another_file', 'bar'))

        first_layer = self.create_layer(
            ('foo/', None))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertDoesNotHaveFile(squashed, 'foo/some_file')
        self.assertDoesNotHaveFile(squashed, 'foo/another_file')

    def test_delete_sub_directory(self):
        # Deleting a sub-directory leaves files in its parent untouched.
        second_layer = self.create_layer(
            ('foo/some_file', 'foo'),
            ('foo/bar/another_file', 'bar'))

        first_layer = self.create_layer(
            ('foo/bar/', None))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertDoesNotHaveFile(squashed, 'foo/bar/another_file')
        self.assertHasFile(squashed, 'foo/some_file', 'foo')

    def test_delete_sub_directory_with_dot(self):
        # The deletion is spelled './foo/bar/'; the files are not.
        second_layer = self.create_layer(
            ('foo/some_file', 'foo'),
            ('foo/bar/another_file', 'bar'))

        first_layer = self.create_layer(
            ('./foo/bar/', None))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertDoesNotHaveFile(squashed, 'foo/bar/another_file')
        self.assertHasFile(squashed, 'foo/some_file', 'foo')

    def test_delete_sub_directory_with_subdot(self):
        # The files are spelled with './'; the deletion is not.
        second_layer = self.create_layer(
            ('./foo/some_file', 'foo'),
            ('./foo/bar/another_file', 'bar'))

        first_layer = self.create_layer(
            ('foo/bar/', None))

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertDoesNotHaveFile(squashed, 'foo/bar/another_file')
        self.assertDoesNotHaveFile(squashed, './foo/bar/another_file')
        self.assertHasFile(squashed, './foo/some_file', 'foo')

    def test_delete_directory_recreate(self):
        # A directory deleted and then partly recreated above contains only
        # what the top layer put back.
        third_layer = self.create_layer(
            ('foo/some_file', 'foo'),
            ('foo/another_file', 'bar'))

        second_layer = self.create_layer(
            ('foo/', None))

        first_layer = self.create_layer(
            ('foo/some_file', 'baz'))

        squashed = self.squash_layers([first_layer, second_layer, third_layer])

        self.assertHasFile(squashed, 'foo/some_file', 'baz')
        self.assertDoesNotHaveFile(squashed, 'foo/another_file')

    def test_delete_directory_prefix(self):
        # Deleting 'foo/' must not affect 'foobar/', which merely shares a
        # name prefix.
        third_layer = self.create_layer(
            ('foobar/some_file', 'foo'),
            ('foo/another_file', 'bar'))

        second_layer = self.create_layer(
            ('foo/', None))

        squashed = self.squash_layers([second_layer, third_layer])

        self.assertHasFile(squashed, 'foobar/some_file', 'foo')
        self.assertDoesNotHaveFile(squashed, 'foo/another_file')

    def test_delete_directory_pre_prefix(self):
        # As above, with the surviving file nested one level deeper.
        third_layer = self.create_layer(
            ('foobar/baz/some_file', 'foo'),
            ('foo/another_file', 'bar'))

        second_layer = self.create_layer(
            ('foo/', None))

        squashed = self.squash_layers([second_layer, third_layer])

        self.assertHasFile(squashed, 'foobar/baz/some_file', 'foo')
        self.assertDoesNotHaveFile(squashed, 'foo/another_file')

    def test_delete_root_directory(self):
        # The directory deletion is written without a trailing slash.
        third_layer = self.create_layer(
            ('build/first_file', 'foo'),
            ('build/second_file', 'bar'))

        second_layer = self.create_layer(
            ('build', None))

        squashed = self.squash_layers([second_layer, third_layer])

        self.assertDoesNotHaveFile(squashed, 'build/first_file')
        self.assertDoesNotHaveFile(squashed, 'build/second_file')

    # ------------------------------------------------------------------
    # Empty and broken layers
    # ------------------------------------------------------------------

    def test_tar_empty_layer(self):
        # The upper layer is a tar archive that contains no members.
        third_layer = self.create_layer(
            ('build/first_file', 'foo'),
            ('build/second_file', 'bar'))

        empty_layer = self.create_layer()

        squashed = self.squash_layers([empty_layer, third_layer])

        self.assertHasFile(squashed, 'build/first_file', 'foo')
        self.assertHasFile(squashed, 'build/second_file', 'bar')

    def test_data_empty_layer(self):
        # The upper layer is zero bytes of data (not even a tar header).
        third_layer = self.create_layer(
            ('build/first_file', 'foo'),
            ('build/second_file', 'bar'))

        empty_layer = self.create_empty_layer()

        squashed = self.squash_layers([empty_layer, third_layer])

        self.assertHasFile(squashed, 'build/first_file', 'foo')
        self.assertHasFile(squashed, 'build/second_file', 'bar')

    def test_broken_layer(self):
        # Data that is not a tar archive must raise TarLayerReadException.
        third_layer = self.create_layer(
            ('build/first_file', 'foo'),
            ('build/second_file', 'bar'))

        broken_layer = 'not valid data'

        with self.assertRaises(TarLayerReadException) as raised:
            self.squash_layers([broken_layer, third_layer])

        self.assertEqual('Could not read layer', raised.exception.message)

    # ------------------------------------------------------------------
    # Path prefix
    # ------------------------------------------------------------------

    def test_single_layer_with_prefix(self):
        # Every output path is placed under the given prefix.
        tar_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', 'bar'),
            ('third_file', 'meh'))

        squashed = self.squash_layers([tar_layer], path_prefix='foo/')

        self.assertHasFile(squashed, 'foo/some_file', 'foo')
        self.assertHasFile(squashed, 'foo/another_file', 'bar')
        self.assertHasFile(squashed, 'foo/third_file', 'meh')

    def test_multiple_layers_overwrite_with_prefix(self):
        # Overwrite semantics are unchanged when a prefix is applied.
        second_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', 'bar'),
            ('third_file', 'meh'))

        first_layer = self.create_layer(
            ('another_file', 'top'))

        squashed = self.squash_layers([first_layer, second_layer], path_prefix='foo/')

        self.assertHasFile(squashed, 'foo/some_file', 'foo')
        self.assertHasFile(squashed, 'foo/third_file', 'meh')
        self.assertHasFile(squashed, 'foo/another_file', 'top')

    def test_superlong_filename(self):
        # A long file name combined with a prefix must survive intact.
        tar_layer = self.create_layer(
            ('this_is_the_filename_that_never_ends_it_goes_on_and_on_my_friend_some_people_started', 'meh'))

        squashed = self.squash_layers([tar_layer], path_prefix='foo/')

        self.assertHasFile(squashed, 'foo/this_is_the_filename_that_never_ends_it_goes_on_and_on_my_friend_some_people_started', 'meh')

    def test_superlong_prefix(self):
        # A long prefix combined with short file names must survive intact.
        tar_layer = self.create_layer(
            ('some_file', 'foo'),
            ('another_file', 'bar'),
            ('third_file', 'meh'))

        squashed = self.squash_layers([tar_layer], path_prefix='foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/')

        self.assertHasFile(squashed, 'foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/some_file', 'foo')
        self.assertHasFile(squashed, 'foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/another_file', 'bar')
        self.assertHasFile(squashed, 'foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/third_file', 'meh')

    # ------------------------------------------------------------------
    # Hard links
    # ------------------------------------------------------------------

    def test_hardlink_to_deleted_file(self):
        # When an upper layer deletes a hard link's target, the link itself
        # must still be readable and carry the target's contents.
        first_layer = self.create_layer(
            ('tobedeletedfile', 'somecontents'),
            ('link_to_deleted_file', 'linkto:tobedeletedfile'),
            ('third_file', 'meh'))

        second_layer = self.create_layer(
            ('tobedeletedfile', None))

        squashed = self.squash_layers([second_layer, first_layer], path_prefix='foo/')

        self.assertHasFile(squashed, 'foo/third_file', 'meh')
        self.assertHasFile(squashed, 'foo/link_to_deleted_file', 'somecontents')
        self.assertDoesNotHaveFile(squashed, 'foo/tobedeletedfile')

    def test_multiple_hardlink_to_deleted_file(self):
        # As above, with two hard links pointing at the deleted target.
        first_layer = self.create_layer(
            ('tobedeletedfile', 'somecontents'),
            ('link_to_deleted_file', 'linkto:tobedeletedfile'),
            ('another_link_to_deleted_file', 'linkto:tobedeletedfile'),
            ('third_file', 'meh'))

        second_layer = self.create_layer(
            ('tobedeletedfile', None))

        squashed = self.squash_layers([second_layer, first_layer], path_prefix='foo/')

        self.assertHasFile(squashed, 'foo/third_file', 'meh')
        self.assertHasFile(squashed, 'foo/link_to_deleted_file', 'somecontents')
        self.assertHasFile(squashed, 'foo/another_link_to_deleted_file', 'somecontents')
        self.assertDoesNotHaveFile(squashed, 'foo/tobedeletedfile')


if __name__ == '__main__':
    unittest.main()