From 110366f6565d934207363fec7b45e8036f332465 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 21 Mar 2018 16:05:27 -0400 Subject: [PATCH 1/2] Retarget hard links pointing to deleted files by emitting the deleted file contents under the first hard link instance. This fixes a breakage in the squashed TAR where we were pointing hard links to missing data. Fixes https://jira.coreos.com/browse/QUAY-885 --- endpoints/verbs/__init__.py | 17 +- image/appc/__init__.py | 4 +- image/common.py | 6 +- image/docker/squashed.py | 4 +- test/test_streamlayerformat.py | 245 +++++++++++++++++------------ util/registry/streamlayerformat.py | 73 +++++---- util/registry/tarfileappender.py | 56 ------- util/registry/tarlayerformat.py | 184 +++++++++++++++------- 8 files changed, 337 insertions(+), 252 deletions(-) delete mode 100644 util/registry/tarfileappender.py diff --git a/endpoints/verbs/__init__.py b/endpoints/verbs/__init__.py index bd7798569..7be39fded 100644 --- a/endpoints/verbs/__init__.py +++ b/endpoints/verbs/__init__.py @@ -46,19 +46,24 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers): for current_image in image_list: yield current_image - def get_next_layer(): - # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) - store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver) - for current_image in image_list: + def image_stream_getter(store, current_image): + def func(): current_image_path = model.get_blob_path(current_image.blob) current_image_stream = store.stream_read_file(current_image.blob.locations, current_image_path) logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path) - yield current_image_stream + return current_image_stream + return func + + def tar_stream_getter_iterator(): + # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) + store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver) + for current_image in image_list: + yield image_stream_getter(store, current_image) stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image, - get_next_layer) + tar_stream_getter_iterator) for handler_fn in handlers: stream = wrap_with_handler(stream, handler_fn) diff --git a/image/appc/__init__.py b/image/appc/__init__.py index 296c9c84e..b9ac26247 100644 --- a/image/appc/__init__.py +++ b/image/appc/__init__.py @@ -19,7 +19,7 @@ class AppCImageFormatter(TarImageFormatter): """ def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, - get_layer_iterator): + tar_stream_getter_iterator): image_mtime = 0 created = next(get_image_iterator()).v1_metadata.created if created is not None: @@ -40,7 +40,7 @@ class AppCImageFormatter(TarImageFormatter): # Yield the merged layer dtaa. 
yield self.tar_folder('rootfs', mtime=image_mtime) - layer_merger = StreamLayerMerger(get_layer_iterator, path_prefix='rootfs/') + layer_merger = StreamLayerMerger(tar_stream_getter_iterator, path_prefix='rootfs/') for entry in layer_merger.get_generator(): yield entry diff --git a/image/common.py b/image/common.py index 8d9bfefbc..b075aa0ff 100644 --- a/image/common.py +++ b/image/common.py @@ -8,16 +8,16 @@ class TarImageFormatter(object): """ def build_stream(self, repo_image, tag, synthetic_image_id, get_image_iterator, - get_layer_iterator): + tar_stream_getter_iterator): """ Builds and streams a synthetic .tar.gz that represents the formatted tar created by this class's implementation. """ return GzipWrap(self.stream_generator(repo_image, tag, synthetic_image_id, get_image_iterator, - get_layer_iterator)) + tar_stream_getter_iterator)) def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, - get_layer_iterator): + tar_stream_getter_iterator): raise NotImplementedError def tar_file(self, name, contents, mtime=None): diff --git a/image/docker/squashed.py b/image/docker/squashed.py index 31370513e..df41d6e6b 100644 --- a/image/docker/squashed.py +++ b/image/docker/squashed.py @@ -29,7 +29,7 @@ class SquashedDockerImageFormatter(TarImageFormatter): SIZE_MULTIPLIER = 1.2 def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, - get_layer_iterator): + tar_stream_getter_iterator): image_mtime = 0 created = next(get_image_iterator()).v1_metadata.created if created is not None: @@ -42,7 +42,7 @@ class SquashedDockerImageFormatter(TarImageFormatter): # json - The layer JSON # layer.tar - The tarballed contents of the layer # VERSION - The docker import version: '1.0' - layer_merger = StreamLayerMerger(get_layer_iterator) + layer_merger = StreamLayerMerger(tar_stream_getter_iterator) # Yield the repositories file: synthetic_layer_info = {} diff --git a/test/test_streamlayerformat.py b/test/test_streamlayerformat.py index e2df30c92..4bc85226f 100644 --- a/test/test_streamlayerformat.py +++ b/test/test_streamlayerformat.py @@ -2,17 +2,16 @@ import unittest import tarfile from StringIO import StringIO -from util.registry.streamlayerformat import StreamLayerMerger, AUFS_WHITEOUT +from util.registry.streamlayerformat import StreamLayerMerger +from util.registry.aufs import AUFS_WHITEOUT from util.registry.tarlayerformat import TarLayerReadException class TestStreamLayerMerger(unittest.TestCase): - def create_layer(self, **kwargs): + def create_layer(self, *file_pairs): output = StringIO() with tarfile.open(fileobj=output, mode='w:gz') as tar: - for current_contents in kwargs: - current_filename = kwargs[current_contents] - - if current_contents == '_': + for current_filename, current_contents in file_pairs: + if current_contents is None: # This is a deleted file. 
if current_filename.endswith('/'): current_filename = current_filename[:-1] @@ -25,9 +24,15 @@ class TestStreamLayerMerger(unittest.TestCase): current_contents = '' - info = tarfile.TarInfo(name=current_filename) - info.size = len(current_contents) - tar.addfile(info, fileobj=StringIO(current_contents)) + if current_contents.startswith('linkto:'): + info = tarfile.TarInfo(name=current_filename) + info.linkname = current_contents[len('linkto:'):] + info.type = tarfile.LNKTYPE + tar.addfile(info) + else: + info = tarfile.TarInfo(name=current_filename) + info.size = len(current_contents) + tar.addfile(info, fileobj=StringIO(current_contents)) return output.getvalue() @@ -35,10 +40,13 @@ class TestStreamLayerMerger(unittest.TestCase): return '' def squash_layers(self, layers, path_prefix=None): - def get_layers(): - return [StringIO(layer) for layer in layers] + def getter_for_layer(layer): + return lambda: StringIO(layer) - merger = StreamLayerMerger(get_layers, path_prefix=path_prefix) + def layer_stream_getter(): + return [getter_for_layer(layer) for layer in layers] + + merger = StreamLayerMerger(layer_stream_getter, path_prefix=path_prefix) merged_data = ''.join(merger.get_generator()) return merged_data @@ -58,9 +66,9 @@ class TestStreamLayerMerger(unittest.TestCase): def test_single_layer(self): tar_layer = self.create_layer( - foo = 'some_file', - bar = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', 'bar'), + ('third_file', 'meh')) squashed = self.squash_layers([tar_layer]) @@ -70,12 +78,12 @@ class TestStreamLayerMerger(unittest.TestCase): def test_multiple_layers(self): second_layer = self.create_layer( - foo = 'some_file', - bar = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', 'bar'), + ('third_file', 'meh')) first_layer = self.create_layer( - top = 'top_file') + ('top_file', 'top')) squashed = self.squash_layers([first_layer, second_layer]) @@ -86,12 +94,12 @@ class TestStreamLayerMerger(unittest.TestCase): def test_multiple_layers_dot(self): second_layer = self.create_layer( - foo = './some_file', - bar = 'another_file', - meh = './third_file') + ('./some_file', 'foo'), + ('another_file', 'bar'), + ('./third_file', 'meh')) first_layer = self.create_layer( - top = 'top_file') + ('top_file', 'top')) squashed = self.squash_layers([first_layer, second_layer]) @@ -102,12 +110,12 @@ class TestStreamLayerMerger(unittest.TestCase): def test_multiple_layers_overwrite(self): second_layer = self.create_layer( - foo = 'some_file', - bar = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', 'bar'), + ('third_file', 'meh')) first_layer = self.create_layer( - top = 'another_file') + ('another_file', 'top')) squashed = self.squash_layers([first_layer, second_layer]) @@ -117,12 +125,12 @@ class TestStreamLayerMerger(unittest.TestCase): def test_multiple_layers_overwrite_base_dot(self): second_layer = self.create_layer( - foo = 'some_file', - bar = './another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('./another_file', 'bar'), + ('third_file', 'meh')) first_layer = self.create_layer( - top = 'another_file') + ('another_file', 'top')) squashed = self.squash_layers([first_layer, second_layer]) @@ -133,12 +141,12 @@ class TestStreamLayerMerger(unittest.TestCase): def test_multiple_layers_overwrite_top_dot(self): second_layer = self.create_layer( - foo = 'some_file', - bar = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', 'bar'), + ('third_file', 'meh')) 
first_layer = self.create_layer( - top = './another_file') + ('./another_file', 'top')) squashed = self.squash_layers([first_layer, second_layer]) @@ -149,12 +157,12 @@ class TestStreamLayerMerger(unittest.TestCase): def test_deleted_file(self): second_layer = self.create_layer( - foo = 'some_file', - bar = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', 'bar'), + ('third_file', 'meh')) first_layer = self.create_layer( - _ = 'another_file') + ('another_file', None)) squashed = self.squash_layers([first_layer, second_layer]) @@ -164,15 +172,15 @@ class TestStreamLayerMerger(unittest.TestCase): def test_deleted_readded_file(self): third_layer = self.create_layer( - bar = 'another_file') + ('another_file', 'bar')) second_layer = self.create_layer( - foo = 'some_file', - _ = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', None), + ('third_file', 'meh')) first_layer = self.create_layer( - newagain = 'another_file') + ('another_file', 'newagain')) squashed = self.squash_layers([first_layer, second_layer, third_layer]) @@ -182,15 +190,15 @@ class TestStreamLayerMerger(unittest.TestCase): def test_deleted_in_lower_layer(self): third_layer = self.create_layer( - bar = 'deleted_file') + ('deleted_file', 'bar')) second_layer = self.create_layer( - foo = 'some_file', - _ = 'deleted_file', - meh = 'third_file') + ('some_file', 'foo'), + ('deleted_file', None), + ('third_file', 'meh')) first_layer = self.create_layer( - top = 'top_file') + ('top_file', 'top')) squashed = self.squash_layers([first_layer, second_layer, third_layer]) @@ -201,31 +209,31 @@ class TestStreamLayerMerger(unittest.TestCase): def test_deleted_in_lower_layer_with_added_dot(self): third_layer = self.create_layer( - something = './deleted_file') + ('./deleted_file', 'something')) second_layer = self.create_layer( - _ = 'deleted_file') + ('deleted_file', None)) squashed = self.squash_layers([second_layer, third_layer]) self.assertDoesNotHaveFile(squashed, 'deleted_file') def test_deleted_in_lower_layer_with_deleted_dot(self): third_layer = self.create_layer( - something = './deleted_file') + ('./deleted_file', 'something')) second_layer = self.create_layer( - _ = './deleted_file') + ('./deleted_file', None)) squashed = self.squash_layers([second_layer, third_layer]) self.assertDoesNotHaveFile(squashed, 'deleted_file') def test_directory(self): second_layer = self.create_layer( - foo = 'foo/some_file', - bar = 'foo/another_file') + ('foo/some_file', 'foo'), + ('foo/another_file', 'bar')) first_layer = self.create_layer( - top = 'foo/some_file') + ('foo/some_file', 'top')) squashed = self.squash_layers([first_layer, second_layer]) @@ -234,11 +242,11 @@ class TestStreamLayerMerger(unittest.TestCase): def test_sub_directory(self): second_layer = self.create_layer( - foo = 'foo/some_file', - bar = 'foo/bar/another_file') + ('foo/some_file', 'foo'), + ('foo/bar/another_file', 'bar')) first_layer = self.create_layer( - top = 'foo/some_file') + ('foo/some_file', 'top')) squashed = self.squash_layers([first_layer, second_layer]) @@ -247,11 +255,11 @@ class TestStreamLayerMerger(unittest.TestCase): def test_delete_directory(self): second_layer = self.create_layer( - foo = 'foo/some_file', - bar = 'foo/another_file') + ('foo/some_file', 'foo'), + ('foo/another_file', 'bar')) first_layer = self.create_layer( - _ = 'foo/') + ('foo/', None)) squashed = self.squash_layers([first_layer, second_layer]) @@ -260,11 +268,11 @@ class TestStreamLayerMerger(unittest.TestCase): def 
test_delete_sub_directory(self): second_layer = self.create_layer( - foo = 'foo/some_file', - bar = 'foo/bar/another_file') + ('foo/some_file', 'foo'), + ('foo/bar/another_file', 'bar')) first_layer = self.create_layer( - _ = 'foo/bar/') + ('foo/bar/', None)) squashed = self.squash_layers([first_layer, second_layer]) @@ -273,11 +281,11 @@ class TestStreamLayerMerger(unittest.TestCase): def test_delete_sub_directory_with_dot(self): second_layer = self.create_layer( - foo = 'foo/some_file', - bar = 'foo/bar/another_file') + ('foo/some_file', 'foo'), + ('foo/bar/another_file', 'bar')) first_layer = self.create_layer( - _ = './foo/bar/') + ('./foo/bar/', None)) squashed = self.squash_layers([first_layer, second_layer]) @@ -286,11 +294,11 @@ class TestStreamLayerMerger(unittest.TestCase): def test_delete_sub_directory_with_subdot(self): second_layer = self.create_layer( - foo = './foo/some_file', - bar = './foo/bar/another_file') + ('./foo/some_file', 'foo'), + ('./foo/bar/another_file', 'bar')) first_layer = self.create_layer( - _ = 'foo/bar/') + ('foo/bar/', None)) squashed = self.squash_layers([first_layer, second_layer]) @@ -300,14 +308,14 @@ class TestStreamLayerMerger(unittest.TestCase): def test_delete_directory_recreate(self): third_layer = self.create_layer( - foo = 'foo/some_file', - bar = 'foo/another_file') + ('foo/some_file', 'foo'), + ('foo/another_file', 'bar')) second_layer = self.create_layer( - _ = 'foo/') + ('foo/', None)) first_layer = self.create_layer( - baz = 'foo/some_file') + ('foo/some_file', 'baz')) squashed = self.squash_layers([first_layer, second_layer, third_layer]) @@ -316,11 +324,11 @@ class TestStreamLayerMerger(unittest.TestCase): def test_delete_directory_prefix(self): third_layer = self.create_layer( - foo = 'foobar/some_file', - bar = 'foo/another_file') + ('foobar/some_file', 'foo'), + ('foo/another_file', 'bar')) second_layer = self.create_layer( - _ = 'foo/') + ('foo/', None)) squashed = self.squash_layers([second_layer, third_layer]) @@ -330,11 +338,11 @@ class TestStreamLayerMerger(unittest.TestCase): def test_delete_directory_pre_prefix(self): third_layer = self.create_layer( - foo = 'foobar/baz/some_file', - bar = 'foo/another_file') + ('foobar/baz/some_file', 'foo'), + ('foo/another_file', 'bar')) second_layer = self.create_layer( - _ = 'foo/') + ('foo/', None)) squashed = self.squash_layers([second_layer, third_layer]) @@ -344,11 +352,11 @@ class TestStreamLayerMerger(unittest.TestCase): def test_delete_root_directory(self): third_layer = self.create_layer( - foo = 'build/first_file', - bar = 'build/second_file') + ('build/first_file', 'foo'), + ('build/second_file', 'bar')) second_layer = self.create_layer( - _ = 'build') + ('build', None)) squashed = self.squash_layers([second_layer, third_layer]) @@ -358,8 +366,8 @@ class TestStreamLayerMerger(unittest.TestCase): def test_tar_empty_layer(self): third_layer = self.create_layer( - foo = 'build/first_file', - bar = 'build/second_file') + ('build/first_file', 'foo'), + ('build/second_file', 'bar')) empty_layer = self.create_layer() @@ -371,8 +379,8 @@ class TestStreamLayerMerger(unittest.TestCase): def test_data_empty_layer(self): third_layer = self.create_layer( - foo = 'build/first_file', - bar = 'build/second_file') + ('build/first_file', 'foo'), + ('build/second_file', 'bar')) empty_layer = self.create_empty_layer() @@ -384,8 +392,8 @@ class TestStreamLayerMerger(unittest.TestCase): def test_broken_layer(self): third_layer = self.create_layer( - foo = 'build/first_file', - bar = 
'build/second_file') + ('build/first_file', 'foo'), + ('build/second_file', 'bar')) broken_layer = 'not valid data' @@ -397,9 +405,9 @@ class TestStreamLayerMerger(unittest.TestCase): def test_single_layer_with_prefix(self): tar_layer = self.create_layer( - foo = 'some_file', - bar = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', 'bar'), + ('third_file', 'meh')) squashed = self.squash_layers([tar_layer], path_prefix='foo/') @@ -409,12 +417,12 @@ class TestStreamLayerMerger(unittest.TestCase): def test_multiple_layers_overwrite_with_prefix(self): second_layer = self.create_layer( - foo = 'some_file', - bar = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', 'bar'), + ('third_file', 'meh')) first_layer = self.create_layer( - top = 'another_file') + ('another_file', 'top')) squashed = self.squash_layers([first_layer, second_layer], path_prefix='foo/') @@ -425,7 +433,7 @@ class TestStreamLayerMerger(unittest.TestCase): def test_superlong_filename(self): tar_layer = self.create_layer( - meh = 'this_is_the_filename_that_never_ends_it_goes_on_and_on_my_friend_some_people_started') + ('this_is_the_filename_that_never_ends_it_goes_on_and_on_my_friend_some_people_started', 'meh')) squashed = self.squash_layers([tar_layer], path_prefix='foo/') @@ -435,9 +443,9 @@ class TestStreamLayerMerger(unittest.TestCase): def test_superlong_prefix(self): tar_layer = self.create_layer( - foo = 'some_file', - bar = 'another_file', - meh = 'third_file') + ('some_file', 'foo'), + ('another_file', 'bar'), + ('third_file', 'meh')) squashed = self.squash_layers([tar_layer], path_prefix='foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/') @@ -447,5 +455,40 @@ class TestStreamLayerMerger(unittest.TestCase): self.assertHasFile(squashed, 'foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/third_file', 'meh') + def test_hardlink_to_deleted_file(self): + first_layer = self.create_layer( + ('tobedeletedfile', 'somecontents'), + ('link_to_deleted_file', 'linkto:tobedeletedfile'), + ('third_file', 'meh')) + + second_layer = self.create_layer( + ('tobedeletedfile', None)) + + squashed = self.squash_layers([second_layer, first_layer], path_prefix='foo/') + + self.assertHasFile(squashed, 'foo/third_file', 'meh') + self.assertHasFile(squashed, 'foo/link_to_deleted_file', 'somecontents') + self.assertDoesNotHaveFile(squashed, 'foo/tobedeletedfile') + + + def test_multiple_hardlink_to_deleted_file(self): + first_layer = self.create_layer( + ('tobedeletedfile', 'somecontents'), + ('link_to_deleted_file', 'linkto:tobedeletedfile'), + ('another_link_to_deleted_file', 'linkto:tobedeletedfile'), + ('third_file', 'meh')) + + second_layer = self.create_layer( + ('tobedeletedfile', None)) + + squashed = self.squash_layers([second_layer, first_layer], path_prefix='foo/') + + self.assertHasFile(squashed, 'foo/third_file', 'meh') + self.assertHasFile(squashed, 'foo/link_to_deleted_file', 'somecontents') + self.assertHasFile(squashed, 'foo/another_link_to_deleted_file', 'somecontents') + + self.assertDoesNotHaveFile(squashed, 'foo/tobedeletedfile') + + if __name__ == '__main__': unittest.main() diff --git a/util/registry/streamlayerformat.py b/util/registry/streamlayerformat.py index 15db6a3a4..d21c61401 100644 --- a/util/registry/streamlayerformat.py +++ b/util/registry/streamlayerformat.py @@ -1,53 +1,70 @@ -import marisa_trie import os +import tarfile + +import marisa_trie + 
from util.registry.aufs import is_aufs_metadata, get_deleted_prefix from util.registry.tarlayerformat import TarLayerFormat -AUFS_METADATA = u'.wh..wh.' - -AUFS_WHITEOUT = u'.wh.' -AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT) - class StreamLayerMerger(TarLayerFormat): """ Class which creates a generator of the combined TAR data for a set of Docker layers. """ - def __init__(self, layer_iterator, path_prefix=None): - super(StreamLayerMerger, self).__init__(layer_iterator, path_prefix) + def __init__(self, get_tar_stream_iterator, path_prefix=None): + super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix) self.path_trie = marisa_trie.Trie() - self.path_encountered = [] + self.path_encountered = set() - self.prefix_trie = marisa_trie.Trie() - self.prefix_encountered = [] + self.deleted_prefix_trie = marisa_trie.Trie() + self.deleted_prefixes_encountered = set() - def after_tar_layer(self, current_layer): - # Update the tries. - self.path_trie = marisa_trie.Trie(self.path_encountered) - self.prefix_trie = marisa_trie.Trie(self.prefix_encountered) + def after_tar_layer(self): + # Update the tries. + self.path_trie = marisa_trie.Trie(self.path_encountered) + self.deleted_prefix_trie = marisa_trie.Trie(self.deleted_prefixes_encountered) - def check_tar_info(self, tar_info): - absolute = os.path.relpath(tar_info.name.decode('utf-8'), './') + @staticmethod + def _normalize_path(path): + return os.path.relpath(path.decode('utf-8'), './') + + def _check_deleted(self, absolute): + ubsolute = unicode(absolute) + for prefix in self.deleted_prefix_trie.iter_prefixes(ubsolute): + if not os.path.relpath(ubsolute, prefix).startswith('..'): + return True + + return False + + def is_skipped_file(self, filename): + absolute = StreamLayerMerger._normalize_path(filename) # Skip metadata. if is_aufs_metadata(absolute): - return False + return True - # Add any prefix of deleted paths to the prefix list. - deleted_prefix = get_deleted_prefix(absolute) - if deleted_prefix is not None: - self.prefix_encountered.append(deleted_prefix) - return False + # Check if the file is under a deleted path. + if self._check_deleted(absolute): + return True # Check if this file has already been encountered somewhere. If so, # skip it. ubsolute = unicode(absolute) if ubsolute in self.path_trie: + return True + + return False + + def should_append_file(self, filename): + if self.is_skipped_file(filename): return False - # Check if this file is under a deleted path. - for prefix in self.prefix_trie.iter_prefixes(ubsolute): - if not os.path.relpath(ubsolute, prefix).startswith('..'): - return False + absolute = StreamLayerMerger._normalize_path(filename) + + # Add any prefix of deleted paths to the prefix list. + deleted_prefix = get_deleted_prefix(absolute) + if deleted_prefix is not None: + self.deleted_prefixes_encountered.add(deleted_prefix) + return False # Otherwise, add the path to the encountered list and return it. 
- self.path_encountered.append(absolute) + self.path_encountered.add(absolute) return True diff --git a/util/registry/tarfileappender.py b/util/registry/tarfileappender.py deleted file mode 100644 index 1dcf9b05d..000000000 --- a/util/registry/tarfileappender.py +++ /dev/null @@ -1,56 +0,0 @@ -import tarfile - -from cStringIO import StringIO - -from util.registry.tarlayerformat import TarLayerFormat -from util.registry.gzipwrap import GzipWrap - -class TarfileAppender(TarLayerFormat): - """ Helper class which allows for appending entries to a gzipped-tarfile and doing so - in a streaming manner. - """ - def __init__(self, base_tar_file, entries): - super(TarfileAppender, self).__init__(self._get_tar_iterator) - self.entries = entries - self.base_tar_file = base_tar_file - self.first_info = None - - def get_stream(self): - return GzipWrap(self.get_generator()) - - def after_tar_layer(self, current_layer): - pass - - def check_tar_info(self, tar_info): - if not self.first_info: - self.first_info = tar_info - return True - - def _get_tar_iterator(self): - # Yield the contents of the base tar. - yield self.base_tar_file - - # Construct an in-memory tar containing the entries to append, and then yield - # its data. - def add_entry(arch, dir_path, contents=None): - info = tarfile.TarInfo(dir_path) - info.uid = self.first_info.uid - info.gid = self.first_info.gid - info.mode = self.first_info.mode - info.mtime = self.first_info.mtime - - info.type = tarfile.REGTYPE if contents else tarfile.DIRTYPE - - if contents: - info.size = len(contents) - - arch.addfile(info, fileobj=StringIO(contents) if contents else None) - - append_tarball = StringIO() - with tarfile.open(fileobj=append_tarball, mode='w') as updated_archive: - for entry in self.entries: - add_entry(updated_archive, entry, self.entries[entry]) - - # To make tarfile happy. - append_tarball.seek(0) - yield append_tarball diff --git a/util/registry/tarlayerformat.py b/util/registry/tarlayerformat.py index 111e0f731..e6c02e551 100644 --- a/util/registry/tarlayerformat.py +++ b/util/registry/tarlayerformat.py @@ -2,93 +2,169 @@ import os import tarfile import copy +from abc import ABCMeta, abstractmethod +from collections import defaultdict +from six import add_metaclass + + class TarLayerReadException(Exception): """ Exception raised when reading a layer has failed. """ pass +# 9MB (+ padding below) so that it matches the 10MB expected by Gzip. +CHUNK_SIZE = 1024 * 1024 * 9 + + +@add_metaclass(ABCMeta) class TarLayerFormat(object): """ Class which creates a generator of the combined TAR data. """ - def __init__(self, tar_iterator, path_prefix=None): - self.tar_iterator = tar_iterator - self.path_prefix = path_prefix + def __init__(self, tar_stream_getter_iterator, path_prefix=None): + self.tar_stream_getter_iterator = tar_stream_getter_iterator + self.path_prefix = path_prefix or '' def get_generator(self): - for current_tar in self.tar_iterator(): + for stream_getter in self.tar_stream_getter_iterator(): + current_tar_stream = stream_getter() + # Read the current TAR. If it is empty, we just continue # to the next one. - tar_file = None - try: - tar_file = tarfile.open(mode='r|*', fileobj=current_tar) - except tarfile.ReadError as re: - if re.message != 'empty file': - raise TarLayerReadException('Could not read layer') - + tar_file = TarLayerFormat._tar_file_from_stream(current_tar_stream) if not tar_file: continue # For each of the tar entries, yield them IF and ONLY IF we have not # encountered the path before. 
- - # 9MB (+ padding below) so that it matches the 10MB expected by Gzip. - chunk_size = 1024 * 1024 * 9 - + dangling_hard_links = defaultdict(list) for tar_info in tar_file: - if not self.check_tar_info(tar_info): + if not self.should_append_file(tar_info.name): continue + # Note: We use a copy here because we need to make sure we copy over all the internal + # data of the tar header. We cannot use frombuf(tobuf()), however, because it doesn't + # properly handle large filenames. + clone = copy.deepcopy(tar_info) + clone.name = os.path.join(self.path_prefix, clone.name) + + # If the entry is a *hard* link, then prefix it as well. Soft links are relative. + if clone.linkname and clone.type == tarfile.LNKTYPE: + # If the entry is a dangling hard link, we skip here. Dangling hard links will be handled + # in a second pass. + if self.is_skipped_file(tar_info.linkname): + dangling_hard_links[tar_info.linkname].append(tar_info) + continue + + clone.linkname = os.path.join(self.path_prefix, clone.linkname) + # Yield the tar header. - if self.path_prefix: - # Note: We use a copy here because we need to make sure we copy over all the internal - # data of the tar header. We cannot use frombuf(tobuf()), however, because it doesn't - # properly handle large filenames. - clone = copy.deepcopy(tar_info) - clone.name = os.path.join(self.path_prefix, clone.name) - - # If the entry is a *hard* link, then prefix it as well. Soft links are relative. - if clone.linkname and clone.type == tarfile.LNKTYPE: - clone.linkname = os.path.join(self.path_prefix, clone.linkname) - - yield clone.tobuf() - else: - yield tar_info.tobuf() + yield clone.tobuf() # Try to extract any file contents for the tar. If found, we yield them as well. if tar_info.isreg(): - file_stream = tar_file.extractfile(tar_info) - if file_stream is not None: - length = 0 - while True: - current_block = file_stream.read(chunk_size) - if not len(current_block): - break - - yield current_block - length += len(current_block) - - file_stream.close() - - # Files must be padding to 512 byte multiples. - if length % 512 != 0: - yield '\0' * (512 - (length % 512)) + for block in TarLayerFormat._emit_file(tar_file, tar_info): + yield block # Close the layer stream now that we're done with it. tar_file.close() + # If there are any dangling hard links, open a new stream and retarget the dangling hard + # links to a new copy of the contents, which will be placed under the *first* dangling hard + # link's name. + if len(dangling_hard_links) > 0: + tar_file = TarLayerFormat._tar_file_from_stream(stream_getter()) + if not tar_file: + raise TarLayerReadException('Could not re-read tar layer') + + for tar_info in tar_file: + # If we encounter a file that holds the data for a dangling link, + # emit it under the name of the first dangling hard link. All other + # dangling hard links will be retargeted to this first name. + if tar_info.name in dangling_hard_links: + first_dangling = dangling_hard_links[tar_info.name][0] + + # Copy the first dangling hard link, change it to a normal file, + # and emit the deleted file's contents for it. 
+ clone = copy.deepcopy(first_dangling) + clone.name = os.path.join(self.path_prefix, first_dangling.name) + clone.type = tar_info.type + clone.size = tar_info.size + clone.pax_headers = tar_info.pax_headers + yield clone.tobuf() + + for block in TarLayerFormat._emit_file(tar_file, tar_info): + yield block + + elif (tar_info.type == tarfile.LNKTYPE and + tar_info.linkname in dangling_hard_links and + not self.is_skipped_file(tar_info.name)): + # Retarget if necessary. All dangling hard links (but the first) will + # need to be retargeted. + first_dangling = dangling_hard_links[tar_info.linkname][0] + if tar_info.name == first_dangling.name: + # Skip; the first dangling is handled above. + continue + + # Retarget the hard link to the first dangling hard link. + clone = copy.deepcopy(tar_info) + clone.name = os.path.join(self.path_prefix, clone.name) + clone.linkname = os.path.join(self.path_prefix, first_dangling.name) + yield clone.tobuf() + + # Close the layer stream now that we're done with it. + tar_file.close() + # Conduct any post-tar work. - self.after_tar_layer(current_tar) + self.after_tar_layer() # Last two records are empty in TAR spec. yield '\0' * 512 yield '\0' * 512 - - def check_tar_info(self, tar_info): - """ Returns true if the current tar_info should be added to the combined tar. False - otherwise. + @abstractmethod + def is_skipped_file(self, filename): + """ Returns true if the file with the given name will be skipped during append. """ - raise NotImplementedError() + pass - def after_tar_layer(self, current_tar): + @abstractmethod + def should_append_file(self, filename): + """ Returns true if the file with the given name should be appended when producing + the new TAR. + """ + pass + + @abstractmethod + def after_tar_layer(self): """ Invoked after a TAR layer is added, to do any post-add work. """ - raise NotImplementedError() + pass + + @staticmethod + def _tar_file_from_stream(stream): + tar_file = None + try: + tar_file = tarfile.open(mode='r|*', fileobj=stream) + except tarfile.ReadError as re: + if re.message != 'empty file': + raise TarLayerReadException('Could not read layer') + + return tar_file + + @staticmethod + def _emit_file(tar_file, tar_info): + file_stream = tar_file.extractfile(tar_info) + if file_stream is not None: + length = 0 + while True: + current_block = file_stream.read(CHUNK_SIZE) + if not len(current_block): + break + + yield current_block + length += len(current_block) + + file_stream.close() + + # Files must be padding to 512 byte multiples. 
+ if length % 512 != 0: + yield '\0' * (512 - (length % 512)) + \ No newline at end of file From dd470bdc9dbdc396ac9689d3afe50f7ab75ac99f Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 23 Mar 2018 14:39:38 -0400 Subject: [PATCH 2/2] Add a reporter for verbs to track number of storage streams are needed --- endpoints/verbs/__init__.py | 20 +++++++++++++++----- image/appc/__init__.py | 5 +++-- image/common.py | 4 ++-- image/docker/squashed.py | 4 ++-- util/metrics/metricqueue.py | 3 +++ util/registry/streamlayerformat.py | 4 ++-- util/registry/tarlayerformat.py | 17 ++++++++++++++++- 7 files changed, 43 insertions(+), 14 deletions(-) diff --git a/endpoints/verbs/__init__.py b/endpoints/verbs/__init__.py index 7be39fded..7a13c54d3 100644 --- a/endpoints/verbs/__init__.py +++ b/endpoints/verbs/__init__.py @@ -21,6 +21,7 @@ from util.http import exact_abort from util.registry.filelike import wrap_with_handler from util.registry.queuefile import QueueFile from util.registry.queueprocess import QueueProcess +from util.registry.tarlayerformat import TarLayerFormatterReporter from util.registry.torrent import ( make_torrent, per_user_torrent_filename, public_torrent_filename, PieceHasher) @@ -32,7 +33,15 @@ license_validator.enforce_license_before_request(verbs) LAYER_MIMETYPE = 'binary/octet-stream' -def _open_stream(formatter, repo_image, tag, derived_image_id, handlers): +class VerbReporter(TarLayerFormatterReporter): + def __init__(self, kind): + self.kind = kind + + def report_pass(self, pass_count): + metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count]) + + +def _open_stream(formatter, repo_image, tag, derived_image_id, handlers, reporter): """ This method generates a stream of data which will be replicated and read from the queue files. This method runs in a separate process. @@ -47,14 +56,14 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers): yield current_image def image_stream_getter(store, current_image): - def func(): + def get_stream_for_storage(): current_image_path = model.get_blob_path(current_image.blob) current_image_stream = store.stream_read_file(current_image.blob.locations, current_image_path) logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path) return current_image_stream - return func + return get_stream_for_storage def tar_stream_getter_iterator(): # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3) @@ -63,7 +72,7 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers): yield image_stream_getter(store, current_image) stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image, - tar_stream_getter_iterator) + tar_stream_getter_iterator, reporter=reporter) for handler_fn in handlers: stream = wrap_with_handler(stream, handler_fn) @@ -286,7 +295,8 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker= # Create a queue process to generate the data. The queue files will read from the process # and send the results to the client and storage. 
handlers = [hasher.update] - args = (formatter, repo_image, tag, derived_image_id, handlers) + reporter = VerbReporter(verb) + args = (formatter, repo_image, tag, derived_image_id, handlers, reporter) queue_process = QueueProcess( _open_stream, 8 * 1024, diff --git a/image/appc/__init__.py b/image/appc/__init__.py index b9ac26247..91730f923 100644 --- a/image/appc/__init__.py +++ b/image/appc/__init__.py @@ -19,7 +19,7 @@ class AppCImageFormatter(TarImageFormatter): """ def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, - tar_stream_getter_iterator): + tar_stream_getter_iterator, reporter=None): image_mtime = 0 created = next(get_image_iterator()).v1_metadata.created if created is not None: @@ -40,7 +40,8 @@ class AppCImageFormatter(TarImageFormatter): # Yield the merged layer dtaa. yield self.tar_folder('rootfs', mtime=image_mtime) - layer_merger = StreamLayerMerger(tar_stream_getter_iterator, path_prefix='rootfs/') + layer_merger = StreamLayerMerger(tar_stream_getter_iterator, path_prefix='rootfs/', + reporter=reporter) for entry in layer_merger.get_generator(): yield entry diff --git a/image/common.py b/image/common.py index b075aa0ff..356288316 100644 --- a/image/common.py +++ b/image/common.py @@ -8,13 +8,13 @@ class TarImageFormatter(object): """ def build_stream(self, repo_image, tag, synthetic_image_id, get_image_iterator, - tar_stream_getter_iterator): + tar_stream_getter_iterator, reporter=None): """ Builds and streams a synthetic .tar.gz that represents the formatted tar created by this class's implementation. """ return GzipWrap(self.stream_generator(repo_image, tag, synthetic_image_id, get_image_iterator, - tar_stream_getter_iterator)) + tar_stream_getter_iterator, reporter=reporter)) def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, tar_stream_getter_iterator): diff --git a/image/docker/squashed.py b/image/docker/squashed.py index df41d6e6b..5a52c7109 100644 --- a/image/docker/squashed.py +++ b/image/docker/squashed.py @@ -29,7 +29,7 @@ class SquashedDockerImageFormatter(TarImageFormatter): SIZE_MULTIPLIER = 1.2 def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator, - tar_stream_getter_iterator): + tar_stream_getter_iterator, reporter=None): image_mtime = 0 created = next(get_image_iterator()).v1_metadata.created if created is not None: @@ -42,7 +42,7 @@ class SquashedDockerImageFormatter(TarImageFormatter): # json - The layer JSON # layer.tar - The tarballed contents of the layer # VERSION - The docker import version: '1.0' - layer_merger = StreamLayerMerger(tar_stream_getter_iterator) + layer_merger = StreamLayerMerger(tar_stream_getter_iterator, reporter=reporter) # Yield the repositories file: synthetic_layer_info = {} diff --git a/util/metrics/metricqueue.py b/util/metrics/metricqueue.py index 6348ec6d2..2ebf95e1c 100644 --- a/util/metrics/metricqueue.py +++ b/util/metrics/metricqueue.py @@ -114,6 +114,9 @@ class MetricQueue(object): 'Invalid registry instance key count', labelnames=['key_id']) + self.verb_action_passes = prom.create_counter('verb_action_passes', 'Verb Pass Count', + labelnames=['kind', 'pass_count']) + # Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another # provider. 
self._queue = None diff --git a/util/registry/streamlayerformat.py b/util/registry/streamlayerformat.py index d21c61401..f6330ef4c 100644 --- a/util/registry/streamlayerformat.py +++ b/util/registry/streamlayerformat.py @@ -8,8 +8,8 @@ from util.registry.tarlayerformat import TarLayerFormat class StreamLayerMerger(TarLayerFormat): """ Class which creates a generator of the combined TAR data for a set of Docker layers. """ - def __init__(self, get_tar_stream_iterator, path_prefix=None): - super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix) + def __init__(self, get_tar_stream_iterator, path_prefix=None, reporter=None): + super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix, reporter=reporter) self.path_trie = marisa_trie.Trie() self.path_encountered = set() diff --git a/util/registry/tarlayerformat.py b/util/registry/tarlayerformat.py index e6c02e551..5c114519f 100644 --- a/util/registry/tarlayerformat.py +++ b/util/registry/tarlayerformat.py @@ -6,6 +6,7 @@ from abc import ABCMeta, abstractmethod from collections import defaultdict from six import add_metaclass +from util.abchelpers import nooper class TarLayerReadException(Exception): """ Exception raised when reading a layer has failed. """ @@ -15,13 +16,26 @@ class TarLayerReadException(Exception): # 9MB (+ padding below) so that it matches the 10MB expected by Gzip. CHUNK_SIZE = 1024 * 1024 * 9 +@add_metaclass(ABCMeta) +class TarLayerFormatterReporter(object): + @abstractmethod + def report_pass(self, stream_count): + """ Reports a formatting pass. """ + pass + + +@nooper +class NoopReporter(TarLayerFormatterReporter): + pass + @add_metaclass(ABCMeta) class TarLayerFormat(object): """ Class which creates a generator of the combined TAR data. """ - def __init__(self, tar_stream_getter_iterator, path_prefix=None): + def __init__(self, tar_stream_getter_iterator, path_prefix=None, reporter=None): self.tar_stream_getter_iterator = tar_stream_getter_iterator self.path_prefix = path_prefix or '' + self.reporter = reporter or NoopReporter() def get_generator(self): for stream_getter in self.tar_stream_getter_iterator(): @@ -115,6 +129,7 @@ class TarLayerFormat(object): # Conduct any post-tar work. self.after_tar_layer() + self.reporter.report_pass(2 if len(dangling_hard_links) > 0 else 1) # Last two records are empty in TAR spec. yield '\0' * 512
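
The core of the first patch is easier to see outside the diff context: when a layer contains hard links whose targets a higher layer has whited out, that layer is walked a second time so the deleted file's contents can be emitted under the first such link's name, and the remaining links retargeted to it. Below is a minimal, standalone sketch of that two-pass idea using only the standard library. `merge_layer_with_dangling_links`, `squash_layers`, and the `is_deleted` predicate are hypothetical names for illustration; the real implementation in `util/registry/tarlayerformat.py` streams raw 512-byte tar blocks and tracks whiteouts per layer rather than taking a global predicate.

# Illustrative sketch only (not part of the patches). `is_deleted` is a
# caller-supplied predicate standing in for the whiteout bookkeeping.
import copy
import tarfile
from collections import defaultdict
from io import BytesIO


def merge_layer_with_dangling_links(layer_bytes, is_deleted):
    """Yield (TarInfo, fileobj-or-None) pairs for a single layer.

    Pass 1 emits every kept entry but holds back hard links that point at
    deleted files. Pass 2 re-opens the same layer, emits the deleted target's
    contents under the *first* held-back link's name, and retargets the
    remaining links at that name.
    """
    dangling = defaultdict(list)

    # Pass 1: normal emission; remember dangling hard links instead of emitting them.
    with tarfile.open(fileobj=BytesIO(layer_bytes), mode='r:*') as tar:
        for info in tar:
            if info.type == tarfile.LNKTYPE and is_deleted(info.linkname):
                dangling[info.linkname].append(copy.deepcopy(info))
                continue
            if is_deleted(info.name):
                continue
            yield copy.deepcopy(info), (tar.extractfile(info) if info.isreg() else None)

    if not dangling:
        return

    # Pass 2: re-open the layer to recover the data the dangling links need.
    with tarfile.open(fileobj=BytesIO(layer_bytes), mode='r:*') as tar:
        for info in tar:
            if info.name not in dangling:
                continue
            # Emit the deleted file's contents under the first link's name...
            first = copy.deepcopy(dangling[info.name][0])
            first.type, first.size = info.type, info.size
            yield first, tar.extractfile(info)
            # ...and point every other dangling link at that surviving name.
            for link in dangling[info.name][1:]:
                retargeted = copy.deepcopy(link)
                retargeted.linkname = first.name
                yield retargeted, None


def squash_layers(layers, is_deleted):
    """Write the merged entries of all layers (base first, top last) into one tar."""
    output = BytesIO()
    with tarfile.open(fileobj=output, mode='w') as out:
        for layer_bytes in layers:
            for info, data in merge_layer_with_dangling_links(layer_bytes, is_deleted):
                out.addfile(info, data)
    return output.getvalue()

This mirrors what the new test_hardlink_to_deleted_file and test_multiple_hardlink_to_deleted_file cases assert: the first link keeps the contents, any further links are retargeted to it, and the deleted name itself never appears in the squashed output.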
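
That second pass also explains the signature change that runs through the first patch: formatters no longer receive an iterator of open layer streams (`get_layer_iterator`) but an iterator of zero-argument stream getters (`tar_stream_getter_iterator`), so a layer can be opened again when dangling hard links are found. A rough sketch of the pattern follows; `stream_read_file` is called with the same arguments as in the diff, while `get_blob_path` is passed in as a parameter (rather than imported) purely to keep the snippet self-contained.

# Sketch of the getter pattern: yield callables, not open streams, so a caller
# can re-open any layer for a second pass.
def tar_stream_getter_iterator(storage, images, get_blob_path):
    for image in images:
        def getter(image=image):  # default argument pins the current image
            return storage.stream_read_file(image.blob.locations,
                                            get_blob_path(image.blob))
        yield getter

The diff achieves the same binding with a small factory function (image_stream_getter) instead of a default argument; either way avoids the late-binding bug where every getter would otherwise read the last image of the loop.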
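
Finally, the StreamLayerMerger changes split the old check_tar_info hook into is_skipped_file and should_append_file, and keep the whiteout state in sets plus marisa_trie tries. The snippet below illustrates only the deleted-prefix check, i.e. how iter_prefixes plus a relpath guard answers "is this path inside a directory a higher layer deleted?". It is a standalone example (requires `pip install marisa-trie`), not the merger's actual code, which additionally normalizes paths and filters AUFS metadata entries.

# Standalone illustration of the deleted-prefix lookup used by the merger.
import os
import marisa_trie

deleted_prefixes = [u'foo/bar', u'tmp']      # directories whited out by higher layers
prefix_trie = marisa_trie.Trie(deleted_prefixes)

def is_under_deleted_path(path):
    """True if `path` equals or is nested under any deleted prefix."""
    for prefix in prefix_trie.iter_prefixes(path):
        # iter_prefixes also matches sibling names such as 'foo/barbecue';
        # the relpath check rules those out.
        if not os.path.relpath(path, prefix).startswith('..'):
            return True
    return False

assert is_under_deleted_path(u'foo/bar/baz.txt')
assert not is_under_deleted_path(u'foo/barbecue/baz.txt')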