From e273dca4b499636c273d11e3c858fdbcbce30d36 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Fri, 19 Sep 2014 12:22:54 -0400
Subject: [PATCH] Change back to using a docker load format

---
 endpoints/verbs.py        |  22 ++++++--
 static/js/controllers.js  |   8 +--
 util/dockerloadformat.py  | 116 ++++++++++++++++++++++++++++++++++++++
 util/streamlayerformat.py |  46 +++++++--------
 4 files changed, 161 insertions(+), 31 deletions(-)
 create mode 100644 util/dockerloadformat.py

diff --git a/endpoints/verbs.py b/endpoints/verbs.py
index 92d523537..8cc421098 100644
--- a/endpoints/verbs.py
+++ b/endpoints/verbs.py
@@ -1,5 +1,6 @@
 import logging
 import json
+import hashlib
 
 from flask import redirect, Blueprint, abort, send_file
 
@@ -12,14 +13,18 @@ from data import database
 from util.queuefile import QueueFile
 from util.queueprocess import QueueProcess
 from util.gzipwrap import GzipWrap
-from util.streamlayerformat import StreamLayerMerger
+from util.dockerloadformat import build_docker_load_stream
 
 verbs = Blueprint('verbs', __name__)
 logger = logging.getLogger(__name__)
 
 
-def _open_stream(namespace, repository, image_list):
+def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_list):
+  def get_next_image():
+    for current_image_id in image_list:
+      yield model.get_repo_image(namespace, repository, current_image_id)
+
   def get_next_layer():
     for current_image_id in image_list:
       current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
 
@@ -31,7 +36,9 @@ def _open_stream(namespace, repository, image_list):
       yield current_image_stream
 
   database.configure(app.config)
-  stream = GzipWrap(StreamLayerMerger(get_next_layer).get_generator())
+  stream = build_docker_load_stream(namespace, repository, tag, synthetic_image_id, image_json,
+                                    get_next_image, get_next_layer)
+
   return stream.read
 
 
@@ -80,9 +87,16 @@ def get_squashed_tag(namespace, repository, tag):
     ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
     full_image_list = json.loads(ancestry_data)
 
+    # Load the image's JSON layer.
+    image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
+    image_json = json.loads(image_json_data)
+
+    # Calculate a synthetic image ID.
+    synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':squash').hexdigest()
+
     # Create a queue process to generate the data. The queue files will read from the process
     # and send the results to the client and storage.
-    args = (namespace, repository, full_image_list)
+    args = (namespace, repository, tag, synthetic_image_id, image_json, full_image_list)
     queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, args) # 8K/10M chunk/max
 
     client_queue_file = QueueFile(queue_process.create_queue(), 'client')

diff --git a/static/js/controllers.js b/static/js/controllers.js
index f781f4dac..d4ef7fe56 100644
--- a/static/js/controllers.js
+++ b/static/js/controllers.js
@@ -422,10 +422,10 @@ function RepoCtrl($scope, $sanitize, Restangular, ImageMetadataService, ApiServi
       });
 
       if ($scope.currentTag) {
-        var squash = 'docker import ' + Config.getHost('ACCOUNTNAME:PASSWORDORTOKEN');
-        squash += '/verbs/v1/' + namespace + '/' + name + '/' + $scope.currentTag.name + '/squash';
-        squash += ' ';
-        squash += Config.getDomain() + '/' + namespace + '/' + name + '/' + $scope.currentTag.name + '.squash';
+        var squash = 'curl ' + Config.getHost('ACCOUNTNAME:PASSWORDORTOKEN');
+        squash += '/verbs/v1/repositories/' + namespace + '/' + name + '/';
+        squash += $scope.currentTag.name + '/squash';
+        squash += ' | docker load';
 
         $scope.pullCommands.push({
           'title': 'Squashed image (Tag ' + $scope.currentTag.name + ')',

diff --git a/util/dockerloadformat.py b/util/dockerloadformat.py
new file mode 100644
index 000000000..2979bc70b
--- /dev/null
+++ b/util/dockerloadformat.py
@@ -0,0 +1,116 @@
+from util.gzipwrap import GzipWrap
+from util.streamlayerformat import StreamLayerMerger
+from app import app
+
+import copy
+import json
+import tarfile
+
+def build_docker_load_stream(namespace, repository, tag, synthetic_image_id,
+                             layer_json, get_image_iterator, get_layer_iterator):
+  """ Builds and streams a synthetic .tar.gz that represents a squashed version
+      of the given layers, in `docker load` V1 format.
+  """
+  return GzipWrap(_import_format_generator(namespace, repository, tag,
+                                           synthetic_image_id, layer_json,
+                                           get_image_iterator, get_layer_iterator))
+
+
+def _import_format_generator(namespace, repository, tag, synthetic_image_id,
+                             layer_json, get_image_iterator, get_layer_iterator):
+
+  # Docker import V1 Format (.tar):
+  #  repositories - JSON file containing a repo -> tag -> image map
+  #  {image ID folder}:
+  #    json - The layer JSON
+  #    layer.tar - The TARed contents of the layer
+  #    VERSION - The docker import version: '1.0'
+  layer_merger = StreamLayerMerger(get_layer_iterator)
+
+  # Yield the repositories file:
+  synthetic_layer_info = {}
+  synthetic_layer_info[tag + '.squash'] = synthetic_image_id
+
+  hostname = app.config['SERVER_HOSTNAME']
+  repositories = {}
+  repositories[hostname + '/' + namespace + '/' + repository] = synthetic_layer_info
+
+  yield _tar_file('repositories', json.dumps(repositories))
+
+  # Yield the image ID folder.
+  yield _tar_folder(synthetic_image_id)
+
+  # Yield the JSON layer data.
+  layer_json = _build_layer_json(layer_json, synthetic_image_id)
+  yield _tar_file(synthetic_image_id + '/json', json.dumps(layer_json))
+
+  # Yield the VERSION file.
+  yield _tar_file(synthetic_image_id + '/VERSION', '1.0')
+
+  # Yield the merged layer data's header.
+  estimated_file_size = 0
+  for image in get_image_iterator():
+    estimated_file_size += image.storage.uncompressed_size or 0
+
+  yield _tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size)
+
+  # Yield the contents of the merged layer.
+  yielded_size = 0
+  for entry in layer_merger.get_generator():
+    yield entry
+    yielded_size += len(entry)
+
+  # If the yielded size is less than the estimated size (which is likely), fill the rest with
+  # zeros.
+  if yielded_size < estimated_file_size:
+    yield '\0' * (estimated_file_size - yielded_size)
+
+  # Yield any file padding to 512 bytes that is necessary.
+  yield _tar_file_padding(estimated_file_size)
+
+  # Last two records are empty in TAR spec.
+  yield '\0' * 512
+  yield '\0' * 512
+
+
+def _build_layer_json(layer_json, synthetic_image_id):
+  updated_json = copy.deepcopy(layer_json)
+  updated_json['id'] = synthetic_image_id
+
+  if 'parent' in updated_json:
+    del updated_json['parent']
+
+  if 'config' in updated_json and 'Image' in updated_json['config']:
+    updated_json['config']['Image'] = synthetic_image_id
+
+  if 'container_config' in updated_json and 'Image' in updated_json['container_config']:
+    updated_json['container_config']['Image'] = synthetic_image_id
+
+  return updated_json
+
+
+def _tar_file(name, contents):
+  length = len(contents)
+  tar_data = _tar_file_header(name, length)
+  tar_data += contents
+  tar_data += _tar_file_padding(length)
+  return tar_data
+
+
+def _tar_file_padding(length):
+  if length % 512 != 0:
+    return '\0' * (512 - (length % 512))
+  return ''  # No padding needed at an exact 512-byte boundary.
+
+
+def _tar_file_header(name, file_size):
+  info = tarfile.TarInfo(name=name)
+  info.type = tarfile.REGTYPE
+  info.size = file_size
+  return info.tobuf()
+
+
+def _tar_folder(name):
+  info = tarfile.TarInfo(name=name)
+  info.type = tarfile.DIRTYPE
+  return info.tobuf()

diff --git a/util/streamlayerformat.py b/util/streamlayerformat.py
index 757d1b4ef..e8ae3eb3d 100644
--- a/util/streamlayerformat.py
+++ b/util/streamlayerformat.py
@@ -32,30 +32,30 @@ class StreamLayerMerger(object):
     chunk_size = 1024 * 1024 * 9
 
     for tar_info in tar_file:
-      result = self.process_tar_info(tar_info)
-      if not result:
+      if not self.check_tar_info(tar_info):
         continue
 
-      (tarinfo, filebuf) = result
+      # Yield the tar header.
+      yield tar_info.tobuf()
 
-      yield tarinfo.tobuf()
+      # Try to extract any file contents for the tar. If found, we yield them as well.
+      if tar_info.isreg():
+        file_stream = tar_file.extractfile(tar_info)
+        if file_stream is not None:
+          length = 0
+          while True:
+            current_block = file_stream.read(chunk_size)
+            if not len(current_block):
+              break
 
-      if filebuf:
-        length = 0
-        file_stream = tar_file.extractfile(tarinfo)
-        while True:
-          current_block = file_stream.read(chunk_size)
-          if not len(current_block):
-            break
+            yield current_block
+            length += len(current_block)
 
-          yield current_block
-          length += len(current_block)
+          file_stream.close()
 
-        file_stream.close()
-
-        # Files must be padding to 512 byte multiples.
-        if length % 512 != 0:
-          yield '\0' * (512 - (length % 512))
+          # Files must be padded to 512-byte multiples.
+          if length % 512 != 0:
+            yield '\0' * (512 - (length % 512))
 
     # Close the layer stream now that we're done with it.
     tar_file.close()
@@ -68,24 +68,24 @@ class StreamLayerMerger(object):
       yield '\0' * 512
 
 
-  def process_tar_info(self, tar_info):
+  def check_tar_info(self, tar_info):
     absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
 
     # Skip metadata.
     if is_aufs_metadata(absolute):
-      return None
+      return False
 
     # Add any prefix of deleted paths to the prefix list.
     deleted_prefix = get_deleted_prefix(absolute)
     if deleted_prefix is not None:
       self.encountered.append(deleted_prefix)
-      return None
+      return False
 
     # Check if this file has already been encountered somewhere. If so,
     # skip it.
     if unicode(absolute) in self.trie:
-      return None
+      return False
 
     # Otherwise, add the path to the encountered list and return it.
     self.encountered.append(absolute)
-    return (tar_info, tar_info.isfile() or tar_info.isdev())
+    return True
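
Notes on the format (sketches follow; filenames and values in them are
illustrative, not part of the patch):

The synthetic image ID is simply a SHA-256 of the tag's top image ID with a
':squash' suffix, so re-requesting the same tag always produces the same
synthetic image. A minimal sketch of the derivation (Python 2, matching the
codebase; the image ID below is a placeholder):

    import hashlib

    docker_image_id = '0' * 64  # placeholder for tag_image.docker_image_id
    synthetic_image_id = hashlib.sha256(docker_image_id + ':squash').hexdigest()
    print synthetic_image_id  # deterministic for a given tag image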
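
The endpoint streams a gzipped tar in the V1 layout that `docker load` accepts.
A quick way to sanity-check that layout, assuming the stream was saved to a
hypothetical local file named squashed.tar.gz:

    import json
    import tarfile

    tar = tarfile.open('squashed.tar.gz', 'r:gz')
    print tar.getnames()
    # Expected: 'repositories', '<synthetic id>', '<synthetic id>/json',
    #           '<synthetic id>/VERSION', '<synthetic id>/layer.tar'
    print json.load(tar.extractfile('repositories'))
    # {'<hostname>/<namespace>/<repository>': {'<tag>.squash': '<synthetic id>'}}
    tar.close()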
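
Tar framing drives the padding logic above: every member's contents are padded
with NULs to the next 512-byte record boundary, and the archive ends with two
all-zero records. Note also that layer.tar is only ever padded up to
estimated_file_size, never truncated, so the estimate must not undershoot the
merged contents. The boundary arithmetic in isolation:

    def pad_length(length):
        # NUL bytes needed to reach the next 512-byte record boundary.
        return (512 - (length % 512)) % 512

    assert pad_length(0) == 0
    assert pad_length(511) == 1
    assert pad_length(512) == 0
    assert pad_length(513) == 511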
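
Finally, the squashing itself: StreamLayerMerger visits the layers newest-first
(the ancestry list order) and check_tar_info keeps only the first occurrence of
each path, with AUFS whiteout entries recorded as deleted prefixes so older
files beneath them are skipped. The core first-occurrence-wins rule, reduced to
plain path lists (a toy sketch, not the class above):

    def merge_paths(layers):
        # 'layers' is ordered newest-first; the first occurrence of a path
        # shadows any copy of it in the older layers below.
        seen = set()
        merged = []
        for layer in layers:
            for path in layer:
                if path not in seen:
                    seen.add(path)
                    merged.append(path)
        return merged

    # The newer etc/app.conf shadows the older copy underneath it.
    print merge_paths([['etc/app.conf'], ['etc/app.conf', 'usr/bin/app']])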