From a1164269be33eb929d38623d0365f6aea6a094c2 Mon Sep 17 00:00:00 2001 From: yackob03 Date: Fri, 18 Oct 2013 14:31:14 -0400 Subject: [PATCH] Complete the diff generating functionality. --- endpoints/api.py | 18 +++++++++----- endpoints/registry.py | 50 ++++++++++++++++++++++++++++++++++--- storage/__init__.py | 9 ++++--- storage/local.py | 4 +++ storage/s3.py | 7 ++++++ util/changes.py | 57 ++++++++++++++++++++++++++++--------------- 6 files changed, 113 insertions(+), 32 deletions(-) diff --git a/endpoints/api.py b/endpoints/api.py index bac9b29da..ae76eac16 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -1,5 +1,6 @@ import logging import stripe +import re from flask import request, make_response, jsonify, abort from flask.ext.login import login_required, current_user, logout_user @@ -7,6 +8,8 @@ from flask.ext.principal import identity_changed, AnonymousIdentity from functools import wraps from collections import defaultdict +import storage + from data import model from app import app from util.email import send_confirmation_email, send_recovery_email @@ -17,8 +20,9 @@ from auth.permissions import (ReadRepositoryPermission, AdministerRepositoryPermission) from endpoints import registry from endpoints.web import common_login -import re + +store = storage.load() logger = logging.getLogger(__name__) @@ -366,11 +370,13 @@ def list_repository_images(namespace, repository): def get_repository_changes(namespace, repository, image_id): permission = ReadRepositoryPermission(namespace, repository) if permission.can() or model.repository_is_public(namespace, repository): - return jsonify({ - 'added': [], - 'changed': [], - 'removed': [] - }) + diffs_path = store.image_file_diffs_path(namespace, repository, image_id) + + try: + response_json = store.get_content(diffs_path) + return make_response(response_json) + except IOError: + abort(404) abort(403) diff --git a/endpoints/registry.py b/endpoints/registry.py index 6f15c0c09..0ea629035 100644 --- a/endpoints/registry.py +++ b/endpoints/registry.py @@ -1,4 +1,3 @@ - import logging import json @@ -11,7 +10,7 @@ import storage from app import app from auth.auth import process_auth, extract_namespace_repo_from_session -from util import checksums +from util import checksums, changes from auth.permissions import (ReadRepositoryPermission, ModifyRepositoryPermission) from data import model @@ -315,6 +314,49 @@ def delete_repository_storage(namespace, repository): def process_image_changes(namespace, repository, image_id): - return + logger.debug('Generating diffs for image: %s' % image_id) + + image_diffs_path = store.image_file_diffs_path(namespace, repository, + image_id) + if store.exists(image_diffs_path): + logger.debug('Diffs already exist for image: %s' % image_id) + return + image = model.get_image_by_id(namespace, repository, image_id) - model.get_parent_images() \ No newline at end of file + parents = model.get_parent_images(image) + + # Compute the diffs and fs for the parent first if necessary + parent_trie_path = None + if parents: + parent_trie_path = process_image_changes(namespace, repository, + parents[-1].docker_image_id) + + # Read in the collapsed layer state of the filesystem for the parent + parent_trie = changes.empty_fs() + if parent_trie_path: + with store.stream_read_file(parent_trie_path) as parent_trie_stream: + parent_trie.read(parent_trie_stream) + + # Read in the file entries from the layer tar file + layer_path = store.image_layer_path(namespace, repository, image_id) + with store.stream_read_file(layer_path) as layer_tar_stream: + removed_files = set() + layer_files = changes.files_and_dirs_from_tar(layer_tar_stream, + removed_files) + + new_metadata = changes.compute_new_diffs_and_fs(parent_trie, layer_files, + removed_files) + (new_trie, added, changed, removed) = new_metadata + + # Write out the new trie + new_trie_path = store.image_file_trie_path(namespace, repository, image_id) + store.put_content(new_trie_path, new_trie.tobytes()) + + # Write out the diffs + diffs = {} + sections = ('added', 'changed', 'removed') + for section, source_trie in zip(sections, new_metadata[1:]): + diffs[section] = list(source_trie) + store.put_content(image_diffs_path, json.dumps(diffs, indent=2)) + + return new_trie_path diff --git a/storage/__init__.py b/storage/__init__.py index 2a02e22b5..8c0efab31 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -58,9 +58,9 @@ class Storage(object): return '{0}/{1}/{2}/{3}/files.trie'.format(self.images, namespace, repository, image_id) - def image_file_diffs_trie_path(self, namespace, repository, image_id): - return '{0}/{1}/{2}/{3}/diffs.pkl'.format(self.images, namespace, - repository, image_id) + def image_file_diffs_path(self, namespace, repository, image_id): + return '{0}/{1}/{2}/{3}/diffs.json'.format(self.images, namespace, + repository, image_id) def get_content(self, path): raise NotImplementedError @@ -71,6 +71,9 @@ class Storage(object): def stream_read(self, path): raise NotImplementedError + def stream_read_file(self, path): + raise NotImplementedError + def stream_write(self, path, fp): raise NotImplementedError diff --git a/storage/local.py b/storage/local.py index 42980713c..93b2103ca 100644 --- a/storage/local.py +++ b/storage/local.py @@ -38,6 +38,10 @@ class LocalStorage(Storage): break yield buf + def stream_read_file(self, path): + path = self._init_path(path) + return open(path, mode='rb') + def stream_write(self, path, fp): # Size is mandatory path = self._init_path(path, create=True) diff --git a/storage/s3.py b/storage/s3.py index d6e4fe5d1..26e73d127 100644 --- a/storage/s3.py +++ b/storage/s3.py @@ -60,6 +60,13 @@ class S3Storage(Storage): break yield buf + def stream_read_file(self, path): + path = self._init_path(path) + key = boto.s3.key.Key(self._s3_bucket, path) + if not key.exists(): + raise IOError('No such key: \'{0}\''.format(path)) + return key + def stream_write(self, path, fp): # Minimum size of upload part size on S3 is 5MB buffer_size = 5 * 1024 * 1024 diff --git a/util/changes.py b/util/changes.py index 807862635..617ae5529 100644 --- a/util/changes.py +++ b/util/changes.py @@ -1,5 +1,6 @@ import marisa_trie import os +import tarfile AUFS_METADATA = u'.wh..wh.' @@ -8,33 +9,34 @@ AUFS_WHITEOUT = u'.wh.' AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT) -def files_and_dirs(root_path, removed_prefix_collector): - for root, dirs, files in os.walk(unicode(root_path)): +def files_and_dirs_from_tar(source_stream, removed_prefix_collector): + tar_stream = tarfile.open(mode='r|*', fileobj=source_stream) + + for tar_info in tar_stream: + absolute = os.path.relpath(unicode(tar_info.name), './') + filename = os.path.basename(absolute) - rel_root = os.path.relpath(root, root_path) - yield rel_root + if (filename.startswith(AUFS_METADATA) or + absolute.startswith(AUFS_METADATA)): + # Skip + continue - for one_file in files: - if one_file.startswith(AUFS_METADATA): - # Skip - continue + elif filename.startswith(AUFS_WHITEOUT): + filename = filename[AUFS_WHITEOUT_PREFIX_LENGTH:] + removed_prefix_collector.add(absolute) + continue - elif one_file.startswith(AUFS_WHITEOUT): - filename = one_file[AUFS_WHITEOUT_PREFIX_LENGTH:] - removed_prefix_collector.add(os.path.join(rel_root, filename)) - continue - - else: - yield os.path.join(rel_root, one_file) + else: + yield absolute -def compute_removed(base_trie, removed_prefixes): +def __compute_removed(base_trie, removed_prefixes): for prefix in removed_prefixes: for filename in base_trie.keys(prefix): yield filename -def compute_added_changed(base_trie, delta_trie): +def __compute_added_changed(base_trie, delta_trie): added = set() changed = set() @@ -47,10 +49,27 @@ def compute_added_changed(base_trie, delta_trie): return added, changed -def new_fs(base_trie, added, removed): +def __new_fs(base_trie, added, removed): for filename in base_trie.keys(): if filename not in removed: yield filename for filename in added: - yield filename \ No newline at end of file + yield filename + + +def empty_fs(): + return marisa_trie.Trie() + + +def compute_new_diffs_and_fs(base_trie, filename_source, + removed_prefix_collector): + new_trie = marisa_trie.Trie(filename_source) + (new_added, new_changed) = __compute_added_changed(base_trie, new_trie) + + new_removed = marisa_trie.Trie(__compute_removed(base_trie, + removed_prefix_collector)) + + new_fs = marisa_trie.Trie(__new_fs(base_trie, new_added, new_removed)) + + return (new_fs, new_added, new_changed, new_removed.keys())