Retarget hard links that point to deleted files by emitting the deleted file's contents under the first hard link instance. This fixes a breakage in the squashed TAR, where hard links were left pointing at data that is no longer present.
Fixes https://jira.coreos.com/browse/QUAY-885
parent 041a7fcd36
commit 110366f656
8 changed files with 337 additions and 252 deletions
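For orientation before the diff: whenever the squash encounters hard links whose target is skipped (deleted by a later layer), it now makes a second pass over that layer, emits the target's bytes under the *first* such link's name, and retargets every other dangling link to that name. Below is a rough, self-contained sketch of the same idea using only the standard tarfile module; the rewrite_layer function and the skipped predicate are illustrative stand-ins, not the API added by this commit.

import copy
import tarfile

def rewrite_layer(in_path, out_path, skipped):
  # skipped(path) -> True if the path will not exist in the squashed image
  # (a hypothetical stand-in for StreamLayerMerger.is_skipped_file).
  dangling = {}  # link target -> hard links that point at it

  # Pass 1: copy every kept entry; set aside hard links whose target is skipped.
  with tarfile.open(in_path, 'r') as src, tarfile.open(out_path, 'w') as dst:
    for info in src:
      if skipped(info.name):
        continue
      if info.islnk() and skipped(info.linkname):
        dangling.setdefault(info.linkname, []).append(info)
        continue
      dst.addfile(info, src.extractfile(info) if info.isreg() else None)

  # Pass 2: re-read the layer; emit each deleted target's contents under the
  # *first* dangling link's name, and retarget the remaining links to it.
  if dangling:
    with tarfile.open(in_path, 'r') as src, tarfile.open(out_path, 'a') as dst:
      for info in src:
        if info.name not in dangling:
          continue
        first = dangling[info.name][0]
        clone = copy.deepcopy(first)
        clone.type = info.type      # the first link becomes a regular file
        clone.size = info.size
        dst.addfile(clone, src.extractfile(info))
        for other in dangling[info.name][1:]:
          link = copy.deepcopy(other)
          link.linkname = first.name  # retarget to the surviving copy
          dst.addfile(link)

The real change streams raw tar blocks instead of using tarfile.addfile, so it never buffers whole layers; see TarLayerFormat.get_generator in the diff below.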
@@ -1,53 +1,70 @@
-import marisa_trie
 import os
 import tarfile
+
+import marisa_trie
+
 from util.registry.aufs import is_aufs_metadata, get_deleted_prefix
 from util.registry.tarlayerformat import TarLayerFormat

 AUFS_METADATA = u'.wh..wh.'

 AUFS_WHITEOUT = u'.wh.'
 AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)

 class StreamLayerMerger(TarLayerFormat):
   """ Class which creates a generator of the combined TAR data for a set of Docker layers. """
-  def __init__(self, layer_iterator, path_prefix=None):
-    super(StreamLayerMerger, self).__init__(layer_iterator, path_prefix)
+  def __init__(self, get_tar_stream_iterator, path_prefix=None):
+    super(StreamLayerMerger, self).__init__(get_tar_stream_iterator, path_prefix)

     self.path_trie = marisa_trie.Trie()
-    self.path_encountered = []
+    self.path_encountered = set()

-    self.prefix_trie = marisa_trie.Trie()
-    self.prefix_encountered = []
+    self.deleted_prefix_trie = marisa_trie.Trie()
+    self.deleted_prefixes_encountered = set()

-  def after_tar_layer(self, current_layer):
-    # Update the tries.
-    self.path_trie = marisa_trie.Trie(self.path_encountered)
-    self.prefix_trie = marisa_trie.Trie(self.prefix_encountered)
+  def after_tar_layer(self):
+    # Update the tries.
+    self.path_trie = marisa_trie.Trie(self.path_encountered)
+    self.deleted_prefix_trie = marisa_trie.Trie(self.deleted_prefixes_encountered)

-  def check_tar_info(self, tar_info):
-    absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
+  @staticmethod
+  def _normalize_path(path):
+    return os.path.relpath(path.decode('utf-8'), './')
+
+  def _check_deleted(self, absolute):
+    ubsolute = unicode(absolute)
+    for prefix in self.deleted_prefix_trie.iter_prefixes(ubsolute):
+      if not os.path.relpath(ubsolute, prefix).startswith('..'):
+        return True
+
+    return False
+
+  def is_skipped_file(self, filename):
+    absolute = StreamLayerMerger._normalize_path(filename)

     # Skip metadata.
     if is_aufs_metadata(absolute):
-      return False
+      return True

-    # Add any prefix of deleted paths to the prefix list.
-    deleted_prefix = get_deleted_prefix(absolute)
-    if deleted_prefix is not None:
-      self.prefix_encountered.append(deleted_prefix)
-      return False
+    # Check if the file is under a deleted path.
+    if self._check_deleted(absolute):
+      return True

     # Check if this file has already been encountered somewhere. If so,
     # skip it.
     ubsolute = unicode(absolute)
     if ubsolute in self.path_trie:
+      return True
+
+    return False
+
+  def should_append_file(self, filename):
+    if self.is_skipped_file(filename):
       return False

-    # Check if this file is under a deleted path.
-    for prefix in self.prefix_trie.iter_prefixes(ubsolute):
-      if not os.path.relpath(ubsolute, prefix).startswith('..'):
-        return False
+    absolute = StreamLayerMerger._normalize_path(filename)
+
+    # Add any prefix of deleted paths to the prefix list.
+    deleted_prefix = get_deleted_prefix(absolute)
+    if deleted_prefix is not None:
+      self.deleted_prefixes_encountered.add(deleted_prefix)
+      return False

     # Otherwise, add the path to the encountered list and return it.
-    self.path_encountered.append(absolute)
+    self.path_encountered.add(absolute)
     return True

@@ -1,56 +0,0 @@
-import tarfile
-
-from cStringIO import StringIO
-
-from util.registry.tarlayerformat import TarLayerFormat
-from util.registry.gzipwrap import GzipWrap
-
-class TarfileAppender(TarLayerFormat):
-  """ Helper class which allows for appending entries to a gzipped-tarfile and doing so
-      in a streaming manner.
-  """
-  def __init__(self, base_tar_file, entries):
-    super(TarfileAppender, self).__init__(self._get_tar_iterator)
-    self.entries = entries
-    self.base_tar_file = base_tar_file
-    self.first_info = None
-
-  def get_stream(self):
-    return GzipWrap(self.get_generator())
-
-  def after_tar_layer(self, current_layer):
-    pass
-
-  def check_tar_info(self, tar_info):
-    if not self.first_info:
-      self.first_info = tar_info
-    return True
-
-  def _get_tar_iterator(self):
-    # Yield the contents of the base tar.
-    yield self.base_tar_file
-
-    # Construct an in-memory tar containing the entries to append, and then yield
-    # its data.
-    def add_entry(arch, dir_path, contents=None):
-      info = tarfile.TarInfo(dir_path)
-      info.uid = self.first_info.uid
-      info.gid = self.first_info.gid
-      info.mode = self.first_info.mode
-      info.mtime = self.first_info.mtime
-
-      info.type = tarfile.REGTYPE if contents else tarfile.DIRTYPE
-
-      if contents:
-        info.size = len(contents)
-
-      arch.addfile(info, fileobj=StringIO(contents) if contents else None)
-
-    append_tarball = StringIO()
-    with tarfile.open(fileobj=append_tarball, mode='w') as updated_archive:
-      for entry in self.entries:
-        add_entry(updated_archive, entry, self.entries[entry])
-
-    # To make tarfile happy.
-    append_tarball.seek(0)
-    yield append_tarball

@@ -2,93 +2,169 @@ import os
 import tarfile
 import copy

+from abc import ABCMeta, abstractmethod
+from collections import defaultdict
+from six import add_metaclass
+
+
 class TarLayerReadException(Exception):
   """ Exception raised when reading a layer has failed. """
   pass


+# 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
+CHUNK_SIZE = 1024 * 1024 * 9
+
+
+@add_metaclass(ABCMeta)
 class TarLayerFormat(object):
   """ Class which creates a generator of the combined TAR data. """
-  def __init__(self, tar_iterator, path_prefix=None):
-    self.tar_iterator = tar_iterator
-    self.path_prefix = path_prefix
+  def __init__(self, tar_stream_getter_iterator, path_prefix=None):
+    self.tar_stream_getter_iterator = tar_stream_getter_iterator
+    self.path_prefix = path_prefix or ''

   def get_generator(self):
-    for current_tar in self.tar_iterator():
+    for stream_getter in self.tar_stream_getter_iterator():
+      current_tar_stream = stream_getter()
+
       # Read the current TAR. If it is empty, we just continue
       # to the next one.
-      tar_file = None
-      try:
-        tar_file = tarfile.open(mode='r|*', fileobj=current_tar)
-      except tarfile.ReadError as re:
-        if re.message != 'empty file':
-          raise TarLayerReadException('Could not read layer')
-
+      tar_file = TarLayerFormat._tar_file_from_stream(current_tar_stream)
       if not tar_file:
         continue

       # For each of the tar entries, yield them IF and ONLY IF we have not
       # encountered the path before.
-
-      # 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
-      chunk_size = 1024 * 1024 * 9
-
+      dangling_hard_links = defaultdict(list)
       for tar_info in tar_file:
-        if not self.check_tar_info(tar_info):
+        if not self.should_append_file(tar_info.name):
           continue

+        # Note: We use a copy here because we need to make sure we copy over all the internal
+        # data of the tar header. We cannot use frombuf(tobuf()), however, because it doesn't
+        # properly handle large filenames.
+        clone = copy.deepcopy(tar_info)
+        clone.name = os.path.join(self.path_prefix, clone.name)
+
+        # If the entry is a *hard* link, then prefix it as well. Soft links are relative.
+        if clone.linkname and clone.type == tarfile.LNKTYPE:
+          # If the entry is a dangling hard link, we skip here. Dangling hard links will be handled
+          # in a second pass.
+          if self.is_skipped_file(tar_info.linkname):
+            dangling_hard_links[tar_info.linkname].append(tar_info)
+            continue
+
+          clone.linkname = os.path.join(self.path_prefix, clone.linkname)
+
         # Yield the tar header.
-        if self.path_prefix:
-          # Note: We use a copy here because we need to make sure we copy over all the internal
-          # data of the tar header. We cannot use frombuf(tobuf()), however, because it doesn't
-          # properly handle large filenames.
-          clone = copy.deepcopy(tar_info)
-          clone.name = os.path.join(self.path_prefix, clone.name)
-
-          # If the entry is a *hard* link, then prefix it as well. Soft links are relative.
-          if clone.linkname and clone.type == tarfile.LNKTYPE:
-            clone.linkname = os.path.join(self.path_prefix, clone.linkname)
-
-          yield clone.tobuf()
-        else:
-          yield tar_info.tobuf()
+        yield clone.tobuf()

         # Try to extract any file contents for the tar. If found, we yield them as well.
         if tar_info.isreg():
-          file_stream = tar_file.extractfile(tar_info)
-          if file_stream is not None:
-            length = 0
-            while True:
-              current_block = file_stream.read(chunk_size)
-              if not len(current_block):
-                break
-
-              yield current_block
-              length += len(current_block)
-
-            file_stream.close()
-
-            # Files must be padding to 512 byte multiples.
-            if length % 512 != 0:
-              yield '\0' * (512 - (length % 512))
+          for block in TarLayerFormat._emit_file(tar_file, tar_info):
+            yield block

       # Close the layer stream now that we're done with it.
       tar_file.close()

+      # If there are any dangling hard links, open a new stream and retarget the dangling hard
+      # links to a new copy of the contents, which will be placed under the *first* dangling hard
+      # link's name.
+      if len(dangling_hard_links) > 0:
+        tar_file = TarLayerFormat._tar_file_from_stream(stream_getter())
+        if not tar_file:
+          raise TarLayerReadException('Could not re-read tar layer')
+
+        for tar_info in tar_file:
+          # If we encounter a file that holds the data for a dangling link,
+          # emit it under the name of the first dangling hard link. All other
+          # dangling hard links will be retargeted to this first name.
+          if tar_info.name in dangling_hard_links:
+            first_dangling = dangling_hard_links[tar_info.name][0]
+
+            # Copy the first dangling hard link, change it to a normal file,
+            # and emit the deleted file's contents for it.
+            clone = copy.deepcopy(first_dangling)
+            clone.name = os.path.join(self.path_prefix, first_dangling.name)
+            clone.type = tar_info.type
+            clone.size = tar_info.size
+            clone.pax_headers = tar_info.pax_headers
+            yield clone.tobuf()
+
+            for block in TarLayerFormat._emit_file(tar_file, tar_info):
+              yield block
+
+          elif (tar_info.type == tarfile.LNKTYPE and
+                tar_info.linkname in dangling_hard_links and
+                not self.is_skipped_file(tar_info.name)):
+            # Retarget if necessary. All dangling hard links (but the first) will
+            # need to be retargeted.
+            first_dangling = dangling_hard_links[tar_info.linkname][0]
+            if tar_info.name == first_dangling.name:
+              # Skip; the first dangling is handled above.
+              continue
+
+            # Retarget the hard link to the first dangling hard link.
+            clone = copy.deepcopy(tar_info)
+            clone.name = os.path.join(self.path_prefix, clone.name)
+            clone.linkname = os.path.join(self.path_prefix, first_dangling.name)
+            yield clone.tobuf()
+
+        # Close the layer stream now that we're done with it.
+        tar_file.close()
+
       # Conduct any post-tar work.
-      self.after_tar_layer(current_tar)
+      self.after_tar_layer()

     # Last two records are empty in TAR spec.
     yield '\0' * 512
     yield '\0' * 512


-  def check_tar_info(self, tar_info):
-    """ Returns true if the current tar_info should be added to the combined tar. False
-        otherwise.
+  @abstractmethod
+  def is_skipped_file(self, filename):
+    """ Returns true if the file with the given name will be skipped during append.
     """
-    raise NotImplementedError()
+    pass

-  def after_tar_layer(self, current_tar):
+  @abstractmethod
+  def should_append_file(self, filename):
+    """ Returns true if the file with the given name should be appended when producing
+        the new TAR.
+    """
+    pass
+
+  @abstractmethod
+  def after_tar_layer(self):
     """ Invoked after a TAR layer is added, to do any post-add work. """
-    raise NotImplementedError()
+    pass
+
+  @staticmethod
+  def _tar_file_from_stream(stream):
+    tar_file = None
+    try:
+      tar_file = tarfile.open(mode='r|*', fileobj=stream)
+    except tarfile.ReadError as re:
+      if re.message != 'empty file':
+        raise TarLayerReadException('Could not read layer')
+
+    return tar_file
+
+  @staticmethod
+  def _emit_file(tar_file, tar_info):
+    file_stream = tar_file.extractfile(tar_info)
+    if file_stream is not None:
+      length = 0
+      while True:
+        current_block = file_stream.read(CHUNK_SIZE)
+        if not len(current_block):
+          break
+
+        yield current_block
+        length += len(current_block)
+
+      file_stream.close()
+
+      # Files must be padding to 512 byte multiples.
+      if length % 512 != 0:
+        yield '\0' * (512 - (length % 512))