From 93b856bdb314c0e96b1d50f2743f5944ce589a9d Mon Sep 17 00:00:00 2001 From: yackob03 Date: Thu, 17 Oct 2013 18:25:19 -0400 Subject: [PATCH] First few changes for the image diffs feature. --- data/database.py | 11 ++++++++- data/queue.py | 55 +++++++++++++++++++++++++++++++++++++++++ endpoints/api.py | 15 +++++++++++ endpoints/registry.py | 6 +++++ requirements-nover.txt | 4 ++- storage/__init__.py | 8 ++++++ util/changes.py | 56 ++++++++++++++++++++++++++++++++++++++++++ workers/__init__.py | 0 workers/image_diffs.py | 36 +++++++++++++++++++++++++++ 9 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 data/queue.py create mode 100644 util/changes.py create mode 100644 workers/__init__.py create mode 100644 workers/image_diffs.py diff --git a/data/database.py b/data/database.py index 04fd33b8a..da7c19282 100644 --- a/data/database.py +++ b/data/database.py @@ -150,10 +150,19 @@ class RepositoryTag(BaseModel): ) +class QueueItem(BaseMode): + queue_name = CharField(index=True) + body = TextField() + available_after = DateTimeField(default=datetime.now, index=True) + available = BooleanField(default=True, index=True) + processing_expires = DateTimeField(null=True, index=True) + + def initialize_db(): create_model_tables([User, Repository, Image, AccessToken, Role, RepositoryPermission, Visibility, RepositoryTag, - EmailConfirmation, FederatedLogin, LoginService]) + EmailConfirmation, FederatedLogin, LoginService, + QueueItem]) Role.create(name='admin') Role.create(name='write') Role.create(name='read') diff --git a/data/queue.py b/data/queue.py new file mode 100644 index 000000000..fb3421466 --- /dev/null +++ b/data/queue.py @@ -0,0 +1,55 @@ +from datetime import datetime, timedelta + +from database import QueueItem + + +class WorkQueue(object): + def __init__(self, queue_name): + self.queue_name = queue_name + + def put(message, available_after=0): + """ + Put an item, if it shouldn't be processed for some number of seconds, + specify that amount as available_after. + """ + + params = { + 'queue_name': self.queue_name, + 'body': message, + } + + if available_after: + available_date = datetime.now() + timedelta(seconds=available_after) + params['available_after'] = available_date + + QueueItem.create(**params) + + def get(processing_time=300): + """ + Get an available item and mark it as unavailable for the default of five + minutes. + """ + now = datetime.now() + available_or_expired = (QueueItem.available == True | + QueueItem.processing_expires <= now) + + # TODO the query and the update should be atomic, but for now we only + # have one worker. + avaial = QueueItem.select().where(QueueItem.queue_name = self.queue_name, + QueueItem.available_after <= now, + available_or_expired) + + found = list(avail.limit(1).order_by(QueueItem.available_after)) + + if found: + item = found[0] + item.available = False + item.processing_expires = now + timedelta(seconds=processing_time) + item.save() + + return item + + return None + + def complete(completed_item): + item.delete_instance() diff --git a/endpoints/api.py b/endpoints/api.py index 08344e38b..bac9b29da 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -360,6 +360,21 @@ def list_repository_images(namespace, repository): abort(403) +@app.route('/api/repository//image//changes', + methods=['GET']) +@parse_repository_name +def get_repository_changes(namespace, repository, image_id): + permission = ReadRepositoryPermission(namespace, repository) + if permission.can() or model.repository_is_public(namespace, repository): + return jsonify({ + 'added': [], + 'changed': [], + 'removed': [] + }) + + abort(403) + + @app.route('/api/repository//tag//images', methods=['GET']) @parse_repository_name diff --git a/endpoints/registry.py b/endpoints/registry.py index 24f08ddef..6f15c0c09 100644 --- a/endpoints/registry.py +++ b/endpoints/registry.py @@ -312,3 +312,9 @@ def delete_repository_storage(namespace, repository): logger.debug('Recursively deleting path: %s' % repository_path) store.remove(repository_path) + + +def process_image_changes(namespace, repository, image_id): + return + image = model.get_image_by_id(namespace, repository, image_id) + model.get_parent_images() \ No newline at end of file diff --git a/requirements-nover.txt b/requirements-nover.txt index cd844d0cb..f35cc670b 100644 --- a/requirements-nover.txt +++ b/requirements-nover.txt @@ -11,4 +11,6 @@ stripe gunicorn eventlet mixpanel-py -beautifulsoup4 \ No newline at end of file +beautifulsoup4 +marisa-trie +apscheduler \ No newline at end of file diff --git a/storage/__init__.py b/storage/__init__.py index 2a26b5883..2a02e22b5 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -54,6 +54,14 @@ class Storage(object): def repository_namespace_path(self, namespace, repository): return '{0}/{1}/{2}/'.format(self.images, namespace, repository) + def image_file_trie_path(self, namespace, repository, image_id): + return '{0}/{1}/{2}/{3}/files.trie'.format(self.images, namespace, + repository, image_id) + + def image_file_diffs_trie_path(self, namespace, repository, image_id): + return '{0}/{1}/{2}/{3}/diffs.pkl'.format(self.images, namespace, + repository, image_id) + def get_content(self, path): raise NotImplementedError diff --git a/util/changes.py b/util/changes.py new file mode 100644 index 000000000..807862635 --- /dev/null +++ b/util/changes.py @@ -0,0 +1,56 @@ +import marisa_trie +import os + + +AUFS_METADATA = u'.wh..wh.' + +AUFS_WHITEOUT = u'.wh.' +AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT) + + +def files_and_dirs(root_path, removed_prefix_collector): + for root, dirs, files in os.walk(unicode(root_path)): + + rel_root = os.path.relpath(root, root_path) + yield rel_root + + for one_file in files: + if one_file.startswith(AUFS_METADATA): + # Skip + continue + + elif one_file.startswith(AUFS_WHITEOUT): + filename = one_file[AUFS_WHITEOUT_PREFIX_LENGTH:] + removed_prefix_collector.add(os.path.join(rel_root, filename)) + continue + + else: + yield os.path.join(rel_root, one_file) + + +def compute_removed(base_trie, removed_prefixes): + for prefix in removed_prefixes: + for filename in base_trie.keys(prefix): + yield filename + + +def compute_added_changed(base_trie, delta_trie): + added = set() + changed = set() + + for filename in delta_trie.keys(): + if filename not in base_trie: + added.add(filename) + else: + changed.add(filename) + + return added, changed + + +def new_fs(base_trie, added, removed): + for filename in base_trie.keys(): + if filename not in removed: + yield filename + + for filename in added: + yield filename \ No newline at end of file diff --git a/workers/__init__.py b/workers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/workers/image_diffs.py b/workers/image_diffs.py new file mode 100644 index 000000000..93bcd1b45 --- /dev/null +++ b/workers/image_diffs.py @@ -0,0 +1,36 @@ +import logging +import json + +from apscheduler.scheduler import Scheduler + +from data.queue import WorkQueue +from endpoints.registry import process_image_changes + + +logger = logging.getLogger(__name__) + +image_diff_queue = WorkQueue('imagediff') + + +def process_work_items(): + logger.debug('Getting work item from queue.') + + item = imagediff.get() + + if item: + logger.debug('Queue gave us some work: %s' % item.body) + + request = json.loads(item.body) + process_image_changes(request['namepspace'], request['repository'], + request['image_id']) + else: + logger.debug('No work today.') + + +FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s' +logging.basicConfig(format=FORMAT, level=logging.DEBUG) + +sched = Scheduler() +sched.start() + +sched.add_interval_job(process_work_items, seconds=10)