Extract some boilerplate from the worker and create a base class. Port the diffs worker over to the base.

yackob03 2013-11-15 15:50:20 -05:00
parent f07690956d
commit 14263de7f8
2 changed files with 64 additions and 40 deletions

workers/diffsworker.py

@@ -1,15 +1,11 @@
 import logging
-import json
 import daemon
-import time
 import argparse
 
-from apscheduler.scheduler import Scheduler
-
 from data.queue import image_diff_queue
-from data.database import db as db_connection
 from data.model import DataModelException
 from endpoints.registry import process_image_changes
+from workers.worker import Worker
 
 
 root_logger = logging.getLogger('')
@@ -21,20 +17,13 @@ formatter = logging.Formatter(FORMAT)
 logger = logging.getLogger(__name__)
 
 
-def process_work_items():
-  logger.debug('Getting work item from queue.')
-
-  item = image_diff_queue.get()
-  while item:
-    logger.debug('Queue gave us some work: %s' % item.body)
-
-    request = json.loads(item.body)
+class DiffsWorker(Worker):
+  def process_queue_item(self, job_details):
+    image_id = job_details['image_id']
+    namespace = job_details['namespace']
+    repository = job_details['repository']
 
     try:
-      image_id = request['image_id']
-      namespace = request['namespace']
-      repository = request['repository']
       process_image_changes(namespace, repository, image_id)
     except DataModelException:
       # This exception is unrecoverable, and the item should continue and be
@@ -43,27 +32,7 @@ def process_work_items():
       (image_id, namespace, repository))
       logger.warning(msg)
 
-    image_diff_queue.complete(item)
-    item = image_diff_queue.get()
-
-  logger.debug('No more work.')
-
-  if not db_connection.is_closed():
-    logger.debug('Closing thread db connection.')
-    db_connection.close()
-
-
-def start_worker():
-  logger.debug("Scheduling worker.")
-
-  sched = Scheduler()
-  sched.start()
-  sched.add_interval_job(process_work_items, seconds=30)
-
-  while True:
-    time.sleep(60 * 60 * 24)  # sleep one day, basically forever
-
-  return True
+    return True
 
 
 parser = argparse.ArgumentParser(description='Worker daemon to compute diffs')
@@ -74,15 +43,17 @@ parser.add_argument('--log', default='diffsworker.log',
 args = parser.parse_args()
 
+worker = DiffsWorker(image_diff_queue)
+
 if args.D:
   handler = logging.FileHandler(args.log)
   handler.setFormatter(formatter)
   root_logger.addHandler(handler)
   with daemon.DaemonContext(files_preserve=[handler.stream]):
-    start_worker()
+    worker.start()
 else:
   handler = logging.StreamHandler()
   handler.setFormatter(formatter)
   root_logger.addHandler(handler)
-  start_worker()
+  worker.start()

workers/worker.py Normal file (+53)

@@ -0,0 +1,53 @@
+import logging
+import json
+
+from threading import Event
+from apscheduler.scheduler import Scheduler
+
+
+logger = logging.getLogger(__name__)
+
+
+class Worker(object):
+  def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300):
+    self._sched = Scheduler()
+    self._poll_period_seconds = poll_period_seconds
+    self._reservation_seconds = reservation_seconds
+    self._stop = Event()
+    self._queue = queue
+
+  def process_queue_item(self, job_details):
+    """ Return True if complete, False if it should be retried. """
+    raise NotImplementedError('Workers must implement process_queue_item.')
+
+  def poll_queue(self):
+    logger.debug('Getting work item from queue.')
+
+    item = self._queue.get()
+    while item:
+      logger.debug('Queue gave us some work: %s' % item.body)
+
+      job_details = json.loads(item.body)
+      if self.process_queue_item(job_details):
+        self._queue.complete(item)
+      else:
+        logger.warning('An error occurred processing request: %s' % item.body)
+        self._queue.incomplete(item)
+
+      item = self._queue.get(processing_time=self._reservation_seconds)
+
+    logger.debug('No more work.')
+
+  def start(self):
+    logger.debug("Scheduling worker.")
+
+    self._sched.start()
+    self._sched.add_interval_job(self.poll_queue,
+                                 seconds=self._poll_period_seconds)
+
+    while not self._stop.wait(1):
+      pass
+
+  def join(self):
+    self._stop.set()
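
With the polling loop, JSON decoding, and complete/incomplete bookkeeping now owned by the base class, porting a worker reduces to subclassing Worker and implementing process_queue_item. Below is a minimal sketch of that pattern; ToyItem, ToyQueue, and EchoWorker are hypothetical names standing in for the queue interface the base class calls, not part of this commit:

import json

from workers.worker import Worker


class ToyItem(object):
  # Hypothetical queue item; the base class only reads item.body.
  def __init__(self, body):
    self.body = body


class ToyQueue(object):
  # Hypothetical in-memory stand-in for the interface poll_queue uses:
  # get(processing_time=...), complete(item), and incomplete(item).
  def __init__(self, bodies):
    self._items = [ToyItem(body) for body in bodies]

  def get(self, processing_time=None):
    return self._items.pop(0) if self._items else None

  def complete(self, item):
    pass  # a real queue would delete the reserved item here

  def incomplete(self, item):
    self._items.append(item)  # a real queue would release the item for retry


class EchoWorker(Worker):
  def process_queue_item(self, job_details):
    # Return True to mark the item complete, False to requeue it.
    print('processing %s' % job_details)
    return True


worker = EchoWorker(ToyQueue([json.dumps({'image_id': 'abc123'})]))
worker.poll_queue()  # one polling pass; start() schedules this every 30 seconds

start() blocks while the scheduler fires poll_queue every poll_period_seconds; join(), called from another thread or a signal handler, sets the stop event so start() returns.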