import unittest
import tarfile

from StringIO import StringIO
from util.streamlayerformat import StreamLayerMerger, AUFS_WHITEOUT


class TestStreamLayerMerger(unittest.TestCase):
    """Tests for squashing several tar layers with StreamLayerMerger.

    Layers are built in memory by `create_layer`, merged by `squash_layers`
    and the resulting tar stream is inspected with `assertHasFile` and
    `assertDoesNotHaveFile`. Throughout these tests the layer listed first
    in the call to `squash_layers` is the one whose entries are expected to
    win over (or delete) entries of the layers listed after it.
    """

    def create_layer(self, **kwargs):
        """Build a gzip-compressed tar layer in memory and return its bytes.

        Each keyword argument describes one tar entry: the keyword NAME is
        used as the file contents and the keyword VALUE is the file name.
        The special keyword `_` marks a deletion instead: an empty entry is
        written whose base name is prefixed with AUFS_WHITEOUT (a single
        trailing '/' on the given name is dropped first).

        Because contents are dictionary keys, a layer can hold only one
        entry per distinct contents string and at most one deletion.
        """
        output = StringIO()
        with tarfile.open(fileobj=output, mode='w:gz') as tar:
            for contents, filename in kwargs.items():
                if contents == '_':
                    # This is a deleted file: emit a whiteout marker named
                    # <dir>/<AUFS_WHITEOUT><basename> with no contents.
                    if filename.endswith('/'):
                        filename = filename[:-1]

                    # rpartition yields ('', '', name) when there is no
                    # directory part, so one expression covers both cases.
                    directory, separator, basename = filename.rpartition('/')
                    filename = directory + separator + AUFS_WHITEOUT + basename
                    contents = ''

                info = tarfile.TarInfo(name=filename)
                info.size = len(contents)
                tar.addfile(info, fileobj=StringIO(contents))

        return output.getvalue()

    def squash_layers(self, layers):
        """Merge the given layer byte strings and return the merged tar data.

        StreamLayerMerger is handed a callable that produces a fresh list of
        file-like objects (one per layer, in the given order); the chunks
        from its generator are concatenated into a single string.
        """
        def get_layers():
            return [StringIO(layer) for layer in layers]

        merger = StreamLayerMerger(get_layers)
        return ''.join(merger.get_generator())

    def assertHasFile(self, squashed, filename, contents):
        """Assert that `filename` exists in the squashed tar with `contents`.

        A missing member surfaces as the KeyError raised by
        `TarFile.getmember`.
        """
        with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
            member = tar.getmember(filename)
            # Compare the raw contents; re-joining readlines() with '\n'
            # would double the line terminators of multi-line files.
            self.assertEqual(contents, tar.extractfile(member).read())

    def assertDoesNotHaveFile(self, squashed, filename):
        """Assert that the squashed tar has no member named `filename`."""
        with tarfile.open(fileobj=StringIO(squashed), mode='r:*') as tar:
            try:
                tar.getmember(filename)
            except KeyError:
                # getmember raises KeyError when the name is absent, which
                # is exactly what this assertion expects.
                return

        self.fail('Filename %s found' % filename)

    def test_single_layer(self):
        tar_layer = self.create_layer(
            foo='some_file',
            bar='another_file',
            meh='third_file')

        squashed = self.squash_layers([tar_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'another_file', 'bar')
        self.assertHasFile(squashed, 'third_file', 'meh')

    def test_multiple_layers(self):
        second_layer = self.create_layer(
            foo='some_file',
            bar='another_file',
            meh='third_file')

        first_layer = self.create_layer(
            top='top_file')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'another_file', 'bar')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'top_file', 'top')

    def test_multiple_layers_dot(self):
        second_layer = self.create_layer(
            foo='./some_file',
            bar='another_file',
            meh='./third_file')

        first_layer = self.create_layer(
            top='top_file')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, './some_file', 'foo')
        self.assertHasFile(squashed, 'another_file', 'bar')
        self.assertHasFile(squashed, './third_file', 'meh')
        self.assertHasFile(squashed, 'top_file', 'top')

    def test_multiple_layers_overwrite(self):
        second_layer = self.create_layer(
            foo='some_file',
            bar='another_file',
            meh='third_file')

        first_layer = self.create_layer(
            top='another_file')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'another_file', 'top')

    def test_multiple_layers_overwrite_base_dot(self):
        second_layer = self.create_layer(
            foo='some_file',
            bar='./another_file',
            meh='third_file')

        first_layer = self.create_layer(
            top='another_file')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'another_file', 'top')
        self.assertDoesNotHaveFile(squashed, './another_file')

    def test_multiple_layers_overwrite_top_dot(self):
        second_layer = self.create_layer(
            foo='some_file',
            bar='another_file',
            meh='third_file')

        first_layer = self.create_layer(
            top='./another_file')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, './another_file', 'top')
        self.assertDoesNotHaveFile(squashed, 'another_file')

    def test_deleted_file(self):
        second_layer = self.create_layer(
            foo='some_file',
            bar='another_file',
            meh='third_file')

        first_layer = self.create_layer(
            _='another_file')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertDoesNotHaveFile(squashed, 'another_file')

    def test_deleted_readded_file(self):
        third_layer = self.create_layer(
            bar='another_file')

        second_layer = self.create_layer(
            foo='some_file',
            _='another_file',
            meh='third_file')

        first_layer = self.create_layer(
            newagain='another_file')

        squashed = self.squash_layers(
            [first_layer, second_layer, third_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'another_file', 'newagain')

    def test_deleted_in_lower_layer(self):
        third_layer = self.create_layer(
            bar='deleted_file')

        second_layer = self.create_layer(
            foo='some_file',
            _='deleted_file',
            meh='third_file')

        first_layer = self.create_layer(
            top='top_file')

        squashed = self.squash_layers(
            [first_layer, second_layer, third_layer])

        self.assertHasFile(squashed, 'some_file', 'foo')
        self.assertHasFile(squashed, 'third_file', 'meh')
        self.assertHasFile(squashed, 'top_file', 'top')
        self.assertDoesNotHaveFile(squashed, 'deleted_file')

    def test_deleted_in_lower_layer_with_added_dot(self):
        third_layer = self.create_layer(
            something='./deleted_file')

        second_layer = self.create_layer(
            _='deleted_file')

        squashed = self.squash_layers([second_layer, third_layer])

        self.assertDoesNotHaveFile(squashed, 'deleted_file')

    def test_deleted_in_lower_layer_with_deleted_dot(self):
        third_layer = self.create_layer(
            something='./deleted_file')

        second_layer = self.create_layer(
            _='./deleted_file')

        squashed = self.squash_layers([second_layer, third_layer])

        self.assertDoesNotHaveFile(squashed, 'deleted_file')

    def test_directory(self):
        second_layer = self.create_layer(
            foo='foo/some_file',
            bar='foo/another_file')

        first_layer = self.create_layer(
            top='foo/some_file')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'foo/some_file', 'top')
        self.assertHasFile(squashed, 'foo/another_file', 'bar')

    def test_sub_directory(self):
        second_layer = self.create_layer(
            foo='foo/some_file',
            bar='foo/bar/another_file')

        first_layer = self.create_layer(
            top='foo/some_file')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertHasFile(squashed, 'foo/some_file', 'top')
        self.assertHasFile(squashed, 'foo/bar/another_file', 'bar')

    def test_delete_directory(self):
        second_layer = self.create_layer(
            foo='foo/some_file',
            bar='foo/another_file')

        first_layer = self.create_layer(
            _='foo/')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertDoesNotHaveFile(squashed, 'foo/some_file')
        self.assertDoesNotHaveFile(squashed, 'foo/another_file')

    def test_delete_sub_directory(self):
        second_layer = self.create_layer(
            foo='foo/some_file',
            bar='foo/bar/another_file')

        first_layer = self.create_layer(
            _='foo/bar/')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertDoesNotHaveFile(squashed, 'foo/bar/another_file')
        self.assertHasFile(squashed, 'foo/some_file', 'foo')

    def test_delete_sub_directory_with_dot(self):
        second_layer = self.create_layer(
            foo='foo/some_file',
            bar='foo/bar/another_file')

        first_layer = self.create_layer(
            _='./foo/bar/')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertDoesNotHaveFile(squashed, 'foo/bar/another_file')
        self.assertHasFile(squashed, 'foo/some_file', 'foo')

    def test_delete_sub_directory_with_subdot(self):
        second_layer = self.create_layer(
            foo='./foo/some_file',
            bar='./foo/bar/another_file')

        first_layer = self.create_layer(
            _='foo/bar/')

        squashed = self.squash_layers([first_layer, second_layer])

        self.assertDoesNotHaveFile(squashed, 'foo/bar/another_file')
        self.assertDoesNotHaveFile(squashed, './foo/bar/another_file')
        self.assertHasFile(squashed, './foo/some_file', 'foo')

    def test_delete_directory_recreate(self):
        third_layer = self.create_layer(
            foo='foo/some_file',
            bar='foo/another_file')

        second_layer = self.create_layer(
            _='foo/')

        first_layer = self.create_layer(
            baz='foo/some_file')

        squashed = self.squash_layers(
            [first_layer, second_layer, third_layer])

        self.assertHasFile(squashed, 'foo/some_file', 'baz')
        self.assertDoesNotHaveFile(squashed, 'foo/another_file')


if __name__ == '__main__':
    unittest.main()