From e3c52fa0eb606bfc2e7b46ddf90906d4ce09c6d8 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Tue, 16 Sep 2014 00:18:57 -0400
Subject: [PATCH] Work in progress. This is currently broken!

---
 application.py                |   2 +
 data/database.py              |   1 +
 endpoints/registry.py         |   1 +
 endpoints/verbs.py            |  75 +++++++++++++++++++++
 storage/distributedstorage.py |   1 +
 util/dockerimportformat.py    | 123 ++++++++++++++++++++++++++++++++++
 util/gzipwrap.py              |  42 ++++++++++++
 util/streamlayerformat.py     |  99 +++++++++++++++++++++++++++
 8 files changed, 344 insertions(+)
 create mode 100644 endpoints/verbs.py
 create mode 100644 util/dockerimportformat.py
 create mode 100644 util/gzipwrap.py
 create mode 100644 util/streamlayerformat.py

diff --git a/application.py b/application.py
index 2fb79835b..b10aea363 100644
--- a/application.py
+++ b/application.py
@@ -17,6 +17,7 @@ from endpoints.index import index
 from endpoints.web import web
 from endpoints.tags import tags
 from endpoints.registry import registry
+from endpoints.verbs import verbs
 from endpoints.webhooks import webhooks
 from endpoints.realtime import realtime
 from endpoints.callbacks import callback
@@ -43,6 +44,7 @@ application.register_blueprint(callback, url_prefix='/oauth2')
 application.register_blueprint(index, url_prefix='/v1')
 application.register_blueprint(tags, url_prefix='/v1')
 application.register_blueprint(registry, url_prefix='/v1')
+application.register_blueprint(verbs, url_prefix='/v1/repositories')
 application.register_blueprint(api_bp, url_prefix='/api')
 application.register_blueprint(webhooks, url_prefix='/webhooks')
 application.register_blueprint(realtime, url_prefix='/realtime')
diff --git a/data/database.py b/data/database.py
index 96e85a7d2..45e45b057 100644
--- a/data/database.py
+++ b/data/database.py
@@ -234,6 +234,7 @@ class ImageStorage(BaseModel):
   comment = TextField(null=True)
   command = TextField(null=True)
   image_size = BigIntegerField(null=True)
+  uncompressed_size = BigIntegerField(null=True)
   uploading = BooleanField(default=True, null=True)
diff --git a/endpoints/registry.py b/endpoints/registry.py
index 94719905a..4713ddd75 100644
--- a/endpoints/registry.py
+++ b/endpoints/registry.py
@@ -453,6 +453,7 @@ def put_image_json(namespace, repository, image_id):
 
   # We cleanup any old checksum in case it's a retry after a fail
   profile.debug('Cleanup old checksum')
+  repo_image.storage.uncompressed_size = data.get('Size')
   repo_image.storage.checksum = None
   repo_image.storage.save()
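
The two small changes above add an uncompressed_size column to ImageStorage and populate it from the 'Size' field of the image JSON uploaded during a push. Later in this patch, the squashing generator sums that column over a tag's ancestry to estimate the size of the merged layer.tar. A minimal sketch of that use; the helper name is illustrative and not part of the patch:

# Illustrative helper (not in this patch): estimate the squashed layer size by
# summing the stored uncompressed sizes of a tag's ancestor images. Missing
# values (images pushed before this column existed) count as zero.
def estimate_squashed_size(images):
  return sum(image.storage.uncompressed_size or 0 for image in images)
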
diff --git a/endpoints/verbs.py b/endpoints/verbs.py
new file mode 100644
index 000000000..d257898df
--- /dev/null
+++ b/endpoints/verbs.py
@@ -0,0 +1,75 @@
+import logging
+import json
+import hashlib
+
+from flask import (make_response, request, session, Response, redirect,
+                   Blueprint, abort, send_file, make_response)
+
+from app import storage as store, app
+from auth.auth import process_auth
+from auth.permissions import ReadRepositoryPermission
+from data import model
+from endpoints.registry import set_cache_headers
+
+from util.dockerimportformat import build_docker_import_stream
+
+from werkzeug.wsgi import wrap_file
+
+verbs = Blueprint('verbs', __name__)
+logger = logging.getLogger(__name__)
+
+
+@verbs.route('/<namespace>/<repository>/<tag>/squash', methods=['GET'])
+@process_auth
+@set_cache_headers
+def get_squashed_tag(namespace, repository, tag, headers):
+  permission = ReadRepositoryPermission(namespace, repository)
+  if permission.can() or model.repository_is_public(namespace, repository):
+    # Lookup the requested tag.
+    tag_image = model.get_tag_image(namespace, repository, tag)
+    if not tag_image:
+      abort(404)
+
+    # Lookup the tag's image and storage.
+    repo_image = model.get_repo_image(namespace, repository, tag_image.docker_image_id)
+    if not repo_image:
+      abort(404)
+
+    # Calculate a synthetic image ID by hashing the *image storage ID* with our
+    # secret. This is done to prevent the ID being guessable/overwritable by
+    # external pushes.
+    unhashed = str(repo_image.storage.id) + ':' + app.config['SECRET_KEY']
+    synthetic_image_id = hashlib.sha256(unhashed).hexdigest()
+
+    # Load the ancestry for the image.
+    uuid = repo_image.storage.uuid
+    ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
+    full_image_list = json.loads(ancestry_data)
+
+    # Load the JSON for the image.
+    json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
+    layer_json = json.loads(json_data)
+
+    def get_next_image():
+      for current_image_id in full_image_list:
+        yield model.get_repo_image(namespace, repository, current_image_id)
+
+    def get_next_layer():
+      for current_image_id in full_image_list:
+        current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
+        current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
+        current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
+                                                      current_image_path)
+
+        logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
+        yield current_image_stream
+
+    stream = build_docker_import_stream(namespace, repository, tag, synthetic_image_id,
+                                        layer_json, get_next_image, get_next_layer)
+
+    return app.response_class(wrap_file(request.environ, stream, 1024 * 16),
+                              mimetype='application/octet-stream',
+                              direct_passthrough=True)
+
+
+  abort(403)
diff --git a/storage/distributedstorage.py b/storage/distributedstorage.py
index 1544d9725..d13362d55 100644
--- a/storage/distributedstorage.py
+++ b/storage/distributedstorage.py
@@ -18,6 +18,7 @@ def _location_aware(unbound_func):
         storage = self._storages[preferred]
 
     if not storage:
+      print locations
      storage = self._storages[random.sample(locations, 1)[0]]
 
     storage_func = getattr(storage, unbound_func.__name__)
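
The new endpoint above derives a synthetic image ID by hashing the image storage ID with the application secret, then streams the squashed archive through wrap_file so the whole tarball never sits in memory. A hedged sketch of how a client might consume it; the host, repository and credentials are placeholders, not part of this patch:

# Hypothetical client-side download of a squashed tag. The URL follows the
# blueprint prefix '/v1/repositories' plus the
# '/<namespace>/<repository>/<tag>/squash' route registered above.
import requests

resp = requests.get('https://quay.io/v1/repositories/devtable/myrepo/latest/squash',
                    auth=('user', 'token'), stream=True)
with open('squashed.tar', 'wb') as out:
  for chunk in resp.iter_content(chunk_size=16 * 1024):
    out.write(chunk)
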
+ """ + return some_magic_adaptor(_import_format_generator(namespace, repository, tag, + synthetic_image_id, layer_json, + get_image_iterator, get_layer_iterator)) + + +def _import_format_generator(namespace, repository, tag, synthetic_image_id, + layer_json, get_image_iterator, get_layer_iterator): + + # Docker import V1 Format (.tar): + # repositories - JSON file containing a repo -> tag -> image map + # {image ID folder}: + # json - The layer JSON + # layer.tar - The TARed contents of the layer + # VERSION - The docker import version: '1.0' + layer_merger = StreamLayerMerger(get_layer_iterator) + + # Yield the repositories file: + synthetic_layer_info = {} + synthetic_layer_info[tag + '.squash'] = synthetic_image_id + + repositories = {} + repositories[namespace + '/' + repository] = synthetic_layer_info + + yield _tar_file('repositories', json.dumps(repositories)) + + # Yield the image ID folder. + yield _tar_folder(synthetic_image_id) + + # Yield the JSON layer data. + layer_json = _build_layer_json(layer_json, synthetic_image_id) + yield _tar_file(synthetic_image_id + '/json', json.dumps(layer_json)) + + # Yield the VERSION file. + yield _tar_file(synthetic_image_id + '/VERSION', '1.0') + + # Yield the merged layer data's header. + estimated_file_size = 0 + for image in get_image_iterator(): + estimated_file_size += image.storage.uncompressed_size or 0 + + yield _tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size) + + # Yield the contents of the merged layer. + yielded_size = 0 + for entry in layer_merger.get_generator(): + yield entry + yielded_size += len(entry) + + # If the yielded size is less than the estimated size (which is likely), fill the rest with + # zeros. + if yielded_size < estimated_file_size: + yield '\0' * (estimated_file_size - yielded_size) + + print estimated_file_size + print yielded_size + + # Yield any file padding to 512 bytes that is necessary. + yield _tar_file_padding(estimated_file_size) + + # Last two records are empty in TAR spec. 
diff --git a/util/gzipwrap.py b/util/gzipwrap.py
new file mode 100644
index 000000000..02be6ae18
--- /dev/null
+++ b/util/gzipwrap.py
@@ -0,0 +1,42 @@
+from gzip import GzipFile
+
+class GzipWrap(object):
+  def __init__(self, input, filename=None, compresslevel=1):
+    self.input = iter(input)
+    self.buffer = ''
+    self.zipper = GzipFile(filename, mode='wb', fileobj=self, compresslevel=compresslevel)
+
+  def read(self, size=-1):
+    # If the buffer already has enough bytes, then simply pop them off of
+    # the beginning and return them.
+    if len(self.buffer) >= size:
+      ret = self.buffer[0:size]
+      self.buffer = self.buffer[size:]
+      return ret
+
+    # Otherwise, zip the input until we have enough bytes.
+    while True:
+      # Attempt to retrieve the next bytes to write.
+      is_done = False
+      try:
+        s = self.input.next()
+        self.zipper.write(s)
+      except StopIteration:
+        is_done = True
+
+      if len(self.buffer) < size or is_done:
+        self.zipper.flush()
+
+      if len(self.buffer) >= size or is_done:
+        ret = self.buffer[0:size]
+        self.buffer = self.buffer[size:]
+        return ret
+
+  def flush(self):
+    pass
+
+  def write(self, data):
+    self.buffer += data
+
+  def close(self):
+    self.input.close()
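
GzipWrap turns an iterable of byte chunks into a read()-able, file-like object that hands back gzip-compressed data, by letting GzipFile write its output into the wrapper's own buffer. It is imported by util/dockerimportformat.py but not yet wired into the generator in this work-in-progress patch; a sketch of the intended usage, with an illustrative chunk source and block size:

# Sketch only: compress an iterable of string chunks and consume the gzip
# stream in fixed-size blocks. Note that GzipFile only writes the trailing
# gzip footer (CRC and length) when it is closed.
from util.gzipwrap import GzipWrap

def chunk_source():
  yield 'hello '
  yield 'world'

wrapped = GzipWrap(chunk_source())
while True:
  block = wrapped.read(16 * 1024)
  if not block:
    break
  # ... hand `block` to the HTTP response, a file, etc. ...
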
diff --git a/util/streamlayerformat.py b/util/streamlayerformat.py
new file mode 100644
index 000000000..c197763f1
--- /dev/null
+++ b/util/streamlayerformat.py
@@ -0,0 +1,99 @@
+import marisa_trie
+import os
+import tarfile
+import StringIO
+import traceback
+
+AUFS_METADATA = u'.wh..wh.'
+
+AUFS_WHITEOUT = u'.wh.'
+AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
+
+class StreamLayerMerger(object):
+  """ Class which creates a generator of the combined TAR data for a set of Docker layers. """
+  def __init__(self, layer_iterator):
+    self.trie = marisa_trie.Trie()
+    self.layer_iterator = layer_iterator
+    self.encountered = []
+
+  def get_generator(self):
+    for current_layer in self.layer_iterator():
+      # Read the current layer as TAR. If it is empty, we just continue
+      # to the next layer.
+      try:
+        tar_file = tarfile.open(mode='r|*', fileobj=current_layer)
+      except tarfile.ReadError as re:
+        continue
+
+      # For each of the tar entries, yield them IF and ONLY IF we have not
+      # encountered the path before.
+
+      # 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
+      chunk_size = 1024 * 1024 * 9
+
+      for tar_info in tar_file:
+        result = self.process_tar_info(tar_info)
+        if not result:
+          continue
+
+        (tarinfo, filebuf) = result
+
+        yield tarinfo.tobuf()
+
+        if filebuf:
+          length = 0
+          file_stream = tar_file.extractfile(tarinfo)
+          while True:
+            current_block = file_stream.read(chunk_size)
+            if not len(current_block):
+              break
+
+            yield current_block
+            length += len(current_block)
+
+          file_stream.close()
+
+          # Files must be padded to 512 byte multiples.
+          if length % 512 != 0:
+            yield '\0' * (512 - (length % 512))
+
+      # Close the layer stream now that we're done with it.
+      tar_file.close()
+
+      # Update the trie with the new encountered entries.
+      self.trie = marisa_trie.Trie(self.encountered)
+
+    # Last two records are empty in TAR spec.
+    yield '\0' * 512
+    yield '\0' * 512
+
+
+  def process_tar_info(self, tar_info):
+    absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
+    dirname = os.path.dirname(absolute)
+    filename = os.path.basename(absolute)
+
+    # Skip directories and metadata
+    if (filename.startswith(AUFS_METADATA) or
+        absolute.startswith(AUFS_METADATA)):
+      # Skip
+      return None
+
+    elif filename.startswith(AUFS_WHITEOUT):
+      removed_filename = filename[AUFS_WHITEOUT_PREFIX_LENGTH:]
+      removed_prefix = os.path.join('/', dirname, removed_filename)
+      self.encountered.append(removed_prefix)
+      return None
+
+    # Check if this file has already been encountered somewhere. If so,
+    # skip it.
+    if unicode(absolute) in self.trie:
+      return None
+
+    self.encountered.append(absolute)
+
+    if tar_info.isdir() or tar_info.issym() or tar_info.islnk():
+      return (tar_info, False)
+
+    elif tar_info.isfile():
+      return (tar_info, True)
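
StreamLayerMerger walks the layers starting from the tag's image and working toward the base, skipping AUFS bookkeeping entries and honoring whiteout files, and records every path it has already emitted in a marisa trie so that lower layers cannot reintroduce them. A standalone sketch of just the whiteout naming convention handled above; the helper name is illustrative:

# A file named '.wh.<name>' in an upper layer marks '<name>' as deleted in the
# layers below it, while '.wh..wh.' entries are AUFS metadata to skip outright.
import os

AUFS_METADATA = u'.wh..wh.'
AUFS_WHITEOUT = u'.wh.'

def classify(path):
  filename = os.path.basename(path)
  if filename.startswith(AUFS_METADATA) or path.startswith(AUFS_METADATA):
    return ('metadata', None)
  if filename.startswith(AUFS_WHITEOUT):
    removed = os.path.join(os.path.dirname(path), filename[len(AUFS_WHITEOUT):])
    return ('whiteout', removed)
  return ('file', path)

# classify('etc/.wh.passwd') -> ('whiteout', 'etc/passwd')
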