- Merge branch 'master' into sha-lom

- Extract out the tar handling from streamlayerformat into tarlayerformat
- Add a new tarfileappender class to make it easy to append data to gzipped tars
- Fix the gzipwrap to properly close
- Have the .git injection use the new appender
Joseph Schorr 2014-10-15 15:51:34 -04:00
commit d43109d7cb
48 changed files with 1232 additions and 532 deletions

util/aufs.py

@@ -4,28 +4,28 @@ AUFS_METADATA = u'.wh..wh.'
AUFS_WHITEOUT = u'.wh.'
AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
-def is_aufs_metadata(filepath):
-""" Returns whether the given filepath references an AUFS metadata file. """
-filename = os.path.basename(filepath)
-return filename.startswith(AUFS_METADATA) or filepath.startswith(AUFS_METADATA)
+def is_aufs_metadata(absolute):
+""" Returns whether the given absolute path references an AUFS metadata file. """
+filename = os.path.basename(absolute)
+return filename.startswith(AUFS_METADATA) or absolute.startswith(AUFS_METADATA)
-def get_deleted_filename(filepath):
+def get_deleted_filename(absolute):
""" Returns the name of the deleted file referenced by the AUFS whiteout file at
the given path or None if the file path does not reference a whiteout file.
"""
-filename = os.path.basename(filepath)
+filename = os.path.basename(absolute)
if not filename.startswith(AUFS_WHITEOUT):
return None
return filename[AUFS_WHITEOUT_PREFIX_LENGTH:]
-def get_deleted_prefix(filepath):
+def get_deleted_prefix(absolute):
""" Returns the path prefix of the deleted file referenced by the AUFS whiteout file at
the given path or None if the file path does not reference a whiteout file.
"""
-deleted_filename = get_deleted_filename(filepath)
+deleted_filename = get_deleted_filename(absolute)
if deleted_filename is None:
return None
-dirname = os.path.dirname(filepath)
-return os.path.join('/', dirname, deleted_filename)
+dirname = os.path.dirname(absolute)
+return os.path.join('/', dirname, deleted_filename)[1:]


@@ -1,4 +1,4 @@
-from util.gzipwrap import GzipWrap
+from util.gzipwrap import GzipWrap, GZIP_BUFFER_SIZE
from util.streamlayerformat import StreamLayerMerger
from app import app
@@ -75,7 +75,11 @@ def _import_format_generator(namespace, repository, tag, synthetic_image_id,
# If the yielded size is less than the estimated size (which is likely), fill the rest with
# zeros.
if yielded_size < estimated_file_size:
-yield '\0' * (estimated_file_size - yielded_size)
+to_yield = estimated_file_size - yielded_size
+while to_yield > 0:
+yielded = min(to_yield, GZIP_BUFFER_SIZE)
+yield '\0' * yielded
+to_yield -= yielded
# Yield any file padding to 512 bytes that is necessary.
yield _tar_file_padding(estimated_file_size)
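The change above swaps one potentially huge string allocation for a bounded loop; a minimal standalone sketch of the same pattern (the helper name is made up, and the 256K default mirrors the GZIP_BUFFER_SIZE introduced below):

def yield_zero_padding(remaining, chunk_size=1024 * 256):
    # Emit NUL bytes in fixed-size chunks so no single allocation exceeds chunk_size.
    while remaining > 0:
        emitted = min(remaining, chunk_size)
        yield '\0' * emitted
        remaining -= emitted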

util/gzipwrap.py

@@ -1,12 +1,19 @@
from gzip import GzipFile
+# 256K buffer to Gzip
+GZIP_BUFFER_SIZE = 1024 * 256
class GzipWrap(object):
def __init__(self, input, filename=None, compresslevel=1):
self.input = iter(input)
self.buffer = ''
self.zipper = GzipFile(filename, mode='wb', fileobj=self, compresslevel=compresslevel)
+self.is_done = False
def read(self, size=-1):
+if self.is_done:
+return ''
# If the buffer already has enough bytes, then simply pop them off of
# the beginning and return them.
if len(self.buffer) >= size:
@@ -21,7 +28,7 @@ class GzipWrap(object):
input_size = 0
input_buffer = ''
-while input_size < 1024 * 256: # 256K buffer to Gzip
+while input_size < GZIP_BUFFER_SIZE:
try:
s = self.input.next()
input_buffer += s
@@ -34,6 +41,8 @@ class GzipWrap(object):
if is_done:
self.zipper.flush()
self.zipper.close()
+self.is_done = True
if len(self.buffer) >= size or is_done:
ret = self.buffer[0:size]
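GzipWrap presents a generator of strings as a readable, gzip-compressed file-like object, and the new is_done flag lets callers read until an empty string comes back. A hedged usage sketch (the generator and read size here are illustrative, not from the commit):

def chunks():
    yield 'hello '
    yield 'world'

wrapped = GzipWrap(chunks(), filename='payload.txt')
compressed = ''
while True:
    data = wrapped.read(4096)
    if not data:
        break
    compressed += data
# compressed now holds the gzipped bytes of 'hello world'.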


@@ -3,7 +3,6 @@ import logging
import multiprocessing
import os
import time
-import gipc
import sys
import traceback
@@ -31,7 +30,7 @@ class QueueProcess(object):
@staticmethod
def run_process(target, args):
-gipc.start_process(target=target, args=args)
+Process(target=target, args=args).start()
def run(self):
# Important! gipc is used here because normal multiprocessing does not work
@@ -50,9 +49,9 @@ def _run(get_producer, queues, chunk_size, args):
for queue in queues:
try:
-queue.put(data, block=True, timeout=10)
+queue.put(data, block=True)
except Exception as ex:
# One of the listeners stopped listening.
logger.exception('Exception writing to queue.')
return
if data is None or isinstance(data, Exception):
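Dropping the timeout means the producer now blocks until every listener accepts the chunk; a minimal sketch of that fan-out step (names are illustrative, not from the commit):

def fan_out(data, queues, logger):
    # Push one chunk to every listener queue; any failure is treated as
    # "that listener went away" and tells the caller to stop producing.
    for queue in queues:
        try:
            queue.put(data, block=True)
        except Exception:
            logger.exception('Exception writing to queue.')
            return False
    return True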

util/streamlayerformat.py

@@ -2,72 +2,29 @@ import marisa_trie
import os
import tarfile
from aufs import is_aufs_metadata, get_deleted_prefix
+from util.tarlayerformat import TarLayerFormat
-AUFS_METADATA = u'.wh..wh.'
-AUFS_WHITEOUT = u'.wh.'
-AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
-class StreamLayerMerger(object):
+class StreamLayerMerger(TarLayerFormat):
""" Class which creates a generator of the combined TAR data for a set of Docker layers. """
def __init__(self, layer_iterator):
-self.trie = marisa_trie.Trie()
-self.layer_iterator = layer_iterator
-self.encountered = []
+super(StreamLayerMerger, self).__init__(layer_iterator)
-def get_generator(self):
-for current_layer in self.layer_iterator():
-# Read the current layer as TAR. If it is empty, we just continue
-# to the next layer.
-try:
-tar_file = tarfile.open(mode='r|*', fileobj=current_layer)
-except tarfile.ReadError as re:
-continue
+self.path_trie = marisa_trie.Trie()
+self.path_encountered = []
-# For each of the tar entries, yield them IF and ONLY IF we have not
-# encountered the path before.
+self.prefix_trie = marisa_trie.Trie()
+self.prefix_encountered = []
-# 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
-chunk_size = 1024 * 1024 * 9
-for tar_info in tar_file:
-if not self.check_tar_info(tar_info):
-continue
-# Yield the tar header.
-yield tar_info.tobuf()
-# Try to extract any file contents for the tar. If found, we yield them as well.
-if tar_info.isreg():
-file_stream = tar_file.extractfile(tar_info)
-if file_stream is not None:
-length = 0
-while True:
-current_block = file_stream.read(chunk_size)
-if not len(current_block):
-break
-yield current_block
-length += len(current_block)
-file_stream.close()
-# Files must be padding to 512 byte multiples.
-if length % 512 != 0:
-yield '\0' * (512 - (length % 512))
-# Close the layer stream now that we're done with it.
-tar_file.close()
-# Update the trie with the new encountered entries.
-self.trie = marisa_trie.Trie(self.encountered)
+def after_tar_layer(self, current_layer):
+# Update the tries.
+self.path_trie = marisa_trie.Trie(self.path_encountered)
+self.prefix_trie = marisa_trie.Trie(self.prefix_encountered)
-# Last two records are empty in TAR spec.
-yield '\0' * 512
-yield '\0' * 512
def check_tar_info(self, tar_info):
absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
@@ -76,16 +33,22 @@
return False
# Add any prefix of deleted paths to the prefix list.
deleted_prefix = get_deleted_prefix(absolute)
if deleted_prefix is not None:
-self.encountered.append(deleted_prefix)
+self.prefix_encountered.append(deleted_prefix)
return False
# Check if this file has already been encountered somewhere. If so,
# skip it.
-if unicode(absolute) in self.trie:
+ubsolute = unicode(absolute)
+if ubsolute in self.path_trie:
return False
+# Check if this file is under a deleted path.
+for prefix in self.prefix_trie.iter_prefixes(ubsolute):
+if not os.path.relpath(ubsolute, prefix).startswith('..'):
+return False
# Otherwise, add the path to the encountered list and return it.
-self.encountered.append(absolute)
+self.path_encountered.append(absolute)
return True
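A hedged sketch of how the merger is meant to be driven after this refactor (file names are made up; layers are assumed to be yielded newest-first so that whiteouts and newer copies of a path win, per the skip logic above):

def layer_streams():
    # One readable file-like object per layer, topmost layer first.
    for path in ['layer2.tar', 'layer1.tar', 'layer0.tar']:
        yield open(path, 'rb')

merger = StreamLayerMerger(layer_streams)
squashed = GzipWrap(merger.get_generator(), filename='squashed.tar')
# squashed.read(...) now yields a gzip stream of the combined tar.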

util/tarfileappender.py (new file, 52 lines)

@@ -0,0 +1,52 @@
import tarfile
from cStringIO import StringIO
from util.tarlayerformat import TarLayerFormat
from util.gzipwrap import GzipWrap
class TarfileAppender(TarLayerFormat):
""" Helper class which allows for appending entries to a gzipped-tarfile and doing so
in a streaming manner.
"""
def __init__(self, base_tar_file, entries):
super(TarfileAppender, self).__init__(self._get_tar_iterator)
self.entries = entries
self.base_tar_file = base_tar_file
self.last_info = None
def get_stream(self):
return GzipWrap(self.get_generator())
def after_tar_layer(self, current_layer):
pass
def check_tar_info(self, tar_info):
self.last_info = tar_info
return True
def _get_tar_iterator(self):
# Yield the contents of the base tar.
yield self.base_tar_file
# Construct an in-memory tar containing the entries to append, and then yield
# its data.
def add_entry(arch, dir_path, contents=None):
info = tarfile.TarInfo(dir_path)
info.uid = self.last_info.uid
info.gid = self.last_info.gid
info.type = tarfile.REGTYPE if contents else tarfile.DIRTYPE
if contents:
info.size = len(contents)
arch.addfile(info, fileobj=StringIO(contents) if contents else None)
append_tarball = StringIO()
with tarfile.open(fileobj=append_tarball, mode='w') as updated_archive:
for entry in self.entries:
add_entry(updated_archive, entry, self.entries[entry])
# To make tarfile happy.
append_tarball.seek(0)
yield append_tarball
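A hedged usage sketch for the new appender (paths and contents are made up; entries maps an in-tar path to its file contents, or None for a directory, per add_entry above):

base = open('repo.tar.gz', 'rb')   # any tar stream readable by tarfile's 'r|*' mode
appender = TarfileAppender(base, {'.git/config': '[core]\n'})
output = appender.get_stream()     # file-like object producing a gzipped tar
with open('repo-with-git.tar.gz', 'wb') as dest:
    while True:
        chunk = output.read(65536)
        if not chunk:
            break
        dest.write(chunk)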

util/tarlayerformat.py (new file, 70 lines)

@@ -0,0 +1,70 @@
import os
import tarfile
class TarLayerFormat(object):
""" Class which creates a generator of the combined TAR data. """
def __init__(self, tar_iterator):
self.tar_iterator = tar_iterator
def get_generator(self):
for current_tar in self.tar_iterator():
# Read the current TAR. If it is empty, we just continue
# to the next one.
try:
tar_file = tarfile.open(mode='r|*', fileobj=current_tar)
except tarfile.ReadError as re:
raise re
continue
# For each of the tar entries, yield them IF and ONLY IF we have not
# encountered the path before.
# 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
chunk_size = 1024 * 1024 * 9
for tar_info in tar_file:
if not self.check_tar_info(tar_info):
continue
# Yield the tar header.
yield tar_info.tobuf()
# Try to extract any file contents for the tar. If found, we yield them as well.
if tar_info.isreg():
file_stream = tar_file.extractfile(tar_info)
if file_stream is not None:
length = 0
while True:
current_block = file_stream.read(chunk_size)
if not len(current_block):
break
yield current_block
length += len(current_block)
file_stream.close()
# Files must be padded to 512 byte multiples.
if length % 512 != 0:
yield '\0' * (512 - (length % 512))
# Close the layer stream now that we're done with it.
tar_file.close()
# Conduct any post-tar work.
self.after_tar_layer(current_tar)
# Last two records are empty in TAR spec.
yield '\0' * 512
yield '\0' * 512
def check_tar_info(self, tar_info):
""" Returns true if the current tar_info should be added to the combined tar. False
otherwise.
"""
raise NotImplementedError()
def after_tar_layer(self, current_tar):
""" Invoked after a TAR layer is added, to do any post-add work. """
raise NotImplementedError()
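TarLayerFormat is only useful when subclassed; a minimal hedged sketch of a filter built on it (everything here is illustrative, not from the commit):

class SkipOnePath(TarLayerFormat):
    """ Drops a single exact path from the combined tar; no per-layer bookkeeping needed. """
    def __init__(self, tar_iterator, path_to_skip):
        super(SkipOnePath, self).__init__(tar_iterator)
        self.path_to_skip = path_to_skip

    def check_tar_info(self, tar_info):
        return tar_info.name != self.path_to_skip

    def after_tar_layer(self, current_tar):
        pass

filtered = SkipOnePath(lambda: iter([open('layer.tar', 'rb')]), 'etc/secret')
combined = ''.join(filtered.get_generator())  # uncompressed combined tar bytes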