Work in progress: Docker -> ACI conversion

Joseph Schorr 2015-01-13 17:46:11 -05:00
parent df9a417207
commit 6ed28930b2
13 changed files with 424 additions and 162 deletions

View file

@@ -12,4 +12,4 @@ import registry
if __name__ == '__main__':
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
application.run(port=5000, debug=True, threaded=True, host='0.0.0.0')
application.run(port=80, debug=True, threaded=True, host='0.0.0.0')

View file

@@ -1,4 +1,4 @@
bind = '0.0.0.0:5000'
bind = '0.0.0.0:80'
workers = 2
worker_class = 'gevent'
timeout = 2000

View file

@@ -16,12 +16,15 @@ from storage import Storage
from util.queuefile import QueueFile
from util.queueprocess import QueueProcess
from util.gzipwrap import GzipWrap
from util.dockerloadformat import build_docker_load_stream
from formats.squashed import SquashedDockerImage
from formats.aci import ACIImage
verbs = Blueprint('verbs', __name__)
logger = logging.getLogger(__name__)
def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, image_id_list):
def _open_stream(formatter, namespace, repository, tag, synthetic_image_id, image_json,
image_id_list):
store = Storage(app)
# For performance reasons, we load the full image list here, cache it, then disconnect from
@@ -45,17 +48,17 @@ def _open_stream(namespace, repository, tag, synthetic_image_id, image_json, ima
logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
yield current_image_stream
stream = build_docker_load_stream(namespace, repository, tag, synthetic_image_id, image_json,
stream = formatter.build_stream(namespace, repository, tag, synthetic_image_id, image_json,
get_next_image, get_next_layer)
return stream.read
def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, queue_file):
def _write_synthetic_image_to_storage(verb, linked_storage_uuid, linked_locations, queue_file):
store = Storage(app)
def handle_exception(ex):
logger.debug('Exception when building squashed image %s: %s', linked_storage_uuid, ex)
logger.debug('Exception when building %s image %s: %s', verb, linked_storage_uuid, ex)
with database.UseThenDisconnect(app.config):
model.delete_derived_storage_by_uuid(linked_storage_uuid)
@@ -73,11 +76,13 @@ def _write_synthetic_image_to_storage(linked_storage_uuid, linked_locations, que
done_uploading.save()
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@process_auth
def get_squashed_tag(namespace, repository, tag):
def _repo_verb(namespace, repository, tag, verb, formatter, checker=None, **kwargs):
permission = ReadRepositoryPermission(namespace, repository)
if permission.can() or model.repository_is_public(namespace, repository):
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# TODO: re-enable auth!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
if True or permission.can() or model.repository_is_public(namespace, repository):
# Lookup the requested tag.
try:
tag_image = model.get_tag_image(namespace, repository, tag)
@@ -89,38 +94,54 @@ def get_squashed_tag(namespace, repository, tag):
if not repo_image:
abort(404)
# Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb='squash')
# If there is a data checker, call it first.
store = Storage(app)
derived = model.find_or_create_derived_storage(repo_image.storage, 'squash',
uuid = repo_image.storage.uuid
image_json = None
if checker is not None:
image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
image_json = json.loads(image_json_data)
if not checker(image_json):
logger.debug('Check mismatch on %s/%s:%s, verb %s', namespace, repository, tag, verb)
abort(404)
# Log the action.
track_and_log('repo_verb', repo_image.repository, tag=tag, verb=verb, **kwargs)
derived = model.find_or_create_derived_storage(repo_image.storage, verb,
store.preferred_locations[0])
if not derived.uploading:
logger.debug('Derived image %s exists in storage', derived.uuid)
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# TODO: re-enable caching!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
if False and not derived.uploading:
logger.debug('Derived %s image %s exists in storage', verb, derived.uuid)
derived_layer_path = store.image_layer_path(derived.uuid)
download_url = store.get_direct_download_url(derived.locations, derived_layer_path)
if download_url:
logger.debug('Redirecting to download URL for derived image %s', derived.uuid)
logger.debug('Redirecting to download URL for derived %s image %s', verb, derived.uuid)
return redirect(download_url)
# Close the database handle here for this process before we send the long download.
database.close_db_filter(None)
logger.debug('Sending cached derived image %s', derived.uuid)
logger.debug('Sending cached derived %s image %s', verb, derived.uuid)
return send_file(store.stream_read_file(derived.locations, derived_layer_path))
# Load the ancestry for the image.
logger.debug('Building and returning derived image %s', derived.uuid)
uuid = repo_image.storage.uuid
logger.debug('Building and returning derived %s image %s', verb, derived.uuid)
ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
full_image_list = json.loads(ancestry_data)
# Load the image's JSON layer.
image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
image_json = json.loads(image_json_data)
if not image_json:
image_json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
image_json = json.loads(image_json_data)
# Calculate a synthetic image ID.
synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':squash').hexdigest()
synthetic_image_id = hashlib.sha256(tag_image.docker_image_id + ':' + verb).hexdigest()
# Create a queue process to generate the data. The queue files will read from the process
# and send the results to the client and storage.
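As a rough illustration of the fan-out the comment above describes (one generated stream feeding both the client response and the storage writer), here is a standalone Python 3 sketch using threads and queues. It is not the actual QueueProcess/QueueFile API; every name in it is invented.

import queue
import threading

def produce(stream_chunks, sinks):
    # Push every chunk of the generated stream to every registered queue,
    # then signal completion with a sentinel.
    for chunk in stream_chunks:
        for sink in sinks:
            sink.put(chunk)
    for sink in sinks:
        sink.put(None)

def consume(name, sink):
    total = 0
    while True:
        chunk = sink.get()
        if chunk is None:
            break
        total += len(chunk)
    print('%s received %d bytes' % (name, total))

client_q, storage_q = queue.Queue(), queue.Queue()
chunks = [b'x' * 8192 for _ in range(4)]  # stand-in for the synthesized tar.gz
workers = [threading.Thread(target=consume, args=(name, q))
           for (name, q) in (('client', client_q), ('storage', storage_q))]
for worker in workers:
    worker.start()
produce(chunks, [client_q, storage_q])
for worker in workers:
    worker.join()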
@@ -128,7 +149,7 @@ def get_squashed_tag(namespace, repository, tag):
# Close any existing DB connection once the process has exited.
database.close_db_filter(None)
args = (namespace, repository, tag, synthetic_image_id, image_json, full_image_list)
args = (formatter, namespace, repository, tag, synthetic_image_id, image_json, full_image_list)
queue_process = QueueProcess(_open_stream,
8 * 1024, 10 * 1024 * 1024, # 8K/10M chunk/max
args, finished=_cleanup)
@@ -140,7 +161,7 @@ def get_squashed_tag(namespace, repository, tag):
queue_process.run()
# Start the storage saving.
storage_args = (derived.uuid, derived.locations, storage_queue_file)
storage_args = (verb, derived.uuid, derived.locations, storage_queue_file)
QueueProcess.run_process(_write_synthetic_image_to_storage, storage_args, finished=_cleanup)
# Close the database handle here for this process before we send the long download.
@@ -150,3 +171,29 @@ def get_squashed_tag(namespace, repository, tag):
return send_file(client_queue_file)
abort(403)
@verbs.route('/aci/<server>/<namespace>/<repository>/<tag>/aci/<os>/<arch>/', methods=['GET'])
@process_auth
def get_rocket_image(server, namespace, repository, tag, os, arch):
def checker(image_json):
# Verify the architecture and os.
operating_system = image_json.get('os', 'linux')
if operating_system != os:
return False
architecture = image_json.get('architecture', 'amd64')
if architecture != arch:
return False
return True
return _repo_verb(namespace, repository, tag, 'aci', ACIImage(),
checker=checker, os=os, arch=arch)
@verbs.route('/squash/<namespace>/<repository>/<tag>', methods=['GET'])
@process_auth
def get_squashed_tag(namespace, repository, tag):
return _repo_verb(namespace, repository, tag, 'squash', SquashedDockerImage())
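A minimal standalone sketch (Python 3) of what the refactored dispatcher provides: the ACI route passes a checker that filters on os/architecture before any work is done, and the synthetic image ID is namespaced by the verb so the 'squash' and 'aci' variants of the same tag never collide in derived storage. The sample image JSON and image ID below are invented.

import hashlib

sample_image_json = {'os': 'linux', 'architecture': 'amd64', 'id': 'abc123'}

def make_aci_checker(os_name, arch):
    # Mirrors the checker closure in get_rocket_image.
    def checker(image_json):
        if image_json.get('os', 'linux') != os_name:
            return False
        if image_json.get('architecture', 'amd64') != arch:
            return False
        return True
    return checker

print(make_aci_checker('linux', 'amd64')(sample_image_json))  # True
print(make_aci_checker('linux', 'arm')(sample_image_json))    # False

docker_image_id = 'deadbeef' * 8
for verb in ('squash', 'aci'):
    # Same derivation as in _repo_verb, namespaced by the verb.
    print(verb, hashlib.sha256((docker_image_id + ':' + verb).encode()).hexdigest())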

View file

@@ -140,6 +140,11 @@ def repository(path):
return index('')
@web.route('/<namespace>/<repository>', methods=['GET'])
@no_cache
def repository_test(namespace, repository):
return index('')
@web.route('/security/')
@no_cache
def security():

formats/__init__.py Normal file (0 additions)
View file

formats/aci.py Normal file (185 additions)
View file

@@ -0,0 +1,185 @@
from app import app
from util.streamlayerformat import StreamLayerMerger
from formats.tarimageformatter import TarImageFormatter
import json
class ACIImage(TarImageFormatter):
""" Image formatter which produces an ACI-compatible TAR.
"""
def stream_generator(self, namespace, repository, tag, synthetic_image_id,
layer_json, get_image_iterator, get_layer_iterator):
# ACI Format (.tar):
# manifest - The JSON manifest
# rootfs - The root file system
# Yield the manifest.
yield self.tar_file('manifest', self._build_manifest(namespace, repository, tag, layer_json,
synthetic_image_id))
# Yield the merged layer data.
yield self.tar_folder('rootfs')
layer_merger = StreamLayerMerger(get_layer_iterator, path_prefix='rootfs/')
for entry in layer_merger.get_generator():
yield entry
def _build_isolators(self, docker_config):
""" Builds ACI isolator config from the docker config. """
def _isolate_memory(memory):
return {
"name": "memory/limit",
"value": str(memory) + 'B'
}
def _isolate_swap(memory):
return {
"name": "memory/swap",
"value": str(memory) + 'B'
}
def _isolate_cpu(cpu):
return {
"name": "cpu/shares",
"value": str(cpu)
}
def _isolate_capabilities(capabilities_set):
return {
"name": "capabilities/bounding-set",
"value": str(capabilities_set)
}
mappers = {
'Memory': _isolate_memory,
'MemorySwap': _isolate_swap,
'CpuShares': _isolate_cpu,
'Cpuset': _isolate_capabilities
}
isolators = []
for config_key in mappers:
value = docker_config.get(config_key)
if value:
isolators.append(mappers[config_key](value))
return isolators
def _build_ports(self, docker_config):
""" Builds the ports definitions for the ACI. """
ports = []
for docker_port_definition in docker_config.get('ports', {}):
# Formats:
# port/tcp
# port/udp
# port
protocol = 'tcp'
port_number = -1
if '/' in docker_port_definition:
(port_number, protocol) = docker_port_definition.split('/')
else:
port_number = docker_port_definition
try:
port_number = int(port_number)
ports.append({
"name": "port-%s" % port_number,
"port": port_number,
"protocol": protocol
})
except ValueError:
pass
return ports
def _build_volumes(self, docker_config):
""" Builds the volumes definitions for the ACI. """
volumes = []
names = set()
def get_name(docker_volume_path):
parts = docker_volume_path.split('/')
name = ''
while True:
name = name + parts[-1]
parts = parts[0:-1]
if names.add(name):
break
name = '/' + name
return name
for docker_volume_path in docker_config.get('volumes', {}):
volumes.append({
"name": get_name(docker_volume_path),
"path": docker_volume_path,
"readOnly": False
})
return volumes
def _build_manifest(self, namespace, repository, tag, docker_layer_data, synthetic_image_id):
""" Builds an ACI manifest from the docker layer data. """
config = docker_layer_data.get('config', {})
config.update(docker_layer_data.get('container_config', {}))
source_url = "%s://%s/%s/%s:%s" % (app.config['PREFERRED_URL_SCHEME'],
app.config['SERVER_HOSTNAME'],
namespace, repository, tag)
exec_path = config.get('Cmd', [])
if exec_path:
if not exec_path[0].startswith('/'):
exec_path[0] = '/bin/' + exec_path[0]
# TODO: ACI doesn't support : in the name, so remove any ports.
hostname = app.config['SERVER_HOSTNAME']
hostname = hostname.split(':', 1)[0]
manifest = {
"acKind": "ImageManifest",
"acVersion": "0.1.1",
"name": '%s/%s/%s/%s' % (hostname, namespace, repository, tag),
"labels": [
{
"name": "version",
"value": "1.0.0"
},
{
"name": "arch",
"value": docker_layer_data.get('architecture', 'amd64')
},
{
"name": "os",
"value": docker_layer_data.get('os', 'linux')
}
],
"app": {
"exec": exec_path,
"user": config.get('User', '') or 'root',
"group": config.get('Group', '') or 'root',
"eventHandlers": [],
"workingDirectory": config.get('WorkingDir', ''),
"environment": {key:value for (key, value) in [e.split('=') for e in config.get('Env')]},
"isolators": self._build_isolators(config),
"mountPoints": self._build_volumes(config),
"ports": self._build_ports(config),
"annotations": [
{"name": "created", "value": docker_layer_data.get('created', '')},
{"name": "homepage", "value": source_url},
{"name": "quay.io/derived_image", "value": synthetic_image_id},
]
},
}
return json.dumps(manifest)
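To make the Docker-config to ACI mapping above concrete, here is a hedged standalone sketch (Python 3) that re-derives the ports and isolators for an invented container config and prints the resulting app section. It deliberately does not call ACIImage itself, which needs the Flask app config and storage plumbing.

import json

sample_config = {
    'Cmd': ['redis-server'],
    'Env': ['PATH=/usr/local/bin', 'REDIS_VERSION=2.8'],
    'Memory': 536870912,
    'CpuShares': 512,
    'ports': {'6379/tcp': {}, '9121': {}},
}

def build_ports(config):
    # Same parsing rule as _build_ports: "port/protocol" or a bare port.
    ports = []
    for definition in config.get('ports', {}):
        protocol = 'tcp'
        port_number = definition
        if '/' in definition:
            port_number, protocol = definition.split('/')
        try:
            port_number = int(port_number)
        except ValueError:
            continue
        ports.append({'name': 'port-%s' % port_number,
                      'port': port_number,
                      'protocol': protocol})
    return ports

def build_isolators(config):
    # Emit isolators only for limits actually present in the Docker config.
    isolators = []
    if config.get('Memory'):
        isolators.append({'name': 'memory/limit', 'value': '%dB' % config['Memory']})
    if config.get('CpuShares'):
        isolators.append({'name': 'cpu/shares', 'value': str(config['CpuShares'])})
    return isolators

app_section = {
    'exec': ['/bin/' + sample_config['Cmd'][0]],  # relative commands get a /bin/ prefix
    'environment': dict(e.split('=', 1) for e in sample_config['Env']),
    'ports': build_ports(sample_config),
    'isolators': build_isolators(sample_config),
}
print(json.dumps(app_section, indent=2, sort_keys=True))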

formats/squashed.py Normal file (102 additions)
View file

@@ -0,0 +1,102 @@
from app import app
from util.gzipwrap import GZIP_BUFFER_SIZE
from util.streamlayerformat import StreamLayerMerger
from formats.tarimageformatter import TarImageFormatter
import copy
import json
import tarfile
class FileEstimationException(Exception):
""" Exception raised by build_docker_load_stream if the estimated size of the layer TAR
was lower than the actual size. This means the sent TAR header is wrong, and we have
to fail.
"""
pass
class SquashedDockerImage(TarImageFormatter):
""" Image formatter which produces a squashed image compatible with the `docker load`
command.
"""
def stream_generator(self, namespace, repository, tag, synthetic_image_id,
layer_json, get_image_iterator, get_layer_iterator):
# Docker import V1 Format (.tar):
# repositories - JSON file containing a repo -> tag -> image map
# {image ID folder}:
# json - The layer JSON
# layer.tar - The TARed contents of the layer
# VERSION - The docker import version: '1.0'
layer_merger = StreamLayerMerger(get_layer_iterator)
# Yield the repositories file:
synthetic_layer_info = {}
synthetic_layer_info[tag + '.squash'] = synthetic_image_id
hostname = app.config['SERVER_HOSTNAME']
repositories = {}
repositories[hostname + '/' + namespace + '/' + repository] = synthetic_layer_info
yield self.tar_file('repositories', json.dumps(repositories))
# Yield the image ID folder.
yield self.tar_folder(synthetic_image_id)
# Yield the JSON layer data.
layer_json = self._build_layer_json(layer_json, synthetic_image_id)
yield self.tar_file(synthetic_image_id + '/json', json.dumps(layer_json))
# Yield the VERSION file.
yield self.tar_file(synthetic_image_id + '/VERSION', '1.0')
# Yield the merged layer data's header.
estimated_file_size = 0
for image in get_image_iterator():
estimated_file_size += image.storage.uncompressed_size
yield self.tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size)
# Yield the contents of the merged layer.
yielded_size = 0
for entry in layer_merger.get_generator():
yield entry
yielded_size += len(entry)
# If the yielded size is more than the estimated size (which is unlikely but possible), then
# raise an exception since the tar header will be wrong.
if yielded_size > estimated_file_size:
raise FileEstimationException()
# If the yielded size is less than the estimated size (which is likely), fill the rest with
# zeros.
if yielded_size < estimated_file_size:
to_yield = estimated_file_size - yielded_size
while to_yield > 0:
yielded = min(to_yield, GZIP_BUFFER_SIZE)
yield '\0' * yielded
to_yield -= yielded
# Yield any file padding to 512 bytes that is necessary.
yield self.tar_file_padding(estimated_file_size)
# Last two records are empty in TAR spec.
yield '\0' * 512
yield '\0' * 512
def _build_layer_json(self, layer_json, synthetic_image_id):
updated_json = copy.deepcopy(layer_json)
updated_json['id'] = synthetic_image_id
if 'parent' in updated_json:
del updated_json['parent']
if 'config' in updated_json and 'Image' in updated_json['config']:
updated_json['config']['Image'] = synthetic_image_id
if 'container_config' in updated_json and 'Image' in updated_json['container_config']:
updated_json['container_config']['Image'] = synthetic_image_id
return updated_json
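A small worked example of the layer-JSON rewrite above (all input values invented): dropping 'parent' makes the squashed layer a root image for `docker load`, and the Image references are repointed at the synthetic ID.

import copy
import json

def build_layer_json(layer_json, synthetic_image_id):
    # Same transformation as SquashedDockerImage._build_layer_json.
    updated = copy.deepcopy(layer_json)
    updated['id'] = synthetic_image_id
    updated.pop('parent', None)
    for section in ('config', 'container_config'):
        if section in updated and 'Image' in updated[section]:
            updated[section]['Image'] = synthetic_image_id
    return updated

original = {
    'id': 'aaa111',
    'parent': 'bbb222',
    'config': {'Image': 'bbb222', 'Cmd': ['/bin/sh']},
}
print(json.dumps(build_layer_json(original, 'ffff0000'), indent=2, sort_keys=True))
# id and config.Image become 'ffff0000'; 'parent' is gone.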

View file

@@ -0,0 +1,46 @@
import tarfile
from util.gzipwrap import GzipWrap
class TarImageFormatter(object):
""" Base class for classes which produce a TAR containing image and layer data. """
def build_stream(self, namespace, repository, tag, synthetic_image_id, layer_json,
get_image_iterator, get_layer_iterator):
""" Builds and streams a synthetic .tar.gz that represents the formatted TAR created by this
class's implementation.
"""
return GzipWrap(self.stream_generator(namespace, repository, tag,
synthetic_image_id, layer_json,
get_image_iterator, get_layer_iterator))
def stream_generator(self, namespace, repository, tag, synthetic_image_id,
layer_json, get_image_iterator, get_layer_iterator):
raise NotImplementedError
def tar_file(self, name, contents):
""" Returns the TAR binary representation for a file with the given name and file contents. """
length = len(contents)
tar_data = self.tar_file_header(name, length)
tar_data += contents
tar_data += self.tar_file_padding(length)
return tar_data
def tar_file_padding(self, length):
""" Returns TAR file padding for file data of the given length. """
if length % 512 != 0:
return '\0' * (512 - (length % 512))
return ''
def tar_file_header(self, name, file_size):
""" Returns TAR file header data for a file with the given name and size. """
info = tarfile.TarInfo(name=name)
info.type = tarfile.REGTYPE
info.size = file_size
return info.tobuf()
def tar_folder(self, name):
""" Returns TAR file header data for a folder with the given name. """
info = tarfile.TarInfo(name=name)
info.type = tarfile.DIRTYPE
return info.tobuf()
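The header/padding/terminator layout these helpers emit can be sanity-checked with the standard library. A Python 3 sketch (the class itself runs under Python 2 and concatenates str rather than bytes):

import io
import tarfile

def tar_file_header(name, size):
    info = tarfile.TarInfo(name=name)
    info.type = tarfile.REGTYPE
    info.size = size
    return info.tobuf()

def tar_file_padding(length):
    # TAR stores file data in 512-byte records; pad the last one with NULs.
    return b'\0' * (512 - (length % 512)) if length % 512 else b''

contents = b'{"example": true}'
data = (tar_file_header('manifest', len(contents))
        + contents
        + tar_file_padding(len(contents))
        + b'\0' * 1024)  # two empty 512-byte records terminate the archive

with tarfile.open(fileobj=io.BytesIO(data)) as tar:
    member = tar.getmember('manifest')
    print(member.size, tar.extractfile(member).read())
# 17 b'{"example": true}'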

View file

@@ -255,6 +255,7 @@ def initialize_database():
ImageStorageLocation.create(name='local_us')
ImageStorageTransformation.create(name='squash')
ImageStorageTransformation.create(name='aci')
# NOTE: These MUST be copied over to NotificationKind, since every external
# notification can also generate a Quay.io notification.

View file

@@ -10,6 +10,8 @@
<meta id="descriptionTag" name="description" content="Hosted private docker repositories. Includes full user management and history. Free for public repositories."></meta>
<meta name="google-site-verification" content="GalDznToijTsHYmLjJvE4QaB9uk_IP16aaGDz5D75T4" />
<meta name="fragment" content="!" />
<meta name="ac-discovery" content="10.0.2.2 http://10.0.2.2/c1/aci/{name}/{version}/{ext}/{os}/{arch}/">
{% endblock %}
{% block body_content %}

View file

@@ -1,132 +0,0 @@
from util.gzipwrap import GzipWrap, GZIP_BUFFER_SIZE
from util.streamlayerformat import StreamLayerMerger
from app import app
import copy
import json
import tarfile
class FileEstimationException(Exception):
""" Exception raised by build_docker_load_stream if the estimated size of the layer TAR
was lower than the actual size. This means the sent TAR header is wrong, and we have
to fail.
"""
pass
def build_docker_load_stream(namespace, repository, tag, synthetic_image_id,
layer_json, get_image_iterator, get_layer_iterator):
""" Builds and streams a synthetic .tar.gz that represents a squashed version
of the given layers, in `docker load` V1 format.
"""
return GzipWrap(_import_format_generator(namespace, repository, tag,
synthetic_image_id, layer_json,
get_image_iterator, get_layer_iterator))
def _import_format_generator(namespace, repository, tag, synthetic_image_id,
layer_json, get_image_iterator, get_layer_iterator):
# Docker import V1 Format (.tar):
# repositories - JSON file containing a repo -> tag -> image map
# {image ID folder}:
# json - The layer JSON
# layer.tar - The TARed contents of the layer
# VERSION - The docker import version: '1.0'
layer_merger = StreamLayerMerger(get_layer_iterator)
# Yield the repositories file:
synthetic_layer_info = {}
synthetic_layer_info[tag + '.squash'] = synthetic_image_id
hostname = app.config['SERVER_HOSTNAME']
repositories = {}
repositories[hostname + '/' + namespace + '/' + repository] = synthetic_layer_info
yield _tar_file('repositories', json.dumps(repositories))
# Yield the image ID folder.
yield _tar_folder(synthetic_image_id)
# Yield the JSON layer data.
layer_json = _build_layer_json(layer_json, synthetic_image_id)
yield _tar_file(synthetic_image_id + '/json', json.dumps(layer_json))
# Yield the VERSION file.
yield _tar_file(synthetic_image_id + '/VERSION', '1.0')
# Yield the merged layer data's header.
estimated_file_size = 0
for image in get_image_iterator():
estimated_file_size += image.storage.uncompressed_size
yield _tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size)
# Yield the contents of the merged layer.
yielded_size = 0
for entry in layer_merger.get_generator():
yield entry
yielded_size += len(entry)
# If the yielded size is more than the estimated size (which is unlikely but possible), then
# raise an exception since the tar header will be wrong.
if yielded_size > estimated_file_size:
raise FileEstimationException()
# If the yielded size is less than the estimated size (which is likely), fill the rest with
# zeros.
if yielded_size < estimated_file_size:
to_yield = estimated_file_size - yielded_size
while to_yield > 0:
yielded = min(to_yield, GZIP_BUFFER_SIZE)
yield '\0' * yielded
to_yield -= yielded
# Yield any file padding to 512 bytes that is necessary.
yield _tar_file_padding(estimated_file_size)
# Last two records are empty in TAR spec.
yield '\0' * 512
yield '\0' * 512
def _build_layer_json(layer_json, synthetic_image_id):
updated_json = copy.deepcopy(layer_json)
updated_json['id'] = synthetic_image_id
if 'parent' in updated_json:
del updated_json['parent']
if 'config' in updated_json and 'Image' in updated_json['config']:
updated_json['config']['Image'] = synthetic_image_id
if 'container_config' in updated_json and 'Image' in updated_json['container_config']:
updated_json['container_config']['Image'] = synthetic_image_id
return updated_json
def _tar_file(name, contents):
length = len(contents)
tar_data = _tar_file_header(name, length)
tar_data += contents
tar_data += _tar_file_padding(length)
return tar_data
def _tar_file_padding(length):
if length % 512 != 0:
return '\0' * (512 - (length % 512))
return ''
def _tar_file_header(name, file_size):
info = tarfile.TarInfo(name=name)
info.type = tarfile.REGTYPE
info.size = file_size
return info.tobuf()
def _tar_folder(name):
info = tarfile.TarInfo(name=name)
info.type = tarfile.DIRTYPE
return info.tobuf()

View file

@@ -11,8 +11,8 @@ AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
class StreamLayerMerger(TarLayerFormat):
""" Class which creates a generator of the combined TAR data for a set of Docker layers. """
def __init__(self, layer_iterator):
super(StreamLayerMerger, self).__init__(layer_iterator)
def __init__(self, layer_iterator, path_prefix=None):
super(StreamLayerMerger, self).__init__(layer_iterator, path_prefix)
self.path_trie = marisa_trie.Trie()
self.path_encountered = []
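Together with the TarLayerFormat change below, path_prefix re-roots every entry of every layer (for ACI output, under rootfs/). A standalone Python 3 sketch of the same idea using tarfile directly, with invented file contents:

import io
import os
import tarfile

def prefix_tar_entries(tar_bytes, prefix):
    # Return a new TAR in which every member name is nested under `prefix`,
    # roughly what the merger does per-header when path_prefix is set.
    out = io.BytesIO()
    with tarfile.open(fileobj=io.BytesIO(tar_bytes)) as src, \
         tarfile.open(fileobj=out, mode='w') as dst:
        for member in src.getmembers():
            fileobj = src.extractfile(member) if member.isreg() else None
            member.name = os.path.join(prefix, member.name)
            dst.addfile(member, fileobj)
    return out.getvalue()

# Build a tiny single-file layer in memory, then re-root it under rootfs/.
layer = io.BytesIO()
with tarfile.open(fileobj=layer, mode='w') as tar:
    info = tarfile.TarInfo('etc/hostname')
    payload = b'quay\n'
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

rooted = prefix_tar_entries(layer.getvalue(), 'rootfs/')
with tarfile.open(fileobj=io.BytesIO(rooted)) as tar:
    print(tar.getnames())  # ['rootfs/etc/hostname']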

View file

@@ -8,8 +8,9 @@ class TarLayerReadException(Exception):
class TarLayerFormat(object):
""" Class which creates a generator of the combined TAR data. """
def __init__(self, tar_iterator):
def __init__(self, tar_iterator, path_prefix=None):
self.tar_iterator = tar_iterator
self.path_prefix = path_prefix
def get_generator(self):
for current_tar in self.tar_iterator():
@@ -36,7 +37,12 @@ class TarLayerFormat(object):
continue
# Yield the tar header.
yield tar_info.tobuf()
if self.path_prefix:
clone = tarfile.TarInfo.frombuf(tar_info.tobuf())
clone.name = os.path.join(self.path_prefix, clone.name)
yield clone.tobuf()
else:
yield tar_info.tobuf()
# Try to extract any file contents for the tar. If found, we yield them as well.
if tar_info.isreg():