From 14263de7f8924e52867702ca65ed42e3222fa36e Mon Sep 17 00:00:00 2001
From: yackob03
Date: Fri, 15 Nov 2013 15:50:20 -0500
Subject: [PATCH] Extract some boilerplate from the worker and create a base class. Port the diffs worker over to the base.

---
 workers/diffsworker.py | 51 ++++++++-------------------------
 workers/worker.py      | 64 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+), 40 deletions(-)
 create mode 100644 workers/worker.py

diff --git a/workers/diffsworker.py b/workers/diffsworker.py
index cacf48856..1d6ee56af 100644
--- a/workers/diffsworker.py
+++ b/workers/diffsworker.py
@@ -1,15 +1,11 @@
 import logging
-import json
 import daemon
-import time
 import argparse
 
-from apscheduler.scheduler import Scheduler
-
 from data.queue import image_diff_queue
-from data.database import db as db_connection
 from data.model import DataModelException
 from endpoints.registry import process_image_changes
+from workers.worker import Worker
 
 
 root_logger = logging.getLogger('')
@@ -21,20 +17,13 @@ formatter = logging.Formatter(FORMAT)
 logger = logging.getLogger(__name__)
 
 
-def process_work_items():
-  logger.debug('Getting work item from queue.')
+class DiffsWorker(Worker):
+  def process_queue_item(self, job_details):
+    image_id = job_details['image_id']
+    namespace = job_details['namespace']
+    repository = job_details['repository']
 
-  item = image_diff_queue.get()
-
-  while item:
-    logger.debug('Queue gave us some work: %s' % item.body)
-
-    request = json.loads(item.body)
     try:
-      image_id = request['image_id']
-      namespace = request['namespace']
-      repository = request['repository']
-
       process_image_changes(namespace, repository, image_id)
     except DataModelException:
       # This exception is unrecoverable, and the item should continue and be
@@ -43,27 +32,7 @@ def process_work_items():
              (image_id, namespace, repository))
       logger.warning(msg)
 
-    image_diff_queue.complete(item)
-
-    item = image_diff_queue.get()
-
-  logger.debug('No more work.')
-
-  if not db_connection.is_closed():
-    logger.debug('Closing thread db connection.')
-    db_connection.close()
-
-
-def start_worker():
-  logger.debug("Scheduling worker.")
-
-  sched = Scheduler()
-  sched.start()
-
-  sched.add_interval_job(process_work_items, seconds=30)
-
-  while True:
-    time.sleep(60 * 60 * 24)  # sleep one day, basically forever
+    return True
 
 
 parser = argparse.ArgumentParser(description='Worker daemon to compute diffs')
@@ -74,15 +43,17 @@ parser.add_argument('--log', default='diffsworker.log',
 args = parser.parse_args()
 
 
+worker = DiffsWorker(image_diff_queue)
+
 if args.D:
   handler = logging.FileHandler(args.log)
   handler.setFormatter(formatter)
   root_logger.addHandler(handler)
   with daemon.DaemonContext(files_preserve=[handler.stream]):
-    start_worker()
+    worker.start()
 
 else:
   handler = logging.StreamHandler()
   handler.setFormatter(formatter)
   root_logger.addHandler(handler)
-  start_worker()
\ No newline at end of file
+  worker.start()
\ No newline at end of file
diff --git a/workers/worker.py b/workers/worker.py
new file mode 100644
index 000000000..be4984bdd
--- /dev/null
+++ b/workers/worker.py
@@ -0,0 +1,64 @@
+import logging
+import json
+
+from threading import Event
+from apscheduler.scheduler import Scheduler
+
+
+logger = logging.getLogger(__name__)
+
+
+class Worker(object):
+  """ Base class for workers that periodically drain a work queue.
+
+  Subclasses implement process_queue_item; start() schedules poll_queue to
+  run every poll_period_seconds and blocks until join() is called.
+  """
+  def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300):
+    self._sched = Scheduler()
+    self._poll_period_seconds = poll_period_seconds
+    self._reservation_seconds = reservation_seconds
+    self._stop = Event()
+    self._queue = queue
+
+  def process_queue_item(self, job_details):
+    """ Return True if complete, False if it should be retried. """
+    raise NotImplementedError('Workers must implement process_queue_item.')
+
+  def poll_queue(self):
+    """ Process items from the queue until it reports no more work. """
+    logger.debug('Getting work item from queue.')
+
+    # Pass the configured reservation time for the first item too, not just
+    # for the follow-up items fetched at the bottom of the loop.
+    item = self._queue.get(processing_time=self._reservation_seconds)
+    while item:
+      logger.debug('Queue gave us some work: %s', item.body)
+
+      job_details = json.loads(item.body)
+
+      if self.process_queue_item(job_details):
+        self._queue.complete(item)
+      else:
+        logger.warning('An error occurred processing request: %s', item.body)
+        self._queue.incomplete(item)
+
+      item = self._queue.get(processing_time=self._reservation_seconds)
+
+    logger.debug('No more work.')
+
+  def start(self):
+    """ Schedule periodic polling, then block until join() is called. """
+    logger.debug("Scheduling worker.")
+
+    self._sched.start()
+    self._sched.add_interval_job(self.poll_queue,
+                                 seconds=self._poll_period_seconds)
+
+    # Wake once a second to check whether a stop has been requested.
+    while not self._stop.wait(1):
+      pass
+
+  def join(self):
+    """ Signal start() to stop blocking and return. """
+    self._stop.set()