First few changes for the image diffs feature.
This commit is contained in:
parent
b22a4aa24c
commit
93b856bdb3
9 changed files with 189 additions and 2 deletions
|
@ -150,10 +150,19 @@ class RepositoryTag(BaseModel):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class QueueItem(BaseMode):
|
||||||
|
queue_name = CharField(index=True)
|
||||||
|
body = TextField()
|
||||||
|
available_after = DateTimeField(default=datetime.now, index=True)
|
||||||
|
available = BooleanField(default=True, index=True)
|
||||||
|
processing_expires = DateTimeField(null=True, index=True)
|
||||||
|
|
||||||
|
|
||||||
def initialize_db():
|
def initialize_db():
|
||||||
create_model_tables([User, Repository, Image, AccessToken, Role,
|
create_model_tables([User, Repository, Image, AccessToken, Role,
|
||||||
RepositoryPermission, Visibility, RepositoryTag,
|
RepositoryPermission, Visibility, RepositoryTag,
|
||||||
EmailConfirmation, FederatedLogin, LoginService])
|
EmailConfirmation, FederatedLogin, LoginService,
|
||||||
|
QueueItem])
|
||||||
Role.create(name='admin')
|
Role.create(name='admin')
|
||||||
Role.create(name='write')
|
Role.create(name='write')
|
||||||
Role.create(name='read')
|
Role.create(name='read')
|
||||||
|
|
55
data/queue.py
Normal file
55
data/queue.py
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
from database import QueueItem
|
||||||
|
|
||||||
|
|
||||||
|
class WorkQueue(object):
|
||||||
|
def __init__(self, queue_name):
|
||||||
|
self.queue_name = queue_name
|
||||||
|
|
||||||
|
def put(message, available_after=0):
|
||||||
|
"""
|
||||||
|
Put an item, if it shouldn't be processed for some number of seconds,
|
||||||
|
specify that amount as available_after.
|
||||||
|
"""
|
||||||
|
|
||||||
|
params = {
|
||||||
|
'queue_name': self.queue_name,
|
||||||
|
'body': message,
|
||||||
|
}
|
||||||
|
|
||||||
|
if available_after:
|
||||||
|
available_date = datetime.now() + timedelta(seconds=available_after)
|
||||||
|
params['available_after'] = available_date
|
||||||
|
|
||||||
|
QueueItem.create(**params)
|
||||||
|
|
||||||
|
def get(processing_time=300):
|
||||||
|
"""
|
||||||
|
Get an available item and mark it as unavailable for the default of five
|
||||||
|
minutes.
|
||||||
|
"""
|
||||||
|
now = datetime.now()
|
||||||
|
available_or_expired = (QueueItem.available == True |
|
||||||
|
QueueItem.processing_expires <= now)
|
||||||
|
|
||||||
|
# TODO the query and the update should be atomic, but for now we only
|
||||||
|
# have one worker.
|
||||||
|
avaial = QueueItem.select().where(QueueItem.queue_name = self.queue_name,
|
||||||
|
QueueItem.available_after <= now,
|
||||||
|
available_or_expired)
|
||||||
|
|
||||||
|
found = list(avail.limit(1).order_by(QueueItem.available_after))
|
||||||
|
|
||||||
|
if found:
|
||||||
|
item = found[0]
|
||||||
|
item.available = False
|
||||||
|
item.processing_expires = now + timedelta(seconds=processing_time)
|
||||||
|
item.save()
|
||||||
|
|
||||||
|
return item
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def complete(completed_item):
|
||||||
|
item.delete_instance()
|
|
@ -360,6 +360,21 @@ def list_repository_images(namespace, repository):
|
||||||
abort(403)
|
abort(403)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/api/repository/<path:repository>/image/<image_id>/changes',
|
||||||
|
methods=['GET'])
|
||||||
|
@parse_repository_name
|
||||||
|
def get_repository_changes(namespace, repository, image_id):
|
||||||
|
permission = ReadRepositoryPermission(namespace, repository)
|
||||||
|
if permission.can() or model.repository_is_public(namespace, repository):
|
||||||
|
return jsonify({
|
||||||
|
'added': [],
|
||||||
|
'changed': [],
|
||||||
|
'removed': []
|
||||||
|
})
|
||||||
|
|
||||||
|
abort(403)
|
||||||
|
|
||||||
|
|
||||||
@app.route('/api/repository/<path:repository>/tag/<tag>/images',
|
@app.route('/api/repository/<path:repository>/tag/<tag>/images',
|
||||||
methods=['GET'])
|
methods=['GET'])
|
||||||
@parse_repository_name
|
@parse_repository_name
|
||||||
|
|
|
@ -312,3 +312,9 @@ def delete_repository_storage(namespace, repository):
|
||||||
|
|
||||||
logger.debug('Recursively deleting path: %s' % repository_path)
|
logger.debug('Recursively deleting path: %s' % repository_path)
|
||||||
store.remove(repository_path)
|
store.remove(repository_path)
|
||||||
|
|
||||||
|
|
||||||
|
def process_image_changes(namespace, repository, image_id):
|
||||||
|
return
|
||||||
|
image = model.get_image_by_id(namespace, repository, image_id)
|
||||||
|
model.get_parent_images()
|
|
@ -12,3 +12,5 @@ gunicorn
|
||||||
eventlet
|
eventlet
|
||||||
mixpanel-py
|
mixpanel-py
|
||||||
beautifulsoup4
|
beautifulsoup4
|
||||||
|
marisa-trie
|
||||||
|
apscheduler
|
|
@ -54,6 +54,14 @@ class Storage(object):
|
||||||
def repository_namespace_path(self, namespace, repository):
|
def repository_namespace_path(self, namespace, repository):
|
||||||
return '{0}/{1}/{2}/'.format(self.images, namespace, repository)
|
return '{0}/{1}/{2}/'.format(self.images, namespace, repository)
|
||||||
|
|
||||||
|
def image_file_trie_path(self, namespace, repository, image_id):
|
||||||
|
return '{0}/{1}/{2}/{3}/files.trie'.format(self.images, namespace,
|
||||||
|
repository, image_id)
|
||||||
|
|
||||||
|
def image_file_diffs_trie_path(self, namespace, repository, image_id):
|
||||||
|
return '{0}/{1}/{2}/{3}/diffs.pkl'.format(self.images, namespace,
|
||||||
|
repository, image_id)
|
||||||
|
|
||||||
def get_content(self, path):
|
def get_content(self, path):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
56
util/changes.py
Normal file
56
util/changes.py
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
import marisa_trie
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
AUFS_METADATA = u'.wh..wh.'
|
||||||
|
|
||||||
|
AUFS_WHITEOUT = u'.wh.'
|
||||||
|
AUFS_WHITEOUT_PREFIX_LENGTH = len(AUFS_WHITEOUT)
|
||||||
|
|
||||||
|
|
||||||
|
def files_and_dirs(root_path, removed_prefix_collector):
|
||||||
|
for root, dirs, files in os.walk(unicode(root_path)):
|
||||||
|
|
||||||
|
rel_root = os.path.relpath(root, root_path)
|
||||||
|
yield rel_root
|
||||||
|
|
||||||
|
for one_file in files:
|
||||||
|
if one_file.startswith(AUFS_METADATA):
|
||||||
|
# Skip
|
||||||
|
continue
|
||||||
|
|
||||||
|
elif one_file.startswith(AUFS_WHITEOUT):
|
||||||
|
filename = one_file[AUFS_WHITEOUT_PREFIX_LENGTH:]
|
||||||
|
removed_prefix_collector.add(os.path.join(rel_root, filename))
|
||||||
|
continue
|
||||||
|
|
||||||
|
else:
|
||||||
|
yield os.path.join(rel_root, one_file)
|
||||||
|
|
||||||
|
|
||||||
|
def compute_removed(base_trie, removed_prefixes):
|
||||||
|
for prefix in removed_prefixes:
|
||||||
|
for filename in base_trie.keys(prefix):
|
||||||
|
yield filename
|
||||||
|
|
||||||
|
|
||||||
|
def compute_added_changed(base_trie, delta_trie):
|
||||||
|
added = set()
|
||||||
|
changed = set()
|
||||||
|
|
||||||
|
for filename in delta_trie.keys():
|
||||||
|
if filename not in base_trie:
|
||||||
|
added.add(filename)
|
||||||
|
else:
|
||||||
|
changed.add(filename)
|
||||||
|
|
||||||
|
return added, changed
|
||||||
|
|
||||||
|
|
||||||
|
def new_fs(base_trie, added, removed):
|
||||||
|
for filename in base_trie.keys():
|
||||||
|
if filename not in removed:
|
||||||
|
yield filename
|
||||||
|
|
||||||
|
for filename in added:
|
||||||
|
yield filename
|
0
workers/__init__.py
Normal file
0
workers/__init__.py
Normal file
36
workers/image_diffs.py
Normal file
36
workers/image_diffs.py
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
|
||||||
|
from apscheduler.scheduler import Scheduler
|
||||||
|
|
||||||
|
from data.queue import WorkQueue
|
||||||
|
from endpoints.registry import process_image_changes
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
image_diff_queue = WorkQueue('imagediff')
|
||||||
|
|
||||||
|
|
||||||
|
def process_work_items():
|
||||||
|
logger.debug('Getting work item from queue.')
|
||||||
|
|
||||||
|
item = imagediff.get()
|
||||||
|
|
||||||
|
if item:
|
||||||
|
logger.debug('Queue gave us some work: %s' % item.body)
|
||||||
|
|
||||||
|
request = json.loads(item.body)
|
||||||
|
process_image_changes(request['namepspace'], request['repository'],
|
||||||
|
request['image_id'])
|
||||||
|
else:
|
||||||
|
logger.debug('No work today.')
|
||||||
|
|
||||||
|
|
||||||
|
FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
|
||||||
|
logging.basicConfig(format=FORMAT, level=logging.DEBUG)
|
||||||
|
|
||||||
|
sched = Scheduler()
|
||||||
|
sched.start()
|
||||||
|
|
||||||
|
sched.add_interval_job(process_work_items, seconds=10)
|
Reference in a new issue