Work in progress. This is currently broken!

Joseph Schorr 2014-09-16 00:18:57 -04:00
parent 820d5c0476
commit e3c52fa0eb
8 changed files with 344 additions and 0 deletions

View file

@@ -17,6 +17,7 @@ from endpoints.index import index
 from endpoints.web import web
 from endpoints.tags import tags
 from endpoints.registry import registry
+from endpoints.verbs import verbs
 from endpoints.webhooks import webhooks
 from endpoints.realtime import realtime
 from endpoints.callbacks import callback
@@ -43,6 +44,7 @@ application.register_blueprint(callback, url_prefix='/oauth2')
 application.register_blueprint(index, url_prefix='/v1')
 application.register_blueprint(tags, url_prefix='/v1')
 application.register_blueprint(registry, url_prefix='/v1')
+application.register_blueprint(verbs, url_prefix='/v1/repositories')
 application.register_blueprint(api_bp, url_prefix='/api')
 application.register_blueprint(webhooks, url_prefix='/webhooks')
 application.register_blueprint(realtime, url_prefix='/realtime')
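
Note: combined with the route rule declared in the new endpoints/verbs.py below, this prefix exposes the verb at URLs of the following shape (a sketch only; the repository coordinates are invented for illustration):

# Sketch: how url_prefix='/v1/repositories' composes with the squash rule below.
print '/v1/repositories' + '/%s/%s/%s/squash' % ('devtable', 'complex', 'latest')
# -> /v1/repositories/devtable/complex/latest/squash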

View file

@@ -234,6 +234,7 @@ class ImageStorage(BaseModel):
   comment = TextField(null=True)
   command = TextField(null=True)
   image_size = BigIntegerField(null=True)
+  uncompressed_size = BigIntegerField(null=True)
   uploading = BooleanField(default=True, null=True)

View file

@@ -453,6 +453,7 @@ def put_image_json(namespace, repository, image_id):
   # We cleanup any old checksum in case it's a retry after a fail
   profile.debug('Cleanup old checksum')
+  repo_image.storage.uncompressed_size = data.get('Size')
   repo_image.storage.checksum = None
   repo_image.storage.save()

endpoints/verbs.py (new file, 75 lines added)

@@ -0,0 +1,75 @@
import logging
import json
import hashlib

from flask import (make_response, request, session, Response, redirect,
                   Blueprint, abort, send_file)

from app import storage as store, app
from auth.auth import process_auth
from auth.permissions import ReadRepositoryPermission
from data import model
from endpoints.registry import set_cache_headers
from util.dockerimportformat import build_docker_import_stream

from werkzeug.wsgi import wrap_file

verbs = Blueprint('verbs', __name__)
logger = logging.getLogger(__name__)


@verbs.route('/<namespace>/<repository>/<tag>/squash', methods=['GET'])
@process_auth
@set_cache_headers
def get_squashed_tag(namespace, repository, tag, headers):
  permission = ReadRepositoryPermission(namespace, repository)
  if permission.can() or model.repository_is_public(namespace, repository):
    # Lookup the requested tag.
    tag_image = model.get_tag_image(namespace, repository, tag)
    if not tag_image:
      abort(404)

    # Lookup the tag's image and storage.
    repo_image = model.get_repo_image(namespace, repository, tag_image.docker_image_id)
    if not repo_image:
      abort(404)

    # Calculate a synthetic image ID by hashing the *image storage ID* with our
    # secret. This is done to prevent the ID being guessable/overwritable by
    # external pushes.
    unhashed = str(repo_image.storage.id) + ':' + app.config['SECRET_KEY']
    synthetic_image_id = hashlib.sha256(unhashed).hexdigest()

    # Load the ancestry for the image.
    uuid = repo_image.storage.uuid
    ancestry_data = store.get_content(repo_image.storage.locations, store.image_ancestry_path(uuid))
    full_image_list = json.loads(ancestry_data)

    # Load the JSON for the image.
    json_data = store.get_content(repo_image.storage.locations, store.image_json_path(uuid))
    layer_json = json.loads(json_data)

    def get_next_image():
      for current_image_id in full_image_list:
        yield model.get_repo_image(namespace, repository, current_image_id)

    def get_next_layer():
      for current_image_id in full_image_list:
        current_image_entry = model.get_repo_image(namespace, repository, current_image_id)
        current_image_path = store.image_layer_path(current_image_entry.storage.uuid)
        current_image_stream = store.stream_read_file(current_image_entry.storage.locations,
                                                      current_image_path)

        logger.debug('Returning image layer %s: %s' % (current_image_id, current_image_path))
        yield current_image_stream

    stream = build_docker_import_stream(namespace, repository, tag, synthetic_image_id,
                                        layer_json, get_next_image, get_next_layer)

    return app.response_class(wrap_file(request.environ, stream, 1024 * 16),
                              mimetype='application/octet-stream',
                              direct_passthrough=True)

  abort(403)
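
A rough sketch of how a client might consume this endpoint once it works (the commit message notes it is still broken); the host, credentials, repository coordinates, and use of the requests library are assumptions for illustration, not part of this commit:

# Sketch only: stream the synthetic image produced by get_squashed_tag to disk.
import requests

url = 'https://quay.example.com/v1/repositories/devtable/complex/latest/squash'
resp = requests.get(url, auth=('user', 'apitoken'), stream=True)

with open('squashed.tar', 'wb') as handle:
  for chunk in resp.iter_content(1024 * 16):  # matches the 16KB wrap_file buffer above
    handle.write(chunk)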

View file

@@ -18,6 +18,7 @@ def _location_aware(unbound_func):
         storage = self._storages[preferred]

     if not storage:
+      print locations
       storage = self._storages[random.sample(locations, 1)[0]]

     storage_func = getattr(storage, unbound_func.__name__)

util/dockerimportformat.py (new file, 123 lines added)

@@ -0,0 +1,123 @@
from util.gzipwrap import GzipWrap
from util.streamlayerformat import StreamLayerMerger

import copy
import json
import tarfile

from itertools import chain, islice


class some_magic_adaptor(object):
  """ Adapts a generator of strings into a file-like object with a read(n) method. """
  def __init__(self, src):
    self.src = chain.from_iterable(src)

  def read(self, n):
    return "".join(islice(self.src, None, n))


def build_docker_import_stream(namespace, repository, tag, synthetic_image_id,
                               layer_json, get_image_iterator, get_layer_iterator):
  """ Builds and streams a synthetic .tar.gz that represents a squashed version
      of the given layers, in `docker import` V1 format.
  """
  return some_magic_adaptor(_import_format_generator(namespace, repository, tag,
                                                     synthetic_image_id, layer_json,
                                                     get_image_iterator, get_layer_iterator))


def _import_format_generator(namespace, repository, tag, synthetic_image_id,
                             layer_json, get_image_iterator, get_layer_iterator):
  # Docker import V1 Format (.tar):
  #   repositories - JSON file containing a repo -> tag -> image map
  #   {image ID folder}:
  #     json - The layer JSON
  #     layer.tar - The TARed contents of the layer
  #     VERSION - The docker import version: '1.0'
  layer_merger = StreamLayerMerger(get_layer_iterator)

  # Yield the repositories file:
  synthetic_layer_info = {}
  synthetic_layer_info[tag + '.squash'] = synthetic_image_id

  repositories = {}
  repositories[namespace + '/' + repository] = synthetic_layer_info

  yield _tar_file('repositories', json.dumps(repositories))

  # Yield the image ID folder.
  yield _tar_folder(synthetic_image_id)

  # Yield the JSON layer data.
  layer_json = _build_layer_json(layer_json, synthetic_image_id)
  yield _tar_file(synthetic_image_id + '/json', json.dumps(layer_json))

  # Yield the VERSION file.
  yield _tar_file(synthetic_image_id + '/VERSION', '1.0')

  # Yield the merged layer data's header.
  estimated_file_size = 0
  for image in get_image_iterator():
    estimated_file_size += image.storage.uncompressed_size or 0

  yield _tar_file_header(synthetic_image_id + '/layer.tar', estimated_file_size)

  # Yield the contents of the merged layer.
  yielded_size = 0
  for entry in layer_merger.get_generator():
    yield entry
    yielded_size += len(entry)

  # If the yielded size is less than the estimated size (which is likely), fill the rest with
  # zeros.
  if yielded_size < estimated_file_size:
    yield '\0' * (estimated_file_size - yielded_size)

  # Debug output left in while this endpoint is a work in progress.
  print estimated_file_size
  print yielded_size

  # Yield any file padding to 512 bytes that is necessary.
  yield _tar_file_padding(estimated_file_size)

  # Last two records are empty in TAR spec.
  yield '\0' * 512
  yield '\0' * 512


def _build_layer_json(layer_json, synthetic_image_id):
  updated_json = copy.deepcopy(layer_json)
  updated_json['id'] = synthetic_image_id

  if 'parent' in updated_json:
    del updated_json['parent']

  if 'config' in updated_json and 'Image' in updated_json['config']:
    updated_json['config']['Image'] = synthetic_image_id

  if 'container_config' in updated_json and 'Image' in updated_json['container_config']:
    updated_json['container_config']['Image'] = synthetic_image_id

  return updated_json


def _tar_file(name, contents):
  length = len(contents)
  tar_data = _tar_file_header(name, length)
  tar_data += contents
  tar_data += _tar_file_padding(length)
  return tar_data


def _tar_file_padding(length):
  if length % 512 != 0:
    return '\0' * (512 - (length % 512))

  # Returning an empty string (rather than falling through to None) keeps the
  # string concatenation in _tar_file and the generator yields above valid.
  return ''


def _tar_file_header(name, file_size):
  info = tarfile.TarInfo(name=name)
  info.type = tarfile.REGTYPE
  info.size = file_size
  return info.tobuf()


def _tar_folder(name):
  info = tarfile.TarInfo(name=name)
  info.type = tarfile.DIRTYPE
  return info.tobuf()
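
The generator above emits each member ('repositories', the image ID folder, '<id>/json', '<id>/VERSION' and '<id>/layer.tar') as a 512-byte header record followed by the body padded out to a 512-byte boundary. A standalone sketch of that record math, not part of the commit:

# Sketch: check that header + body + padding always lands on a 512-byte boundary,
# mirroring _tar_file_header and _tar_file_padding above.
import tarfile

def padded_length(content_length):
  remainder = content_length % 512
  return content_length + (512 - remainder if remainder else 0)

header = tarfile.TarInfo(name='example').tobuf()
assert len(header) == 512         # a tar header is exactly one record
assert padded_length(1) == 512    # small bodies pad up to a full record
assert padded_length(512) == 512  # an aligned body needs no padding
assert padded_length(513) == 1024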

util/gzipwrap.py (new file, 42 lines added)

@@ -0,0 +1,42 @@
from gzip import GzipFile

class GzipWrap(object):
  """ Wraps an iterator of strings and exposes its gzip-compressed contents
      through a file-like read() method.
  """
  def __init__(self, input, filename=None, compresslevel=1):
    self.input = iter(input)
    self.buffer = ''
    self.zipper = GzipFile(filename, mode='wb', fileobj=self, compresslevel=compresslevel)

  def read(self, size=-1):
    # If the buffer already has enough bytes, then simply pop them off of
    # the beginning and return them.
    if len(self.buffer) >= size:
      ret = self.buffer[0:size]
      self.buffer = self.buffer[size:]
      return ret

    # Otherwise, zip the input until we have enough bytes.
    while True:
      # Attempt to retrieve the next bytes to write.
      is_done = False
      try:
        s = self.input.next()
        self.zipper.write(s)
      except StopIteration:
        is_done = True

      if len(self.buffer) < size or is_done:
        self.zipper.flush()

      if len(self.buffer) >= size or is_done:
        ret = self.buffer[0:size]
        self.buffer = self.buffer[size:]
        return ret

  def flush(self):
    pass

  def write(self, data):
    self.buffer += data

  def close(self):
    self.input.close()
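
GzipWrap is imported by util/dockerimportformat.py but not wired into the verb in this commit. A rough usage sketch under that caveat; because close() is never called on the internal GzipFile, the gzip trailer is never written, so the sketch decompresses with a zlib decompressobj rather than the gzip module (the sample strings are made up):

# Sketch: pull compressed bytes out of GzipWrap and round-trip them.
import zlib
from util.gzipwrap import GzipWrap

wrapped = GzipWrap(['hello ', 'gzip ', 'world'])
compressed = wrapped.read(1024 * 64)  # small input, so a single read drains it

decomp = zlib.decompressobj(16 + zlib.MAX_WBITS)  # 16 + MAX_WBITS accepts a gzip header
print decomp.decompress(compressed)   # -> 'hello gzip world'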

util/streamlayerformat.py (new file, 99 lines added)

@@ -0,0 +1,99 @@
import marisa_trie
import os
import tarfile
import StringIO
import traceback

AUFS_METADATA = u'.wh..wh.'

AUFS_WHITEOUT = u'.wh.'
AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)

class StreamLayerMerger(object):
  """ Class which creates a generator of the combined TAR data for a set of Docker layers. """
  def __init__(self, layer_iterator):
    self.trie = marisa_trie.Trie()
    self.layer_iterator = layer_iterator
    self.encountered = []

  def get_generator(self):
    for current_layer in self.layer_iterator():
      # Read the current layer as TAR. If it is empty, we just continue
      # to the next layer.
      try:
        tar_file = tarfile.open(mode='r|*', fileobj=current_layer)
      except tarfile.ReadError as re:
        continue

      # For each of the tar entries, yield them IF and ONLY IF we have not
      # encountered the path before.

      # 9MB (+ padding below) so that it matches the 10MB expected by Gzip.
      chunk_size = 1024 * 1024 * 9

      for tar_info in tar_file:
        result = self.process_tar_info(tar_info)
        if not result:
          continue

        (tarinfo, filebuf) = result

        yield tarinfo.tobuf()

        if filebuf:
          length = 0
          file_stream = tar_file.extractfile(tarinfo)
          while True:
            current_block = file_stream.read(chunk_size)
            if not len(current_block):
              break

            yield current_block
            length += len(current_block)

          file_stream.close()

          # Files must be padded to 512 byte multiples.
          if length % 512 != 0:
            yield '\0' * (512 - (length % 512))

      # Close the layer stream now that we're done with it.
      tar_file.close()

      # Update the trie with the new encountered entries.
      self.trie = marisa_trie.Trie(self.encountered)

    # Last two records are empty in TAR spec.
    yield '\0' * 512
    yield '\0' * 512

  def process_tar_info(self, tar_info):
    absolute = os.path.relpath(tar_info.name.decode('utf-8'), './')
    dirname = os.path.dirname(absolute)
    filename = os.path.basename(absolute)

    # Skip directories and metadata
    if (filename.startswith(AUFS_METADATA) or
        absolute.startswith(AUFS_METADATA)):
      # Skip
      return None

    elif filename.startswith(AUFS_WHITEOUT):
      removed_filename = filename[AUFS_WHITEOUT_PREFIX_LENGTH:]
      removed_prefix = os.path.join('/', dirname, removed_filename)
      self.encountered.append(removed_prefix)
      return None

    # Check if this file has already been encountered somewhere. If so,
    # skip it.
    if unicode(absolute) in self.trie:
      return None

    self.encountered.append(absolute)

    if tar_info.isdir() or tar_info.issym() or tar_info.islnk():
      return (tar_info, False)
    elif tar_info.isfile():
      return (tar_info, True)
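
A small sketch of how process_tar_info classifies individual entries; the entry names are invented, and this does not exercise the cross-layer whiteout masking, which the commit message notes is still broken:

# Sketch: classify hand-built tar entries the way get_generator() does.
import tarfile
from util.streamlayerformat import StreamLayerMerger

merger = StreamLayerMerger(lambda: iter([]))

regular = tarfile.TarInfo('etc/motd')
directory = tarfile.TarInfo('etc')
directory.type = tarfile.DIRTYPE
whiteout = tarfile.TarInfo('etc/.wh.removed.conf')

print merger.process_tar_info(regular)    # -> (tarinfo, True): header and file body are yielded
print merger.process_tar_info(directory)  # -> (tarinfo, False): header only
print merger.process_tar_info(whiteout)   # -> None: records that 'removed.conf' was deleted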