Get the worker and the registry playing nice together.
This commit is contained in:
parent
63ffa52245
commit
2fcd8df42b
3 changed files with 30 additions and 5 deletions
|
@ -52,4 +52,7 @@ class WorkQueue(object):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def complete(self, completed_item):
|
def complete(self, completed_item):
|
||||||
item.delete_instance()
|
completed_item.delete_instance()
|
||||||
|
|
||||||
|
|
||||||
|
image_diff_queue = WorkQueue('imagediff')
|
||||||
|
|
|
@ -7,6 +7,7 @@ from datetime import datetime
|
||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
import storage
|
import storage
|
||||||
|
from data.queue import image_diff_queue
|
||||||
|
|
||||||
from app import app
|
from app import app
|
||||||
from auth.auth import process_auth, extract_namespace_repo_from_session
|
from auth.auth import process_auth, extract_namespace_repo_from_session
|
||||||
|
@ -142,6 +143,16 @@ def put_image_layer(namespace, repository, image_id):
|
||||||
abort(400) # 'Checksum mismatch, ignoring the layer')
|
abort(400) # 'Checksum mismatch, ignoring the layer')
|
||||||
# Checksum is ok, we remove the marker
|
# Checksum is ok, we remove the marker
|
||||||
store.remove(mark_path)
|
store.remove(mark_path)
|
||||||
|
|
||||||
|
# The layer is ready for download, send a job to the work queue to
|
||||||
|
# process it.
|
||||||
|
logger.debug('Queing diffs job for image: %s' % image_id)
|
||||||
|
image_diff_queue.put(json.dumps({
|
||||||
|
'namespace': namespace,
|
||||||
|
'repository': repository,
|
||||||
|
'image_id': image_id,
|
||||||
|
}))
|
||||||
|
|
||||||
return make_response('true', 200)
|
return make_response('true', 200)
|
||||||
|
|
||||||
|
|
||||||
|
@ -173,6 +184,16 @@ def put_image_checksum(namespace, repository, image_id):
|
||||||
abort(400) # 'Checksum mismatch')
|
abort(400) # 'Checksum mismatch')
|
||||||
# Checksum is ok, we remove the marker
|
# Checksum is ok, we remove the marker
|
||||||
store.remove(mark_path)
|
store.remove(mark_path)
|
||||||
|
|
||||||
|
# The layer is ready for download, send a job to the work queue to
|
||||||
|
# process it.
|
||||||
|
logger.debug('Queing diffs job for image: %s' % image_id)
|
||||||
|
image_diff_queue.put(json.dumps({
|
||||||
|
'namespace': namespace,
|
||||||
|
'repository': repository,
|
||||||
|
'image_id': image_id,
|
||||||
|
}))
|
||||||
|
|
||||||
return make_response('true', 200)
|
return make_response('true', 200)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -5,14 +5,12 @@ import time
|
||||||
|
|
||||||
from apscheduler.scheduler import Scheduler
|
from apscheduler.scheduler import Scheduler
|
||||||
|
|
||||||
from data.queue import WorkQueue
|
from data.queue import image_diff_queue
|
||||||
from endpoints.registry import process_image_changes
|
from endpoints.registry import process_image_changes
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
image_diff_queue = WorkQueue('imagediff')
|
|
||||||
|
|
||||||
|
|
||||||
def process_work_items():
|
def process_work_items():
|
||||||
logger.debug('Getting work item from queue.')
|
logger.debug('Getting work item from queue.')
|
||||||
|
@ -23,8 +21,11 @@ def process_work_items():
|
||||||
logger.debug('Queue gave us some work: %s' % item.body)
|
logger.debug('Queue gave us some work: %s' % item.body)
|
||||||
|
|
||||||
request = json.loads(item.body)
|
request = json.loads(item.body)
|
||||||
process_image_changes(request['namepspace'], request['repository'],
|
process_image_changes(request['namespace'], request['repository'],
|
||||||
request['image_id'])
|
request['image_id'])
|
||||||
|
|
||||||
|
image_diff_queue.complete(item)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.debug('No work today.')
|
logger.debug('No work today.')
|
||||||
|
|
||||||
|
|
Reference in a new issue