Refactor the util directory to use subpackages.

Jake Moshenko 2015-08-03 15:49:10 -04:00
parent 974ccaa2e7
commit 18100be481
46 changed files with 36 additions and 39 deletions

util/registry/aufs.py Normal file

@@ -0,0 +1,31 @@
import os
AUFS_METADATA = u'.wh..wh.'
AUFS_WHITEOUT = u'.wh.'
AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
def is_aufs_metadata(absolute):
""" Returns whether the given absolute references an AUFS metadata file. """
filename = os.path.basename(absolute)
return filename.startswith(AUFS_METADATA) or absolute.startswith(AUFS_METADATA)
def get_deleted_filename(absolute):
""" Returns the name of the deleted file referenced by the AUFS whiteout file at
the given path or None if the file path does not reference a whiteout file.
"""
filename = os.path.basename(absolute)
if not filename.startswith(AUFS_WHITEOUT):
return None
return filename[AUFS_WHITEOUT_PREFIX_LENGTH:]
def get_deleted_prefix(absolute):
""" Returns the path prefix of the deleted file referenced by the AUFS whiteout file at
the given path or None if the file path does not reference a whiteout file.
"""
deleted_filename = get_deleted_filename(absolute)
if deleted_filename is None:
return None
dirname = os.path.dirname(absolute)
return os.path.join('/', dirname, deleted_filename)[1:]
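
A minimal usage sketch (not part of the diff) of the helpers above; the example paths are made up for illustration.

import util.registry.aufs as aufs

# Pure AUFS bookkeeping entries are skipped entirely.
assert aufs.is_aufs_metadata('.wh..wh.aufs')

# A whiteout file records the path it deletes.
assert aufs.get_deleted_filename('etc/.wh.passwd') == 'passwd'
assert aufs.get_deleted_prefix('etc/.wh.passwd') == 'etc/passwd'

# Ordinary paths are left alone.
assert aufs.get_deleted_prefix('etc/passwd') is None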

util/registry/changes.py Normal file

@@ -0,0 +1,75 @@
import marisa_trie
import os
import tarfile
from util.registry.aufs import is_aufs_metadata, get_deleted_prefix
ALLOWED_TYPES = {tarfile.REGTYPE, tarfile.AREGTYPE}
def files_and_dirs_from_tar(source_stream, removed_prefix_collector):
try:
tar_stream = tarfile.open(mode='r|*', fileobj=source_stream)
except tarfile.ReadError:
# Empty tar file
return
for tar_info in tar_stream:
absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
# Skip metadata.
if is_aufs_metadata(absolute):
continue
# Add prefixes of removed paths to the collector.
deleted_prefix = get_deleted_prefix(absolute)
if deleted_prefix is not None:
removed_prefix_collector.add(deleted_prefix)
continue
# Otherwise, yield the path if it is in the allowed types.
if tar_info.type in ALLOWED_TYPES:
yield '/' + absolute
def __compute_removed(base_trie, removed_prefixes):
for prefix in removed_prefixes:
for filename in base_trie.keys(prefix):
yield filename
def __compute_added_changed(base_trie, delta_trie):
added = set()
changed = set()
for filename in delta_trie.keys():
if filename not in base_trie:
added.add(filename)
else:
changed.add(filename)
return added, changed
def __new_fs(base_trie, added, removed):
for filename in base_trie.keys():
if filename not in removed:
yield filename
for filename in added:
yield filename
def empty_fs():
return marisa_trie.Trie()
def compute_new_diffs_and_fs(base_trie, filename_source,
removed_prefix_collector):
new_trie = marisa_trie.Trie(filename_source)
(new_added, new_changed) = __compute_added_changed(base_trie, new_trie)
new_removed = marisa_trie.Trie(__compute_removed(base_trie,
removed_prefix_collector))
new_fs = marisa_trie.Trie(__new_fs(base_trie, new_added, new_removed))
return (new_fs, new_added, new_changed, new_removed.keys())
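
A hedged sketch (not in the commit) of driving the functions above for a single layer; the in-memory tarball stands in for a real layer stream.

import tarfile
from cStringIO import StringIO
from util.registry import changes

# Build a tiny uncompressed tar to act as the layer contents.
layer = StringIO()
with tarfile.open(fileobj=layer, mode='w') as tar:
  tar.addfile(tarfile.TarInfo('etc/hosts'))
layer.seek(0)

# Collect the layer's paths and any whiteout prefixes, then diff against an empty base.
removed_prefixes = set()
paths = list(changes.files_and_dirs_from_tar(layer, removed_prefixes))
new_fs, added, changed, removed = changes.compute_new_diffs_and_fs(changes.empty_fs(), paths,
                                                                   removed_prefixes)
assert added == {u'/etc/hosts'}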

@@ -0,0 +1,76 @@
def _complain_ifclosed(closed):
if closed:
raise ValueError("I/O operation on closed file")
class GeneratorFile(object):
""" File-like object which wraps a Python generator to produce the file contents.
Modeled on StringIO and comments on the file-like interface copied from there.
"""
def __init__(self, generator):
self._generator = generator
self._closed = False
self._buf = ''
def __iter__(self):
return self
def next(self):
"""A file object is its own iterator, for example iter(f) returns f
(unless f is closed). When a file is used as an iterator, typically
in a for loop (for example, for line in f: print line), the next()
method is called repeatedly. This method returns the next input line,
or raises StopIteration when EOF is hit.
"""
_complain_ifclosed(self._closed)
r = self.read()
if not r:
raise StopIteration
return r
def readline(self):
buf = []
while True:
c = self.read(size=1)
buf.append(c)
if c == '\n' or c == '':
return ''.join(buf)
def flush(self):
_complain_ifclosed(self._closed)
def read(self, size=-1):
"""Read at most size bytes from the file
(less if the read hits EOF before obtaining size bytes).
If the size argument is negative or omitted, read all data until EOF
is reached. The bytes are returned as a string object. An empty
string is returned when EOF is encountered immediately.
"""
_complain_ifclosed(self._closed)
buf = self._buf
while size < 0 or len(buf) < size:
try:
buf = buf + self._generator.next()
except StopIteration:
break
returned = ''
if size >= 1:
self._buf = buf[size:]
returned = buf[:size]
else:
self._buf = ''
returned = buf
return returned
def close(self):
self._closed = True
del self._buf
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self._closed = True
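
A small sketch (not part of the commit) of the GeneratorFile wrapper above reading from a generator of string chunks.

def chunks():
  yield 'hello, '
  yield 'world'

with GeneratorFile(chunks()) as fileobj:
  assert fileobj.read(5) == 'hello'
  assert fileobj.read() == ', world'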

@@ -0,0 +1,43 @@
"""
Defines utility methods for working with gzip streams.
"""
import zlib
import time
# Window size for decompressing GZIP streams.
# This results in ZLIB automatically detecting the GZIP headers.
# http://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check/22310760#22310760
ZLIB_GZIP_WINDOW = zlib.MAX_WBITS | 32
CHUNK_SIZE = 5 * 1024 * 1024
class SizeInfo(object):
def __init__(self):
self.uncompressed_size = 0
self.compressed_size = 0
def calculate_size_handler():
""" Returns an object and a SocketReader handler. The handler will gunzip the data it receives,
adding the size found to the object.
"""
size_info = SizeInfo()
decompressor = zlib.decompressobj(ZLIB_GZIP_WINDOW)
def fn(buf):
# Note: We set a maximum CHUNK_SIZE to prevent the decompress from taking too much
# memory. As a result, we have to loop until the unconsumed tail is empty.
current_data = buf
size_info.compressed_size += len(current_data)
while len(current_data) > 0:
size_info.uncompressed_size += len(decompressor.decompress(current_data, CHUNK_SIZE))
current_data = decompressor.unconsumed_tail
# Make sure we allow the scheduler to do other work if we get stuck in this tight loop.
if len(current_data) > 0:
time.sleep(0)
return size_info, fn
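
A hedged usage sketch (not in the diff): push a gzipped payload through the handler returned by calculate_size_handler above and read back both sizes.

import gzip
from cStringIO import StringIO

buf = StringIO()
with gzip.GzipFile(fileobj=buf, mode='wb') as zipped:
  zipped.write('x' * 4096)
gzipped = buf.getvalue()

size_info, handler = calculate_size_handler()
handler(gzipped)
assert size_info.compressed_size == len(gzipped)
assert size_info.uncompressed_size == 4096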

util/registry/gzipwrap.py Normal file

@@ -0,0 +1,56 @@
from gzip import GzipFile
# 256K buffer to Gzip
GZIP_BUFFER_SIZE = 1024 * 256
class GzipWrap(object):
def __init__(self, input, filename=None, compresslevel=1):
self.input = iter(input)
self.buffer = ''
self.zipper = GzipFile(filename, mode='wb', fileobj=self, compresslevel=compresslevel)
self.is_done = False
def read(self, size=-1):
# If the buffer already has enough bytes, then simply pop them off of
# the beginning and return them.
if len(self.buffer) >= size or self.is_done:
ret = self.buffer[0:size]
self.buffer = self.buffer[size:]
return ret
# Otherwise, zip the input until we have enough bytes.
while True:
# Attempt to retrieve the next bytes to write.
is_done = False
input_size = 0
input_buffer = ''
while input_size < GZIP_BUFFER_SIZE:
try:
s = self.input.next()
input_buffer += s
input_size = input_size + len(s)
except StopIteration:
is_done = True
break
self.zipper.write(input_buffer)
if is_done:
self.zipper.flush()
self.zipper.close()
self.is_done = True
if len(self.buffer) >= size or is_done:
ret = self.buffer[0:size]
self.buffer = self.buffer[size:]
return ret
def flush(self):
pass
def write(self, data):
self.buffer += data
def close(self):
self.input.close()
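
A minimal sketch (not part of the commit): compress a list of string chunks with GzipWrap and round-trip the result through the standard library.

from cStringIO import StringIO
from gzip import GzipFile
from util.registry.gzipwrap import GzipWrap

wrapped = GzipWrap(['some ', 'streamed ', 'content'])
compressed = ''.join(iter(lambda: wrapped.read(4096), ''))
assert GzipFile(fileobj=StringIO(compressed)).read() == 'some streamed content'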

@@ -0,0 +1,58 @@
from multiprocessing import Queue
import os
class QueueFile(object):
""" Class which implements a file-like interface and reads from a blocking
multiprocessing queue.
"""
def __init__(self, queue, name=None):
self._queue = queue
self._closed = False
self._done = False
self._buffer = ''
self._total_size = 0
self._name = name
self.raised_exception = False
self._exception_handlers = []
def add_exception_handler(self, handler):
self._exception_handlers.append(handler)
def read(self, size=8192):
if self._closed or self._done:
buf = self._buffer[0:size]
self._buffer = self._buffer[size:]
return buf
while len(self._buffer) < size:
result = self._queue.get(block=True)
if result is None:
self._done = True
break
if isinstance(result, Exception):
self._closed = True
self.raised_exception = True
handled = False
for handler in self._exception_handlers:
handler(result)
handled = True
if handled:
return
raise result
self._buffer += result
self._total_size += len(result)
buf = self._buffer[0:size]
self._buffer = self._buffer[size:]
return buf
def flush(self):
pass
def close(self):
self._closed = True
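
A hedged sketch (not in the diff) of the reader above: a producer puts chunks onto the queue, None marks end-of-stream, and QueueFile exposes the result as a byte stream.

from multiprocessing import Queue

queue = Queue()
queue.put('first chunk ')
queue.put('second chunk')
queue.put(None)  # end-of-stream sentinel

queue_file = QueueFile(queue, name='example')
assert queue_file.read(8192) == 'first chunk second chunk'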

@@ -0,0 +1,71 @@
from multiprocessing import Process, Queue
import logging
import multiprocessing
import os
import time
import sys
import traceback
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)
class QueueProcess(object):
""" Helper class which invokes a worker in a process to produce
data for one (or more) queues.
"""
def __init__(self, get_producer, chunk_size, max_size, args, finished=None):
self._get_producer = get_producer
self._queues = []
self._chunk_size = chunk_size
self._max_size = max_size
self._args = args or []
self._finished = finished
def create_queue(self):
""" Adds a multiprocessing queue to the list of queues. Any queues added
will have the data produced appended.
"""
queue = Queue(self._max_size / self._chunk_size)
self._queues.append(queue)
return queue
@staticmethod
def run_process(target, args, finished=None):
def _target(tar, args, fin):
try:
tar(*args)
finally:
if fin:
fin()
Process(target=_target, args=(target, args, finished)).start()
def run(self):
# Important! gipc is used here because normal multiprocessing does not work
# correctly with gevent when we sleep.
args = (self._get_producer, self._queues, self._chunk_size, self._args)
QueueProcess.run_process(_run, args, finished=self._finished)
def _run(get_producer, queues, chunk_size, args):
producer = get_producer(*args)
while True:
try:
data = producer(chunk_size) or None
except Exception as ex:
message = '%s\n%s' % (ex.message, "".join(traceback.format_exception(*sys.exc_info())))
data = Exception(message)
for queue in queues:
try:
queue.put(data, block=True)
except Exception as ex:
logger.exception('Exception writing to queue.')
return
if data is None or isinstance(data, Exception):
break
# Important! This allows the thread that writes the queue data to the pipe
# to do so. Otherwise, this hangs.
time.sleep(0)
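
A hedged sketch (not part of the commit) wiring QueueProcess above to a queue; the producer below is hypothetical and simply serves a fixed string once.

def get_static_producer(payload):
  remaining = [payload]
  def produce(chunk_size):
    return remaining.pop() if remaining else None
  return produce

queue_process = QueueProcess(get_static_producer, chunk_size=8192, max_size=1024 * 1024,
                             args=('some bytes',))
queue = queue_process.create_queue()
queue_process.run()
# The queue now receives 'some bytes' followed by the terminating None, and can be
# drained, e.g. via QueueFile(queue).read().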

@@ -0,0 +1,53 @@
import marisa_trie
import os
from util.registry.aufs import is_aufs_metadata, get_deleted_prefix
from util.registry.tarlayerformat import TarLayerFormat
AUFS_METADATA = u'.wh..wh.'
AUFS_WHITEOUT = u'.wh.'
AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
class StreamLayerMerger(TarLayerFormat):
""" Class which creates a generator of the combined TAR data for a set of Docker layers. """
def __init__(self, layer_iterator, path_prefix=None):
super(StreamLayerMerger, self).__init__(layer_iterator, path_prefix)
self.path_trie = marisa_trie.Trie()
self.path_encountered = []
self.prefix_trie = marisa_trie.Trie()
self.prefix_encountered = []
def after_tar_layer(self, current_layer):
# Update the tries.
self.path_trie = marisa_trie.Trie(self.path_encountered)
self.prefix_trie = marisa_trie.Trie(self.prefix_encountered)
def check_tar_info(self, tar_info):
absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
# Skip metadata.
if is_aufs_metadata(absolute):
return False
# Add any prefix of deleted paths to the prefix list.
deleted_prefix = get_deleted_prefix(absolute)
if deleted_prefix is not None:
self.prefix_encountered.append(deleted_prefix)
return False
# Check if this file has already been encountered somewhere. If so,
# skip it.
ubsolute = unicode(absolute)
if ubsolute in self.path_trie:
return False
# Check if this file is under a deleted path.
for prefix in self.prefix_trie.iter_prefixes(ubsolute):
if not os.path.relpath(ubsolute, prefix).startswith('..'):
return False
# Otherwise, add the path to the encountered list and return it.
self.path_encountered.append(absolute)
return True
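
A hedged sketch (not in the commit) of squashing two layers with StreamLayerMerger above; the first layer yielded takes precedence, so the top layer (whose whiteout hides etc/old) comes first. The in-memory tars stand in for real layer streams.

import tarfile
from cStringIO import StringIO

def build_layer(*names):
  buf = StringIO()
  with tarfile.open(fileobj=buf, mode='w') as tar:
    for name in names:
      tar.addfile(tarfile.TarInfo(name))
  buf.seek(0)
  return buf

def layer_iterator():
  yield build_layer('etc/new', 'etc/.wh.old')  # top layer: adds etc/new, whites out etc/old
  yield build_layer('etc/old')                 # base layer

merged = ''.join(StreamLayerMerger(layer_iterator).get_generator())
assert tarfile.open(fileobj=StringIO(merged)).getnames() == ['etc/new']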

@@ -0,0 +1,56 @@
import tarfile
from cStringIO import StringIO
from util.registry.tarlayerformat import TarLayerFormat
from util.registry.gzipwrap import GzipWrap
class TarfileAppender(TarLayerFormat):
""" Helper class which allows for appending entries to a gzipped-tarfile and doing so
in a streaming manner.
"""
def __init__(self, base_tar_file, entries):
super(TarfileAppender, self).__init__(self._get_tar_iterator)
self.entries = entries
self.base_tar_file = base_tar_file
self.first_info = None
def get_stream(self):
return GzipWrap(self.get_generator())
def after_tar_layer(self, current_layer):
pass
def check_tar_info(self, tar_info):
if not self.first_info:
self.first_info = tar_info
return True
def _get_tar_iterator(self):
# Yield the contents of the base tar.
yield self.base_tar_file
# Construct an in-memory tar containing the entries to append, and then yield
# its data.
def add_entry(arch, dir_path, contents=None):
info = tarfile.TarInfo(dir_path)
info.uid = self.first_info.uid
info.gid = self.first_info.gid
info.mode = self.first_info.mode
info.mtime = self.first_info.mtime
info.type = tarfile.REGTYPE if contents else tarfile.DIRTYPE
if contents:
info.size = len(contents)
arch.addfile(info, fileobj=StringIO(contents) if contents else None)
append_tarball = StringIO()
with tarfile.open(fileobj=append_tarball, mode='w') as updated_archive:
for entry in self.entries:
add_entry(updated_archive, entry, self.entries[entry])
# To make tarfile happy.
append_tarball.seek(0)
yield append_tarball
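
A hedged sketch (not part of the diff): append a directory and a small file to an existing uncompressed tar with TarfileAppender above, then unpack the gzipped result to check the new entry is present.

import tarfile
from cStringIO import StringIO
from gzip import GzipFile

base = StringIO()
with tarfile.open(fileobj=base, mode='w') as tar:
  tar.addfile(tarfile.TarInfo('opt/app'))
base.seek(0)

appender = TarfileAppender(base, {'opt/app/logs': None, 'opt/app/config.yaml': 'verbose: true'})
stream = appender.get_stream()
gzipped = ''.join(iter(lambda: stream.read(4096), ''))

inflated = GzipFile(fileobj=StringIO(gzipped)).read()
assert 'opt/app/config.yaml' in tarfile.open(fileobj=StringIO(inflated)).getnames()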

util/registry/tarlayerformat.py Normal file

@@ -0,0 +1,94 @@
import os
import tarfile
import copy
class TarLayerReadException(Exception):
""" Exception raised when reading a layer has failed. """
pass
class TarLayerFormat(object):
""" Class which creates a generator of the combined TAR data. """
def __init__(self, tar_iterator, path_prefix=None):
self.tar_iterator = tar_iterator
self.path_prefix = path_prefix
def get_generator(self):
for current_tar in self.tar_iterator():
# Read the current TAR. If it is empty, we just continue
# to the next one.
tar_file = None
try:
tar_file = tarfile.open(mode='r|*', fileobj=current_tar)
except tarfile.ReadError as re:
if re.message != 'empty file':
raise TarLayerReadException('Could not read layer')
if not tar_file:
continue
# For each of the tar entries, yield them IF and ONLY IF we have not
# encountered the path before.
# 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
chunk_size = 1024 * 1024 * 9
for tar_info in tar_file:
if not self.check_tar_info(tar_info):
continue
# Yield the tar header.
if self.path_prefix:
# Note: We use a copy here because we need to make sure we copy over all the internal
# data of the tar header. We cannot use frombuf(tobuf()), however, because it doesn't
# properly handle large filenames.
clone = copy.deepcopy(tar_info)
clone.name = os.path.join(self.path_prefix, clone.name)
# If the entry is a *hard* link, then prefix it as well. Soft links are relative.
if clone.linkname and clone.type == tarfile.LNKTYPE:
clone.linkname = os.path.join(self.path_prefix, clone.linkname)
yield clone.tobuf()
else:
yield tar_info.tobuf()
# Try to extract any file contents for the tar. If found, we yield them as well.
if tar_info.isreg():
file_stream = tar_file.extractfile(tar_info)
if file_stream is not None:
length = 0
while True:
current_block = file_stream.read(chunk_size)
if not len(current_block):
break
yield current_block
length += len(current_block)
file_stream.close()
# Files must be padded to 512 byte multiples.
if length % 512 != 0:
yield '\0' * (512 - (length % 512))
# Close the layer stream now that we're done with it.
tar_file.close()
# Conduct any post-tar work.
self.after_tar_layer(current_tar)
# Last two records are empty in TAR spec.
yield '\0' * 512
yield '\0' * 512
def check_tar_info(self, tar_info):
""" Returns true if the current tar_info should be added to the combined tar. False
otherwise.
"""
raise NotImplementedError()
def after_tar_layer(self, current_tar):
""" Invoked after a TAR layer is added, to do any post-add work. """
raise NotImplementedError()
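
A hedged sketch (not in the commit) of the subclass contract defined above: implement check_tar_info and after_tar_layer, then stream the combined tar from get_generator.

class PassthroughLayerFormat(TarLayerFormat):
  """ Hypothetical subclass that keeps every entry and needs no per-layer bookkeeping. """
  def check_tar_info(self, tar_info):
    return True

  def after_tar_layer(self, current_tar):
    pass

# combined = ''.join(PassthroughLayerFormat(layer_iterator).get_generator()), where
# layer_iterator is a callable yielding file-like objects for each layer tar.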