Change back to using a docker load format

parent 99d57752f5
commit e273dca4b4

4 changed files with 160 additions and 31 deletions
endpoints/verbs.py

@@ -1,5 +1,6 @@
 import logging
 import json
+import hashlib
 
 from flask import redirect, Blueprint, abort, send_file
 
@@ -12,14 +13,18 @@ from data import database
 from util.queuefile import QueueFile
 from util.queueprocess import QueueProcess
 from util.gzipwrap import GzipWrap
-from util.streamlayerformat import StreamLayerMerger
+from util.dockerloadformat import build_docker_load_stream
 
 
 verbs = Blueprint('verbs', __name__)
 logger = logging.getLogger(__name__)
 
 
-def _open_stream(namespace, repository, image_list):
+def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_list):
+  def get_next_image():
+    for current_image_id in image_list:
+      yield model.get_repo_image(namespace, repository, current_image_id)
+
   def get_next_layer():
     for current_image_id in image_list:
       current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
@@ -31,7 +36,9 @@ def _open_stream(namespace, repository, image_list):
       yield current_image_stream
 
   database.configure(app.config)
-  stream = GzipWrap(StreamLayerMerger(get_next_layer).get_generator())
+  stream = build_docker_load_stream(namespace, repository, tag, synthetic_image_id, image_json,
+                                    get_next_image, get_next_layer)
 
   return stream.read
 
@@ -80,9 +87,16 @@ def get_squashed_tag(namespace, repository, tag):
   ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
   full_image_list = json.loads(ancestry_data)
 
+  # Load the image's JSON layer.
+  image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
+  image_json = json.loads(image_json_data)
+
+  # Calculate a synthetic image ID.
+  synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':squash').hexdigest()
+
   # Create a queue process to generate the data. The queue files will read from the process
   # and send the results to the client and storage.
-  args = (namespace, repository, full_image_list)
+  args = (namespace, repository, tag, synthetic_image_id, image_json, full_image_list)
   queue_process = QueueProcess(_open_stream, 8 * 1024, 10 * 1024 * 1024, args) # 8K/10M chunk/max
 
   client_queue_file = QueueFile(queue_process.create_queue(), 'client')
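The synthetic image ID computed above is deterministic: it is just the SHA-256 of the tag's top image ID with a fixed ':squash' suffix, so repeated requests for the same tag always name the same synthetic image. A minimal standalone sketch (Python 2, as in the codebase; the docker_image_id value is made up):

import hashlib

docker_image_id = 'c3d7a1b2e4f6'  # hypothetical top-layer image ID
synthetic_image_id = hashlib.sha256(docker_image_id + ':squash').hexdigest()
print synthetic_image_id  # same input always yields the same 64-char hex ID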
static/js/controllers.js

@@ -422,10 +422,10 @@ function RepoCtrl($scope, $sanitize, Restangular, ImageMetadataService, ApiServi
       });
 
       if ($scope.currentTag) {
-        var squash = 'docker import ' + Config.getHost('ACCOUNTNAME:PASSWORDORTOKEN');
-        squash += '/verbs/v1/' + namespace + '/' + name + '/' + $scope.currentTag.name + '/squash';
-        squash += ' ';
-        squash += Config.getDomain() + '/' + namespace + '/' + name + '/' + $scope.currentTag.name + '.squash';
+        var squash = 'curl ' + Config.getHost('ACCOUNTNAME:PASSWORDORTOKEN');
+        squash += '/verbs/v1/repositories/' + namespace + '/' + name + '/';
+        squash += $scope.currentTag.name + '/squash';
+        squash += ' | docker load';
 
         $scope.pullCommands.push({
           'title': 'Squashed image (Tag ' + $scope.currentTag.name + ')',
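With hypothetical placeholder values substituted (and assuming Config.getHost embeds the credentials into the URL), the assembled pull command now reads:

curl https://ACCOUNTNAME:PASSWORDORTOKEN@quay.example.com/verbs/v1/repositories/namespace/repo/latest/squash | docker load

piping the squashed tarball from the verbs endpoint straight into the local Docker daemon, instead of the old docker import form, which discards the image JSON metadata.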
util/dockerloadformat.py (new file, 115 lines)

@@ -0,0 +1,115 @@
+from util.gzipwrap import GzipWrap
+from util.streamlayerformat import StreamLayerMerger
+from app import app
+
+import copy
+import json
+import tarfile
+
+
+def build_docker_load_stream(namespace, repository, tag, synthetic_image_id,
+                             layer_json, get_image_iterator, get_layer_iterator):
+  """ Builds and streams a synthetic .tar.gz that represents a squashed version
+      of the given layers, in `docker load` V1 format.
+  """
+  return GzipWrap(_import_format_generator(namespace, repository, tag,
+                                           synthetic_image_id, layer_json,
+                                           get_image_iterator, get_layer_iterator))
+
+
+def _import_format_generator(namespace, repository, tag, synthetic_image_id,
+                             layer_json, get_image_iterator, get_layer_iterator):
+  # Docker import V1 Format (.tar):
+  #  repositories - JSON file containing a repo -> tag -> image map
+  #  {image ID folder}:
+  #    json - The layer JSON
+  #    layer.tar - The TARed contents of the layer
+  #    VERSION - The docker import version: '1.0'
+  layer_merger = StreamLayerMerger(get_layer_iterator)
+
+  # Yield the repositories file:
+  synthetic_layer_info = {}
+  synthetic_layer_info[tag + '.squash'] = synthetic_image_id
+
+  hostname = app.config['SERVER_HOSTNAME']
+  repositories = {}
+  repositories[hostname + '/' + namespace + '/' + repository] = synthetic_layer_info
+
+  yield _tar_file('repositories', json.dumps(repositories))
+
+  # Yield the image ID folder.
+  yield _tar_folder(synthetic_image_id)
+
+  # Yield the JSON layer data.
+  layer_json = _build_layer_json(layer_json, synthetic_image_id)
+  yield _tar_file(synthetic_image_id + '/json', json.dumps(layer_json))
+
+  # Yield the VERSION file.
+  yield _tar_file(synthetic_image_id + '/VERSION', '1.0')
+
+  # Yield the merged layer data's header.
+  estimated_file_size = 0
+  for image in get_image_iterator():
+    estimated_file_size += image.storage.uncompressed_size or 0
+
+  yield _tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size)
+
+  # Yield the contents of the merged layer.
+  yielded_size = 0
+  for entry in layer_merger.get_generator():
+    yield entry
+    yielded_size += len(entry)
+
+  # If the yielded size is less than the estimated size (which is likely), fill the rest with
+  # zeros.
+  if yielded_size < estimated_file_size:
+    yield '\0' * (estimated_file_size - yielded_size)
+
+  # Yield any file padding to 512 bytes that is necessary.
+  yield _tar_file_padding(estimated_file_size)
+
+  # Last two records are empty in TAR spec.
+  yield '\0' * 512
+  yield '\0' * 512
+
+
+def _build_layer_json(layer_json, synthetic_image_id):
+  updated_json = copy.deepcopy(layer_json)
+  updated_json['id'] = synthetic_image_id
+
+  if 'parent' in updated_json:
+    del updated_json['parent']
+
+  if 'config' in updated_json and 'Image' in updated_json['config']:
+    updated_json['config']['Image'] = synthetic_image_id
+
+  if 'container_config' in updated_json and 'Image' in updated_json['container_config']:
+    updated_json['container_config']['Image'] = synthetic_image_id
+
+  return updated_json
+
+
+def _tar_file(name, contents):
+  length = len(contents)
+  tar_data = _tar_file_header(name, length)
+  tar_data += contents
+  tar_data += _tar_file_padding(length)
+  return tar_data
+
+
+def _tar_file_padding(length):
+  if length % 512 != 0:
+    return '\0' * (512 - (length % 512))
+
+  return ''
+
+
+def _tar_file_header(name, file_size):
+  info = tarfile.TarInfo(name=name)
+  info.type = tarfile.REGTYPE
+  info.size = file_size
+  return info.tobuf()
+
+
+def _tar_folder(name):
+  info = tarfile.TarInfo(name=name)
+  info.type = tarfile.DIRTYPE
+  return info.tobuf()
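Decompressed, the stream built above is a plain tar in docker load V1 layout; for a hypothetical synthetic image ID abc123, it unpacks as:

repositories        # {"<SERVER_HOSTNAME>/<namespace>/<repository>": {"<tag>.squash": "abc123"}}
abc123/
  json              # layer JSON rewritten by _build_layer_json
  layer.tar         # merged layer contents, zero-filled up to the estimated size
  VERSION           # '1.0'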
util/streamlayerformat.py

@@ -32,30 +32,30 @@ class StreamLayerMerger(object):
       chunk_size = 1024 * 1024 * 9
 
       for tar_info in tar_file:
-        result = self.process_tar_info(tar_info)
-        if not result:
+        if not self.check_tar_info(tar_info):
           continue
 
-        (tarinfo, filebuf) = result
-
-        yield tarinfo.tobuf()
-
-        if filebuf:
-          length = 0
-          file_stream = tar_file.extractfile(tarinfo)
-          while True:
-            current_block = file_stream.read(chunk_size)
-            if not len(current_block):
-              break
-
-            yield current_block
-            length += len(current_block)
-
-          file_stream.close()
-
-          # Files must be padded to 512 byte multiples.
-          if length % 512 != 0:
-            yield '\0' * (512 - (length % 512))
+        # Yield the tar header.
+        yield tar_info.tobuf()
+
+        # Try to extract any file contents for the tar. If found, we yield them as well.
+        if tar_info.isreg():
+          file_stream = tar_file.extractfile(tar_info)
+          if file_stream is not None:
+            length = 0
+            while True:
+              current_block = file_stream.read(chunk_size)
+              if not len(current_block):
+                break
+
+              yield current_block
+              length += len(current_block)
+
+            file_stream.close()
+
+            # Files must be padded to 512 byte multiples.
+            if length % 512 != 0:
+              yield '\0' * (512 - (length % 512))
 
       # Close the layer stream now that we're done with it.
       tar_file.close()
@@ -68,24 +68,24 @@ class StreamLayerMerger(object):
     yield '\0' * 512
 
 
-  def process_tar_info(self, tar_info):
+  def check_tar_info(self, tar_info):
     absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
 
     # Skip metadata.
     if is_aufs_metadata(absolute):
-      return None
+      return False
 
     # Add any prefix of deleted paths to the prefix list.
     deleted_prefix = get_deleted_prefix(absolute)
     if deleted_prefix is not None:
       self.encountered.append(deleted_prefix)
-      return None
+      return False
 
     # Check if this file has already been encountered somewhere. If so,
     # skip it.
     if unicode(absolute) in self.trie:
-      return None
+      return False
 
     # Otherwise, add the path to the encountered list and return it.
     self.encountered.append(absolute)
-    return (tar_info, tar_info.isfile() or tar_info.isdev())
+    return True
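The merger loop in the first hunk emits raw tar records by hand: a 512-byte header from TarInfo.tobuf(), the file contents, then NUL padding to the next 512-byte boundary. A minimal standalone sketch of that record pattern using only the stdlib (Python 2, as in the codebase; the file name and contents are made up):

import tarfile
from StringIO import StringIO

def emit_file(name, contents):
  # 512-byte tar header for a regular file.
  info = tarfile.TarInfo(name=name)
  info.type = tarfile.REGTYPE
  info.size = len(contents)
  yield info.tobuf()

  # Raw contents, then NUL padding to a 512-byte boundary.
  yield contents
  if len(contents) % 512 != 0:
    yield '\0' * (512 - (len(contents) % 512))

# Two empty 512-byte records terminate a tar archive.
archive = ''.join(emit_file('hello.txt', 'hello world')) + '\0' * 1024
print tarfile.open(fileobj=StringIO(archive)).getnames()  # ['hello.txt']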