Merge pull request #3032 from coreos-inc/joseph.schorr/QUAY-885/squashed-sym

Retarget broken hard links in squashed images
josephschorr 2018-03-26 17:59:52 -04:00 committed by GitHub
commit 323eb63747
9 changed files with 368 additions and 254 deletions
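
Background for the change: in a tar archive a hard link is a LNKTYPE member that carries no file data of its own, only a linkname pointing at another member. When squashing drops a file because an upper layer deleted it (an AUFS whiteout), any hard link in a lower layer that points at that file would otherwise link to nothing. The diffs below record such links during the first pass over a layer and make a second pass to retarget them. A minimal standalone sketch of the failure mode (Python 3, illustrative names, not Quay code):

import io
import tarfile

buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w') as tar:
  data = b'somecontents'
  target = tarfile.TarInfo('tobedeletedfile')
  target.size = len(data)
  tar.addfile(target, io.BytesIO(data))

  link = tarfile.TarInfo('link_to_deleted_file')
  link.type = tarfile.LNKTYPE        # hard link: no payload of its own
  link.linkname = 'tobedeletedfile'  # all contents live in the target member
  tar.addfile(link)

with tarfile.open(fileobj=io.BytesIO(buf.getvalue())) as tar:
  member = tar.getmember('link_to_deleted_file')
  # The link member itself is empty; drop 'tobedeletedfile' from the output
  # and nothing in the archive holds these contents any more.
  print(member.islnk(), member.linkname, member.size)  # True tobedeletedfile 0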

View file

@@ -21,6 +21,7 @@ from util.http import exact_abort
 from util.registry.filelike import wrap_with_handler
 from util.registry.queuefile import QueueFile
 from util.registry.queueprocess import QueueProcess
+from util.registry.tarlayerformat import TarLayerFormatterReporter
 from util.registry.torrent import (
   make_torrent, per_user_torrent_filename, public_torrent_filename, PieceHasher)
@@ -31,7 +32,15 @@ verbs = Blueprint('verbs', __name__)
 LAYER_MIMETYPE = 'binary/octet-stream'

-def _open_stream(formatter, repo_image, tag, derived_image_id, handlers):
+class VerbReporter(TarLayerFormatterReporter):
+  def __init__(self, kind):
+    self.kind = kind
+
+  def report_pass(self, pass_count):
+    metric_queue.verb_action_passes.Inc(labelvalues=[self.kind, pass_count])
+
+
+def _open_stream(formatter, repo_image, tag, derived_image_id, handlers, reporter):
   """
   This method generates a stream of data which will be replicated and read from the queue files.
   This method runs in a separate process.
@@ -45,19 +54,24 @@ def _open_stream(formatter, repo_image, tag, derived_image_id, handlers):
     for current_image in image_list:
       yield current_image

-  def get_next_layer():
-    # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
-    store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
-    for current_image in image_list:
+  def image_stream_getter(store, current_image):
+    def get_stream_for_storage():
       current_image_path = model.get_blob_path(current_image.blob)
       current_image_stream = store.stream_read_file(current_image.blob.locations,
                                                     current_image_path)

       logger.debug('Returning image layer %s: %s', current_image.image_id, current_image_path)
-      yield current_image_stream
+      return current_image_stream
+    return get_stream_for_storage
+
+  def tar_stream_getter_iterator():
+    # Re-Initialize the storage engine because some may not respond well to forking (e.g. S3)
+    store = Storage(app, metric_queue, config_provider=config_provider, ip_resolver=ip_resolver)
+    for current_image in image_list:
+      yield image_stream_getter(store, current_image)

   stream = formatter.build_stream(repo_image, tag, derived_image_id, get_next_image,
-                                  get_next_layer)
+                                  tar_stream_getter_iterator, reporter=reporter)

   for handler_fn in handlers:
     stream = wrap_with_handler(stream, handler_fn)
@@ -280,7 +294,8 @@ def _repo_verb(namespace, repository, tag, verb, formatter, sign=False, checker=
   # Create a queue process to generate the data. The queue files will read from the process
   # and send the results to the client and storage.
   handlers = [hasher.update]
-  args = (formatter, repo_image, tag, derived_image_id, handlers)
+  reporter = VerbReporter(verb)
+  args = (formatter, repo_image, tag, derived_image_id, handlers, reporter)
   queue_process = QueueProcess(
     _open_stream,
     8 * 1024,
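
The signature change above is the plumbing the second pass needs: _open_stream used to hand the formatter an iterator that yielded already-open layer streams, which can only be read once. It now yields zero-argument getters, so a layer can be re-opened on demand. A rough standalone sketch of that contract (illustrative names, not Quay's API):

import io

def tar_stream_getter_iterator(layer_blobs):
  # Yield zero-argument callables; each call opens a fresh stream over its layer,
  # so a caller can read the same layer twice (e.g. for a retargeting pass).
  for blob in layer_blobs:
    yield (lambda b=blob: io.BytesIO(b))

getters = list(tar_stream_getter_iterator([b'layer-one', b'layer-two']))
first_read = getters[0]()    # first pass over layer one
second_read = getters[0]()   # independent re-read of the same layer
print(first_read.read() == second_read.read())  # True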

View file

@@ -19,7 +19,7 @@ class AppCImageFormatter(TarImageFormatter):
   """

   def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator,
-                       get_layer_iterator):
+                       tar_stream_getter_iterator, reporter=None):
     image_mtime = 0
     created = next(get_image_iterator()).v1_metadata.created
     if created is not None:
@@ -40,7 +40,8 @@ class AppCImageFormatter(TarImageFormatter):
     # Yield the merged layer dtaa.
     yield self.tar_folder('rootfs', mtime=image_mtime)

-    layer_merger = StreamLayerMerger(get_layer_iterator, path_prefix='rootfs/')
+    layer_merger = StreamLayerMerger(tar_stream_getter_iterator, path_prefix='rootfs/',
+                                     reporter=reporter)
     for entry in layer_merger.get_generator():
       yield entry

View file

@@ -8,16 +8,16 @@ class TarImageFormatter(object):
   """

   def build_stream(self, repo_image, tag, synthetic_image_id, get_image_iterator,
-                   get_layer_iterator):
+                   tar_stream_getter_iterator, reporter=None):
     """
     Builds and streams a synthetic .tar.gz that represents the formatted tar created by this class's
     implementation.
     """
     return GzipWrap(self.stream_generator(repo_image, tag, synthetic_image_id, get_image_iterator,
-                                          get_layer_iterator))
+                                          tar_stream_getter_iterator, reporter=reporter))

   def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator,
-                       get_layer_iterator):
+                       tar_stream_getter_iterator):
     raise NotImplementedError

   def tar_file(self, name, contents, mtime=None):

View file

@@ -29,7 +29,7 @@ class SquashedDockerImageFormatter(TarImageFormatter):
   SIZE_MULTIPLIER = 1.2

   def stream_generator(self, repo_image, tag, synthetic_image_id, get_image_iterator,
-                       get_layer_iterator):
+                       tar_stream_getter_iterator, reporter=None):
     image_mtime = 0
     created = next(get_image_iterator()).v1_metadata.created
     if created is not None:
@@ -42,7 +42,7 @@ class SquashedDockerImageFormatter(TarImageFormatter):
     #   json - The layer JSON
     #   layer.tar - The tarballed contents of the layer
     #   VERSION - The docker import version: '1.0'
-    layer_merger = StreamLayerMerger(get_layer_iterator)
+    layer_merger = StreamLayerMerger(tar_stream_getter_iterator, reporter=reporter)

     # Yield the repositories file:
     synthetic_layer_info = {}

View file

@@ -2,17 +2,16 @@ import unittest
 import tarfile

 from StringIO import StringIO
-from util.registry.streamlayerformat import StreamLayerMerger, AUFS_WHITEOUT
+from util.registry.streamlayerformat import StreamLayerMerger
+from util.registry.aufs import AUFS_WHITEOUT
 from util.registry.tarlayerformat import TarLayerReadException

 class TestStreamLayerMerger(unittest.TestCase):
-  def create_layer(self, **kwargs):
+  def create_layer(self, *file_pairs):
     output = StringIO()
     with tarfile.open(fileobj=output, mode='w:gz') as tar:
-      for current_contents in kwargs:
-        current_filename = kwargs[current_contents]
-        if current_contents == '_':
+      for current_filename, current_contents in file_pairs:
+        if current_contents is None:
           # This is a deleted file.
           if current_filename.endswith('/'):
             current_filename = current_filename[:-1]
@@ -25,9 +24,15 @@ class TestStreamLayerMerger(unittest.TestCase):
           current_contents = ''

-        info = tarfile.TarInfo(name=current_filename)
-        info.size = len(current_contents)
-        tar.addfile(info, fileobj=StringIO(current_contents))
+        if current_contents.startswith('linkto:'):
+          info = tarfile.TarInfo(name=current_filename)
+          info.linkname = current_contents[len('linkto:'):]
+          info.type = tarfile.LNKTYPE
+          tar.addfile(info)
+        else:
+          info = tarfile.TarInfo(name=current_filename)
+          info.size = len(current_contents)
+          tar.addfile(info, fileobj=StringIO(current_contents))

     return output.getvalue()
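
For reference, the None contents used by the rewritten tests stand for a deletion, which layered Docker images express as an AUFS whiteout: an empty entry whose basename is prefixed with '.wh.'. A small sketch of that naming convention (my own helper for illustration, not the test code):

AUFS_WHITEOUT = u'.wh.'

def whiteout_name(filename):
  # Deleting `filename` in a layer is recorded as an empty '.wh.'-prefixed entry.
  if filename.endswith('/'):
    filename = filename[:-1]
  parts = filename.split('/')
  parts[-1] = AUFS_WHITEOUT + parts[-1]
  return '/'.join(parts)

print(whiteout_name('another_file'))  # .wh.another_file
print(whiteout_name('foo/bar/'))      # foo/.wh.bar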
@@ -35,10 +40,13 @@ class TestStreamLayerMerger(unittest.TestCase):
     return ''

   def squash_layers(self, layers, path_prefix=None):
-    def get_layers():
-      return [StringIO(layer) for layer in layers]
+    def getter_for_layer(layer):
+      return lambda: StringIO(layer)
+
+    def layer_stream_getter():
+      return [getter_for_layer(layer) for layer in layers]

-    merger = StreamLayerMerger(get_layers, path_prefix=path_prefix)
+    merger = StreamLayerMerger(layer_stream_getter, path_prefix=path_prefix)
     merged_data = ''.join(merger.get_generator())
     return merged_data
@@ -58,9 +66,9 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_single_layer(self):
     tar_layer = self.create_layer(
-      foo = 'some_file',
-      bar = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('third_file', 'meh'))

     squashed = self.squash_layers([tar_layer])

@@ -70,12 +78,12 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_multiple_layers(self):
     second_layer = self.create_layer(
-      foo = 'some_file',
-      bar = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('third_file', 'meh'))

     first_layer = self.create_layer(
-      top = 'top_file')
+      ('top_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -86,12 +94,12 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_multiple_layers_dot(self):
     second_layer = self.create_layer(
-      foo = './some_file',
-      bar = 'another_file',
-      meh = './third_file')
+      ('./some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('./third_file', 'meh'))

     first_layer = self.create_layer(
-      top = 'top_file')
+      ('top_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -102,12 +110,12 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_multiple_layers_overwrite(self):
     second_layer = self.create_layer(
-      foo = 'some_file',
-      bar = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('third_file', 'meh'))

     first_layer = self.create_layer(
-      top = 'another_file')
+      ('another_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -117,12 +125,12 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_multiple_layers_overwrite_base_dot(self):
     second_layer = self.create_layer(
-      foo = 'some_file',
-      bar = './another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('./another_file', 'bar'),
+      ('third_file', 'meh'))

     first_layer = self.create_layer(
-      top = 'another_file')
+      ('another_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -133,12 +141,12 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_multiple_layers_overwrite_top_dot(self):
     second_layer = self.create_layer(
-      foo = 'some_file',
-      bar = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('third_file', 'meh'))

     first_layer = self.create_layer(
-      top = './another_file')
+      ('./another_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -149,12 +157,12 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_deleted_file(self):
     second_layer = self.create_layer(
-      foo = 'some_file',
-      bar = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('third_file', 'meh'))

     first_layer = self.create_layer(
-      _ = 'another_file')
+      ('another_file', None))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -164,15 +172,15 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_deleted_readded_file(self):
     third_layer = self.create_layer(
-      bar = 'another_file')
+      ('another_file', 'bar'))

     second_layer = self.create_layer(
-      foo = 'some_file',
-      _ = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', None),
+      ('third_file', 'meh'))

     first_layer = self.create_layer(
-      newagain = 'another_file')
+      ('another_file', 'newagain'))

     squashed = self.squash_layers([first_layer, second_layer, third_layer])

@@ -182,15 +190,15 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_deleted_in_lower_layer(self):
     third_layer = self.create_layer(
-      bar = 'deleted_file')
+      ('deleted_file', 'bar'))

     second_layer = self.create_layer(
-      foo = 'some_file',
-      _ = 'deleted_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('deleted_file', None),
+      ('third_file', 'meh'))

     first_layer = self.create_layer(
-      top = 'top_file')
+      ('top_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer, third_layer])
@@ -201,31 +209,31 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_deleted_in_lower_layer_with_added_dot(self):
     third_layer = self.create_layer(
-      something = './deleted_file')
+      ('./deleted_file', 'something'))

     second_layer = self.create_layer(
-      _ = 'deleted_file')
+      ('deleted_file', None))

     squashed = self.squash_layers([second_layer, third_layer])
     self.assertDoesNotHaveFile(squashed, 'deleted_file')

   def test_deleted_in_lower_layer_with_deleted_dot(self):
     third_layer = self.create_layer(
-      something = './deleted_file')
+      ('./deleted_file', 'something'))

     second_layer = self.create_layer(
-      _ = './deleted_file')
+      ('./deleted_file', None))

     squashed = self.squash_layers([second_layer, third_layer])
     self.assertDoesNotHaveFile(squashed, 'deleted_file')

   def test_directory(self):
     second_layer = self.create_layer(
-      foo = 'foo/some_file',
-      bar = 'foo/another_file')
+      ('foo/some_file', 'foo'),
+      ('foo/another_file', 'bar'))

     first_layer = self.create_layer(
-      top = 'foo/some_file')
+      ('foo/some_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -234,11 +242,11 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_sub_directory(self):
     second_layer = self.create_layer(
-      foo = 'foo/some_file',
-      bar = 'foo/bar/another_file')
+      ('foo/some_file', 'foo'),
+      ('foo/bar/another_file', 'bar'))

     first_layer = self.create_layer(
-      top = 'foo/some_file')
+      ('foo/some_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -247,11 +255,11 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_delete_directory(self):
     second_layer = self.create_layer(
-      foo = 'foo/some_file',
-      bar = 'foo/another_file')
+      ('foo/some_file', 'foo'),
+      ('foo/another_file', 'bar'))

     first_layer = self.create_layer(
-      _ = 'foo/')
+      ('foo/', None))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -260,11 +268,11 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_delete_sub_directory(self):
     second_layer = self.create_layer(
-      foo = 'foo/some_file',
-      bar = 'foo/bar/another_file')
+      ('foo/some_file', 'foo'),
+      ('foo/bar/another_file', 'bar'))

     first_layer = self.create_layer(
-      _ = 'foo/bar/')
+      ('foo/bar/', None))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -273,11 +281,11 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_delete_sub_directory_with_dot(self):
     second_layer = self.create_layer(
-      foo = 'foo/some_file',
-      bar = 'foo/bar/another_file')
+      ('foo/some_file', 'foo'),
+      ('foo/bar/another_file', 'bar'))

     first_layer = self.create_layer(
-      _ = './foo/bar/')
+      ('./foo/bar/', None))

     squashed = self.squash_layers([first_layer, second_layer])

@@ -286,11 +294,11 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_delete_sub_directory_with_subdot(self):
     second_layer = self.create_layer(
-      foo = './foo/some_file',
-      bar = './foo/bar/another_file')
+      ('./foo/some_file', 'foo'),
+      ('./foo/bar/another_file', 'bar'))

     first_layer = self.create_layer(
-      _ = 'foo/bar/')
+      ('foo/bar/', None))

     squashed = self.squash_layers([first_layer, second_layer])
@@ -300,14 +308,14 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_delete_directory_recreate(self):
     third_layer = self.create_layer(
-      foo = 'foo/some_file',
-      bar = 'foo/another_file')
+      ('foo/some_file', 'foo'),
+      ('foo/another_file', 'bar'))

     second_layer = self.create_layer(
-      _ = 'foo/')
+      ('foo/', None))

     first_layer = self.create_layer(
-      baz = 'foo/some_file')
+      ('foo/some_file', 'baz'))

     squashed = self.squash_layers([first_layer, second_layer, third_layer])

@@ -316,11 +324,11 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_delete_directory_prefix(self):
     third_layer = self.create_layer(
-      foo = 'foobar/some_file',
-      bar = 'foo/another_file')
+      ('foobar/some_file', 'foo'),
+      ('foo/another_file', 'bar'))

     second_layer = self.create_layer(
-      _ = 'foo/')
+      ('foo/', None))

     squashed = self.squash_layers([second_layer, third_layer])

@@ -330,11 +338,11 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_delete_directory_pre_prefix(self):
     third_layer = self.create_layer(
-      foo = 'foobar/baz/some_file',
-      bar = 'foo/another_file')
+      ('foobar/baz/some_file', 'foo'),
+      ('foo/another_file', 'bar'))

     second_layer = self.create_layer(
-      _ = 'foo/')
+      ('foo/', None))

     squashed = self.squash_layers([second_layer, third_layer])

@@ -344,11 +352,11 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_delete_root_directory(self):
     third_layer = self.create_layer(
-      foo = 'build/first_file',
-      bar = 'build/second_file')
+      ('build/first_file', 'foo'),
+      ('build/second_file', 'bar'))

     second_layer = self.create_layer(
-      _ = 'build')
+      ('build', None))

     squashed = self.squash_layers([second_layer, third_layer])

@@ -358,8 +366,8 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_tar_empty_layer(self):
     third_layer = self.create_layer(
-      foo = 'build/first_file',
-      bar = 'build/second_file')
+      ('build/first_file', 'foo'),
+      ('build/second_file', 'bar'))

     empty_layer = self.create_layer()

@@ -371,8 +379,8 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_data_empty_layer(self):
     third_layer = self.create_layer(
-      foo = 'build/first_file',
-      bar = 'build/second_file')
+      ('build/first_file', 'foo'),
+      ('build/second_file', 'bar'))

     empty_layer = self.create_empty_layer()

@@ -384,8 +392,8 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_broken_layer(self):
     third_layer = self.create_layer(
-      foo = 'build/first_file',
-      bar = 'build/second_file')
+      ('build/first_file', 'foo'),
+      ('build/second_file', 'bar'))

     broken_layer = 'not valid data'

@@ -397,9 +405,9 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_single_layer_with_prefix(self):
     tar_layer = self.create_layer(
-      foo = 'some_file',
-      bar = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('third_file', 'meh'))

     squashed = self.squash_layers([tar_layer], path_prefix='foo/')
@@ -409,12 +417,12 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_multiple_layers_overwrite_with_prefix(self):
     second_layer = self.create_layer(
-      foo = 'some_file',
-      bar = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('third_file', 'meh'))

     first_layer = self.create_layer(
-      top = 'another_file')
+      ('another_file', 'top'))

     squashed = self.squash_layers([first_layer, second_layer], path_prefix='foo/')

@@ -425,7 +433,7 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_superlong_filename(self):
     tar_layer = self.create_layer(
-      meh = 'this_is_the_filename_that_never_ends_it_goes_on_and_on_my_friend_some_people_started')
+      ('this_is_the_filename_that_never_ends_it_goes_on_and_on_my_friend_some_people_started', 'meh'))

     squashed = self.squash_layers([tar_layer],
                                   path_prefix='foo/')

@@ -435,9 +443,9 @@ class TestStreamLayerMerger(unittest.TestCase):

   def test_superlong_prefix(self):
     tar_layer = self.create_layer(
-      foo = 'some_file',
-      bar = 'another_file',
-      meh = 'third_file')
+      ('some_file', 'foo'),
+      ('another_file', 'bar'),
+      ('third_file', 'meh'))

     squashed = self.squash_layers([tar_layer],
                                   path_prefix='foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/')

@@ -447,5 +455,40 @@ class TestStreamLayerMerger(unittest.TestCase):
     self.assertHasFile(squashed, 'foo/bar/baz/something/foo/bar/baz/anotherthing/whatever/this/is/a/really/long/filename/that/goes/here/third_file', 'meh')

+  def test_hardlink_to_deleted_file(self):
+    first_layer = self.create_layer(
+      ('tobedeletedfile', 'somecontents'),
+      ('link_to_deleted_file', 'linkto:tobedeletedfile'),
+      ('third_file', 'meh'))
+
+    second_layer = self.create_layer(
+      ('tobedeletedfile', None))
+
+    squashed = self.squash_layers([second_layer, first_layer], path_prefix='foo/')
+
+    self.assertHasFile(squashed, 'foo/third_file', 'meh')
+    self.assertHasFile(squashed, 'foo/link_to_deleted_file', 'somecontents')
+    self.assertDoesNotHaveFile(squashed, 'foo/tobedeletedfile')
+
+  def test_multiple_hardlink_to_deleted_file(self):
+    first_layer = self.create_layer(
+      ('tobedeletedfile', 'somecontents'),
+      ('link_to_deleted_file', 'linkto:tobedeletedfile'),
+      ('another_link_to_deleted_file', 'linkto:tobedeletedfile'),
+      ('third_file', 'meh'))
+
+    second_layer = self.create_layer(
+      ('tobedeletedfile', None))
+
+    squashed = self.squash_layers([second_layer, first_layer], path_prefix='foo/')
+
+    self.assertHasFile(squashed, 'foo/third_file', 'meh')
+    self.assertHasFile(squashed, 'foo/link_to_deleted_file', 'somecontents')
+    self.assertHasFile(squashed, 'foo/another_link_to_deleted_file', 'somecontents')
+    self.assertDoesNotHaveFile(squashed, 'foo/tobedeletedfile')
+
 if __name__ == '__main__':
   unittest.main()

View file

@@ -114,6 +114,9 @@ class MetricQueue(object):
                                                       'Invalid registry instance key count',
                                                       labelnames=['key_id'])

+    self.verb_action_passes = prom.create_counter('verb_action_passes', 'Verb Pass Count',
+                                                  labelnames=['kind', 'pass_count'])
+
     # Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another
     # provider.
     self._queue = None
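
The new verb_action_passes counter records, per verb kind, whether a squashing run needed one pass or two over a layer. Roughly the same thing expressed with the plain prometheus_client API (Quay's MetricQueue wraps its own prom.create_counter, so this sketch is only an approximation):

from prometheus_client import Counter

verb_action_passes = Counter('verb_action_passes', 'Verb Pass Count',
                             ['kind', 'pass_count'])

# A squash that had to re-read a layer to fix dangling hard links:
verb_action_passes.labels(kind='squash', pass_count='2').inc()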

View file

@@ -1,53 +1,70 @@
-import marisa_trie
 import os
+import tarfile
+import marisa_trie

 from util.registry.aufs import is_aufs_metadata, get_deleted_prefix
 from util.registry.tarlayerformat import TarLayerFormat

-AUFS_METADATA = u'.wh..wh.'
-AUFS_WHITEOUT = u'.wh.'
-AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)

 class StreamLayerMerger(TarLayerFormat):
   """ Class which creates a generator of the combined TAR data for a set of Docker layers. """
-  def __init__(self, layer_iterator, path_prefix=None):
-    super(StreamLayerMerger, self).__init__(layer_iterator, path_prefix)
+  def __init__(self, get_tar_stream_iterator, path_prefix=None, reporter=None):
+    super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix, reporter=reporter)

     self.path_trie = marisa_trie.Trie()
-    self.path_encountered = []
+    self.path_encountered = set()

-    self.prefix_trie = marisa_trie.Trie()
-    self.prefix_encountered = []
+    self.deleted_prefix_trie = marisa_trie.Trie()
+    self.deleted_prefixes_encountered = set()

-  def after_tar_layer(self, current_layer):
+  def after_tar_layer(self):
     # Update the tries.
     self.path_trie = marisa_trie.Trie(self.path_encountered)
-    self.prefix_trie = marisa_trie.Trie(self.prefix_encountered)
+    self.deleted_prefix_trie = marisa_trie.Trie(self.deleted_prefixes_encountered)

-  def check_tar_info(self, tar_info):
-    absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
+  @staticmethod
+  def _normalize_path(path):
+    return os.path.relpath(path.decode('utf-8'), './')
+
+  def _check_deleted(self, absolute):
+    ubsolute = unicode(absolute)
+    for prefix in self.deleted_prefix_trie.iter_prefixes(ubsolute):
+      if not os.path.relpath(ubsolute, prefix).startswith('..'):
+        return True
+
+    return False
+
+  def is_skipped_file(self, filename):
+    absolute = StreamLayerMerger._normalize_path(filename)

     # Skip metadata.
     if is_aufs_metadata(absolute):
-      return False
+      return True

-    # Add any prefix of deleted paths to the prefix list.
-    deleted_prefix = get_deleted_prefix(absolute)
-    if deleted_prefix is not None:
-      self.prefix_encountered.append(deleted_prefix)
-      return False
+    # Check if the file is under a deleted path.
+    if self._check_deleted(absolute):
+      return True

     # Check if this file has already been encountered somewhere. If so,
     # skip it.
     ubsolute = unicode(absolute)
     if ubsolute in self.path_trie:
+      return True
+
+    return False
+
+  def should_append_file(self, filename):
+    if self.is_skipped_file(filename):
       return False

-    # Check if this file is under a deleted path.
-    for prefix in self.prefix_trie.iter_prefixes(ubsolute):
-      if not os.path.relpath(ubsolute, prefix).startswith('..'):
-        return False
+    absolute = StreamLayerMerger._normalize_path(filename)
+
+    # Add any prefix of deleted paths to the prefix list.
+    deleted_prefix = get_deleted_prefix(absolute)
+    if deleted_prefix is not None:
+      self.deleted_prefixes_encountered.add(deleted_prefix)
+      return False

     # Otherwise, add the path to the encountered list and return it.
-    self.path_encountered.append(absolute)
+    self.path_encountered.add(absolute)
     return True
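
The _check_deleted helper above leans on marisa_trie's prefix lookup: every deleted-path prefix seen so far is stored in a trie, and a candidate path is skipped only when a stored prefix is actually an ancestor directory of it; the os.path.relpath test rejects lookalike string prefixes. A standalone illustration (Python 3, assuming the marisa_trie package is installed; the example data is hypothetical):

import os
import marisa_trie

deleted_prefix_trie = marisa_trie.Trie([u'foo/bar'])

def check_deleted(path):
  for prefix in deleted_prefix_trie.iter_prefixes(path):
    # relpath not starting with '..' means `path` really sits under `prefix`.
    if not os.path.relpath(path, prefix).startswith('..'):
      return True
  return False

print(check_deleted(u'foo/bar/another_file'))  # True: under the deleted directory
print(check_deleted(u'foo/barbaz'))            # False: string prefix only, not an ancestor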

View file

@@ -1,56 +0,0 @@
-import tarfile
-
-from cStringIO import StringIO
-
-from util.registry.tarlayerformat import TarLayerFormat
-from util.registry.gzipwrap import GzipWrap
-
-class TarfileAppender(TarLayerFormat):
-  """ Helper class which allows for appending entries to a gzipped-tarfile and doing so
-      in a streaming manner.
-  """
-  def __init__(self, base_tar_file, entries):
-    super(TarfileAppender, self).__init__(self._get_tar_iterator)
-    self.entries = entries
-    self.base_tar_file = base_tar_file
-    self.first_info = None
-
-  def get_stream(self):
-    return GzipWrap(self.get_generator())
-
-  def after_tar_layer(self, current_layer):
-    pass
-
-  def check_tar_info(self, tar_info):
-    if not self.first_info:
-      self.first_info = tar_info
-    return True
-
-  def _get_tar_iterator(self):
-    # Yield the contents of the base tar.
-    yield self.base_tar_file
-
-    # Construct an in-memory tar containing the entries to append, and then yield
-    # its data.
-    def add_entry(arch, dir_path, contents=None):
-      info = tarfile.TarInfo(dir_path)
-      info.uid = self.first_info.uid
-      info.gid = self.first_info.gid
-      info.mode = self.first_info.mode
-      info.mtime = self.first_info.mtime
-
-      info.type = tarfile.REGTYPE if contents else tarfile.DIRTYPE
-
-      if contents:
-        info.size = len(contents)
-
-      arch.addfile(info, fileobj=StringIO(contents) if contents else None)
-
-    append_tarball = StringIO()
-    with tarfile.open(fileobj=append_tarball, mode='w') as updated_archive:
-      for entry in self.entries:
-        add_entry(updated_archive, entry, self.entries[entry])
-
-    # To make tarfile happy.
-    append_tarball.seek(0)
-
-    yield append_tarball

View file

@@ -2,93 +2,184 @@ import os
 import tarfile
 import copy

+from abc import ABCMeta, abstractmethod
+from collections import defaultdict
+from six import add_metaclass
+
+from util.abchelpers import nooper
+
 class TarLayerReadException(Exception):
   """ Exception raised when reading a layer has failed. """
   pass

+
+# 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
+CHUNK_SIZE = 1024 * 1024 * 9
+
+
+@add_metaclass(ABCMeta)
+class TarLayerFormatterReporter(object):
+  @abstractmethod
+  def report_pass(self, stream_count):
+    """ Reports a formatting pass. """
+    pass
+
+
+@nooper
+class NoopReporter(TarLayerFormatterReporter):
+  pass
+
+
+@add_metaclass(ABCMeta)
 class TarLayerFormat(object):
   """ Class which creates a generator of the combined TAR data. """
-  def __init__(self, tar_iterator, path_prefix=None):
-    self.tar_iterator = tar_iterator
-    self.path_prefix = path_prefix
+  def __init__(self, tar_stream_getter_iterator, path_prefix=None, reporter=None):
+    self.tar_stream_getter_iterator = tar_stream_getter_iterator
+    self.path_prefix = path_prefix or ''
+    self.reporter = reporter or NoopReporter()

   def get_generator(self):
-    for current_tar in self.tar_iterator():
+    for stream_getter in self.tar_stream_getter_iterator():
+      current_tar_stream = stream_getter()
+
       # Read the current TAR. If it is empty, we just continue
       # to the next one.
-      tar_file = None
-      try:
-        tar_file = tarfile.open(mode='r|*', fileobj=current_tar)
-      except tarfile.ReadError as re:
-        if re.message != 'empty file':
-          raise TarLayerReadException('Could not read layer')
-
+      tar_file = TarLayerFormat._tar_file_from_stream(current_tar_stream)
       if not tar_file:
         continue

       # For each of the tar entries, yield them IF and ONLY IF we have not
       # encountered the path before.
+      dangling_hard_links = defaultdict(list)

-      # 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
-      chunk_size = 1024 * 1024 * 9
-
       for tar_info in tar_file:
-        if not self.check_tar_info(tar_info):
+        if not self.should_append_file(tar_info.name):
           continue

+        # Note: We use a copy here because we need to make sure we copy over all the internal
+        # data of the tar header. We cannot use frombuf(tobuf()), however, because it doesn't
+        # properly handle large filenames.
+        clone = copy.deepcopy(tar_info)
+        clone.name = os.path.join(self.path_prefix, clone.name)
+
+        # If the entry is a *hard* link, then prefix it as well. Soft links are relative.
+        if clone.linkname and clone.type == tarfile.LNKTYPE:
+          # If the entry is a dangling hard link, we skip here. Dangling hard links will be handled
+          # in a second pass.
+          if self.is_skipped_file(tar_info.linkname):
+            dangling_hard_links[tar_info.linkname].append(tar_info)
+            continue
+
+          clone.linkname = os.path.join(self.path_prefix, clone.linkname)
+
         # Yield the tar header.
-        if self.path_prefix:
-          # Note: We use a copy here because we need to make sure we copy over all the internal
-          # data of the tar header. We cannot use frombuf(tobuf()), however, because it doesn't
-          # properly handle large filenames.
-          clone = copy.deepcopy(tar_info)
-          clone.name = os.path.join(self.path_prefix, clone.name)
-
-          # If the entry is a *hard* link, then prefix it as well. Soft links are relative.
-          if clone.linkname and clone.type == tarfile.LNKTYPE:
-            clone.linkname = os.path.join(self.path_prefix, clone.linkname)
-
-          yield clone.tobuf()
-        else:
-          yield tar_info.tobuf()
+        yield clone.tobuf()

         # Try to extract any file contents for the tar. If found, we yield them as well.
         if tar_info.isreg():
-          file_stream = tar_file.extractfile(tar_info)
-          if file_stream is not None:
-            length = 0
-            while True:
-              current_block = file_stream.read(chunk_size)
-              if not len(current_block):
-                break
-
-              yield current_block
-              length += len(current_block)
-
-            file_stream.close()
-
-            # Files must be padding to 512 byte multiples.
-            if length % 512 != 0:
-              yield '\0' * (512 - (length % 512))
+          for block in TarLayerFormat._emit_file(tar_file, tar_info):
+            yield block

       # Close the layer stream now that we're done with it.
       tar_file.close()
+      # If there are any dangling hard links, open a new stream and retarget the dangling hard
+      # links to a new copy of the contents, which will be placed under the *first* dangling hard
+      # link's name.
+      if len(dangling_hard_links) > 0:
+        tar_file = TarLayerFormat._tar_file_from_stream(stream_getter())
+        if not tar_file:
+          raise TarLayerReadException('Could not re-read tar layer')
+
+        for tar_info in tar_file:
+          # If we encounter a file that holds the data for a dangling link,
+          # emit it under the name of the first dangling hard link. All other
+          # dangling hard links will be retargeted to this first name.
+          if tar_info.name in dangling_hard_links:
+            first_dangling = dangling_hard_links[tar_info.name][0]
+
+            # Copy the first dangling hard link, change it to a normal file,
+            # and emit the deleted file's contents for it.
+            clone = copy.deepcopy(first_dangling)
+            clone.name = os.path.join(self.path_prefix, first_dangling.name)
+            clone.type = tar_info.type
+            clone.size = tar_info.size
+            clone.pax_headers = tar_info.pax_headers
+            yield clone.tobuf()
+
+            for block in TarLayerFormat._emit_file(tar_file, tar_info):
+              yield block
+
+          elif (tar_info.type == tarfile.LNKTYPE and
+                tar_info.linkname in dangling_hard_links and
+                not self.is_skipped_file(tar_info.name)):
+            # Retarget if necessary. All dangling hard links (but the first) will
+            # need to be retargeted.
+            first_dangling = dangling_hard_links[tar_info.linkname][0]
+            if tar_info.name == first_dangling.name:
+              # Skip; the first dangling is handled above.
+              continue
+
+            # Retarget the hard link to the first dangling hard link.
+            clone = copy.deepcopy(tar_info)
+            clone.name = os.path.join(self.path_prefix, clone.name)
+            clone.linkname = os.path.join(self.path_prefix, first_dangling.name)
+            yield clone.tobuf()
+
+        # Close the layer stream now that we're done with it.
+        tar_file.close()
+
       # Conduct any post-tar work.
-      self.after_tar_layer(current_tar)
+      self.after_tar_layer()
+      self.reporter.report_pass(2 if len(dangling_hard_links) > 0 else 1)

     # Last two records are empty in TAR spec.
     yield '\0' * 512
     yield '\0' * 512
-  def check_tar_info(self, tar_info):
-    """ Returns true if the current tar_info should be added to the combined tar. False
-        otherwise.
+  @abstractmethod
+  def is_skipped_file(self, filename):
+    """ Returns true if the file with the given name will be skipped during append.
     """
-    raise NotImplementedError()
+    pass

-  def after_tar_layer(self, current_tar):
+  @abstractmethod
+  def should_append_file(self, filename):
+    """ Returns true if the file with the given name should be appended when producing
+        the new TAR.
+    """
+    pass
+
+  @abstractmethod
+  def after_tar_layer(self):
     """ Invoked after a TAR layer is added, to do any post-add work. """
-    raise NotImplementedError()
+    pass
+
+  @staticmethod
+  def _tar_file_from_stream(stream):
+    tar_file = None
+    try:
+      tar_file = tarfile.open(mode='r|*', fileobj=stream)
+    except tarfile.ReadError as re:
+      if re.message != 'empty file':
+        raise TarLayerReadException('Could not read layer')
+
+    return tar_file
+
+  @staticmethod
+  def _emit_file(tar_file, tar_info):
+    file_stream = tar_file.extractfile(tar_info)
+    if file_stream is not None:
+      length = 0
+      while True:
+        current_block = file_stream.read(CHUNK_SIZE)
+        if not len(current_block):
+          break
+
+        yield current_block
+        length += len(current_block)
+
+      file_stream.close()
+
+      # Files must be padding to 512 byte multiples.
+      if length % 512 != 0:
+        yield '\0' * (512 - (length % 512))
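
Putting the new get_generator() logic in one place: pass one streams the layer, skipping whited-out files and parking any hard link whose target is skipped; if anything was parked, pass two re-reads the layer, turns the first dangling link into a regular entry holding the deleted target's data, and retargets the remaining links at it. A distilled standalone sketch of that flow (plain tuples instead of tar streams; not the production code):

from collections import defaultdict

def squash_layer(entries, is_skipped):
  """ entries: (name, linkname_or_None, data_or_None) tuples standing in for tar members. """
  emitted = []
  dangling = defaultdict(list)  # linkname -> names of hard links whose target is skipped

  # Pass 1: skip deleted files; park hard links that point at a skipped target.
  for name, linkname, data in entries:
    if is_skipped(name):
      continue
    if linkname is not None and is_skipped(linkname):
      dangling[linkname].append(name)
      continue
    emitted.append((name, linkname, data))

  # Pass 2: re-read the layer; the first dangling link becomes a regular file holding
  # the deleted target's data, and every other dangling link is retargeted to it.
  if dangling:
    for name, _, data in entries:
      if name in dangling:
        first = dangling[name][0]
        emitted.append((first, None, data))
        for other in dangling[name][1:]:
          emitted.append((other, first, None))
  return emitted

deleted = {'tobedeletedfile'}
layer = [('tobedeletedfile', None, b'somecontents'),
         ('link_to_deleted_file', 'tobedeletedfile', None),
         ('another_link_to_deleted_file', 'tobedeletedfile', None),
         ('third_file', None, b'meh')]
print(squash_layer(layer, lambda name: name in deleted))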