- Make the layer going over the estimated size raise an exception
- Add a heuristic for estimating the layer size if it is 0
- Add a method where we can add a custom defined map of image -> size
This commit is contained in:
parent
297c8ad29c
commit
746936ce66
4 changed files with 37 additions and 35 deletions
|
@@ -1,31 +0,0 @@
|
|||
from data import model
|
||||
from data.database import ImageStorage
|
||||
from app import app, storage as store
|
||||
|
||||
import logging
|
||||
|
||||
def backfill_sizes():
|
||||
count = ImageStorage.select().where(ImageStorage.uncompressed_size == None).count()
|
||||
counter = 0
|
||||
for image_storage in ImageStorage.select().where(ImageStorage.uncompressed_size == None):
|
||||
logging.debug("Backfilling uncompressed size: %s of %s" % (counter, count))
|
||||
|
||||
# Lookup the JSON for the image.
|
||||
uuid = image_storage.uuid
|
||||
with_locations = model.get_storage_by_uuid(uuid)
|
||||
|
||||
json_data = store.get_content(with_locations.locations, store.image_json_path(uuid))
|
||||
size = json_data.get('Size', None)
|
||||
if size is None:
|
||||
continue
|
||||
|
||||
image_storage.uncompressed_size = size
|
||||
image_storage.save()
|
||||
counter += 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.getLogger('boto').setLevel(logging.CRITICAL)
|
||||
|
||||
backfill_sizes()
|
|
@@ -6,6 +6,14 @@ import copy
|
|||
import json
|
||||
import tarfile
|
||||
|
||||
class FileEstimationException(Exception):
|
||||
""" Exception raised by build_docker_load_stream if the estimated size of the layer TAR
|
||||
was lower than the actual size. This means the sent TAR header is wrong, and we have
|
||||
to fail.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def build_docker_load_stream(namespace, repository, tag, synthetic_image_id,
|
||||
layer_json, get_image_iterator, get_layer_iterator):
|
||||
""" Builds and streams a synthetic .tar.gz that represents a squashed version
|
||||
|
@@ -50,7 +58,9 @@ def _import_format_generator(namespace, repository, tag, synthetic_image_id,
|
|||
# Yield the merged layer data's header.
|
||||
estimated_file_size = 0
|
||||
for image in get_image_iterator():
|
||||
estimated_file_size += image.storage.uncompressed_size or 0
|
||||
estimated_file_size += (image.storage.uncompressed_size or
|
||||
_get_mapped_size(image) or
|
||||
_estimate_size(image))
|
||||
|
||||
yield _tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size)
|
||||
|
||||
|
@@ -60,6 +70,11 @@ def _import_format_generator(namespace, repository, tag, synthetic_image_id,
|
|||
yield entry
|
||||
yielded_size += len(entry)
|
||||
|
||||
# If the yielded size is more than the estimated size (which is unlikely but possible), then
|
||||
# raise an exception since the tar header will be wrong.
|
||||
if yielded_size > estimated_file_size:
|
||||
raise FileEstimationException()
|
||||
|
||||
# If the yielded size is less than the estimated size (which is likely), fill the rest with
|
||||
# zeros.
|
||||
if yielded_size < estimated_file_size:
|
||||
|
@@ -113,3 +128,13 @@ def _tar_folder(name):
|
|||
info = tarfile.TarInfo(name=name)
|
||||
info.type = tarfile.DIRTYPE
|
||||
return info.tobuf()
|
||||
|
||||
def _get_mapped_size(image):
|
||||
""" Returns a predefined image size for the given image or None if not found. """
|
||||
return None
|
||||
|
||||
def _estimate_size(image):
|
||||
""" Estimates a file size based on a heuristic. """
|
||||
# More than 1 SD away from the size difference in the DB, as of 9/29/2014
|
||||
return image.storage.image_size * 12
|
||||
|
||||
|
|
|
@@ -22,7 +22,11 @@ class QueueFile(object):
|
|||
if result is None:
|
||||
self._done = True
|
||||
break
|
||||
|
||||
|
||||
if isinstance(result, Exception):
|
||||
self._closed = True
|
||||
raise result
|
||||
|
||||
self._buffer += result
|
||||
self._total_size += len(result)
|
||||
|
||||
|
|
|
@ -40,7 +40,11 @@ class QueueProcess(object):
|
|||
def _run(get_producer, queues, chunk_size, args):
|
||||
producer = get_producer(*args)
|
||||
while True:
|
||||
data = producer(chunk_size) or None
|
||||
try:
|
||||
data = producer(chunk_size) or None
|
||||
except Exception as ex:
|
||||
data = ex
|
||||
|
||||
for queue in queues:
|
||||
try:
|
||||
queue.put(data, block=True, timeout=10)
|
||||
|
@@ -48,7 +52,7 @@ def _run(get_producer, queues, chunk_size, args):
|
|||
# One of the listeners stopped listening.
|
||||
return
|
||||
|
||||
if data is None:
|
||||
if data is None or isinstance(data, Exception):
|
||||
break
|
||||
|
||||
# Important! This allows the thread that writes the queue data to the pipe
|
||||
|
|
Reference in a new issue