Update the worker code to better handle exceptions, fix the utcdate issue, and make sure we send the proper retry. Also update the notification workers to raise JobExceptions rather than returning True or False.
This commit is contained in:
parent
8c00eabedd
commit
f23038c6ee
6 changed files with 62 additions and 60 deletions
|
@ -33,7 +33,8 @@ class DiffsWorker(Worker):
|
|||
|
||||
return True
|
||||
|
||||
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
|
||||
if __name__ == "__main__":
|
||||
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
|
||||
|
||||
worker = DiffsWorker(image_diff_queue)
|
||||
worker.start()
|
||||
worker = DiffsWorker(image_diff_queue)
|
||||
worker.start()
|
||||
|
|
|
@ -8,6 +8,7 @@ from workers.worker import Worker
|
|||
|
||||
from endpoints.notificationmethod import NotificationMethod, InvalidNotificationMethodException
|
||||
from endpoints.notificationevent import NotificationEvent, InvalidNotificationEventException
|
||||
from workers.worker import JobException
|
||||
|
||||
from data import model
|
||||
|
||||
|
@ -29,7 +30,7 @@ class NotificationWorker(Worker):
|
|||
notification = model.get_repo_notification(repo_namespace, repo_name, notification_uuid)
|
||||
if not notification:
|
||||
# Probably deleted.
|
||||
return True
|
||||
return
|
||||
|
||||
event_name = notification.event.name
|
||||
method_name = notification.method.name
|
||||
|
@ -39,15 +40,17 @@ class NotificationWorker(Worker):
|
|||
method_handler = NotificationMethod.get_method(method_name)
|
||||
except InvalidNotificationMethodException as ex:
|
||||
logger.exception('Cannot find notification method: %s' % ex.message)
|
||||
return False
|
||||
raise JobException('Cannot find notification method: %s' % ex.message)
|
||||
except InvalidNotificationEventException as ex:
|
||||
logger.exception('Cannot find notification method: %s' % ex.message)
|
||||
return False
|
||||
logger.exception('Cannot find notification event: %s' % ex.message)
|
||||
raise JobException('Cannot find notification event: %s' % ex.message)
|
||||
|
||||
return method_handler.perform(notification, event_handler, job_details)
|
||||
method_handler.perform(notification, event_handler, job_details)
|
||||
|
||||
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
|
||||
|
||||
worker = NotificationWorker(notification_queue, poll_period_seconds=15,
|
||||
reservation_seconds=3600)
|
||||
worker.start()
|
||||
if __name__ == "__main__":
|
||||
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
|
||||
|
||||
worker = NotificationWorker(notification_queue, poll_period_seconds=10, reservation_seconds=30,
|
||||
retry_after_seconds=30)
|
||||
worker.start()
|
||||
|
|
|
@ -63,11 +63,12 @@ class WorkerStatusHandler(BaseHTTPRequestHandler):
|
|||
|
||||
class Worker(object):
|
||||
def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
|
||||
watchdog_period_seconds=60):
|
||||
watchdog_period_seconds=60, retry_after_seconds=300):
|
||||
self._sched = BackgroundScheduler()
|
||||
self._poll_period_seconds = poll_period_seconds
|
||||
self._reservation_seconds = reservation_seconds
|
||||
self._watchdog_period_seconds = watchdog_period_seconds
|
||||
self._retry_after_seconds = retry_after_seconds
|
||||
self._stop = Event()
|
||||
self._terminated = Event()
|
||||
self._queue = queue
|
||||
|
@ -103,7 +104,8 @@ class Worker(object):
|
|||
try:
|
||||
self.watchdog()
|
||||
except WorkerUnhealthyException as exc:
|
||||
logger.error('The worker has encountered an error via watchdog and will not take new jobs: %s' % exc.message)
|
||||
logger.error('The worker has encountered an error via watchdog and will not take new jobs')
|
||||
logger.error(exc.message)
|
||||
self.mark_current_incomplete(restore_retry=True)
|
||||
self._stop.set()
|
||||
|
||||
|
@ -111,7 +113,7 @@ class Worker(object):
|
|||
logger.debug('Getting work item from queue.')
|
||||
|
||||
with self._current_item_lock:
|
||||
self.current_queue_item = self._queue.get()
|
||||
self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
|
||||
|
||||
while True:
|
||||
# Retrieve the current item in the queue over which to operate. We do so under
|
||||
|
@ -129,12 +131,14 @@ class Worker(object):
|
|||
self.process_queue_item(job_details)
|
||||
self.mark_current_complete()
|
||||
|
||||
except JobException:
|
||||
except JobException as jex:
|
||||
logger.warning('An error occurred processing request: %s', current_queue_item.body)
|
||||
logger.warning('Job exception: %s' % jex)
|
||||
self.mark_current_incomplete(restore_retry=False)
|
||||
|
||||
except WorkerUnhealthyException as exc:
|
||||
logger.error('The worker has encountered an error via the job and will not take new jobs: %s' % exc.message)
|
||||
logger.error('The worker has encountered an error via the job and will not take new jobs')
|
||||
logger.error(exc.message)
|
||||
self.mark_current_incomplete(restore_retry=True)
|
||||
self._stop.set()
|
||||
|
||||
|
@ -190,7 +194,8 @@ class Worker(object):
|
|||
def mark_current_incomplete(self, restore_retry=False):
|
||||
with self._current_item_lock:
|
||||
if self.current_queue_item is not None:
|
||||
self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry)
|
||||
self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry,
|
||||
retry_after=self._retry_after_seconds)
|
||||
self.current_queue_item = None
|
||||
|
||||
def mark_current_complete(self):
|
||||
|
|
Reference in a new issue