From ba0963a81cb825b05f315eda26b5b212cb1eb2ae Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 22 Sep 2014 12:52:57 -0400 Subject: [PATCH] Update the worker code to better handle exceptions, fix the utcdate issue and make sure we send the proper retry. Also updates notification workers to send JobExceptions rather than returning true or false --- data/database.py | 2 +- data/queue.py | 5 +-- endpoints/notificationmethod.py | 70 +++++++++++++++------------------ workers/diffsworker.py | 7 ++-- workers/notificationworker.py | 21 +++++----- workers/worker.py | 17 +++++--- 6 files changed, 62 insertions(+), 60 deletions(-) diff --git a/data/database.py b/data/database.py index 2a824e1da..fea62286a 100644 --- a/data/database.py +++ b/data/database.py @@ -347,7 +347,7 @@ class RepositoryBuild(BaseModel): class QueueItem(BaseModel): queue_name = CharField(index=True, max_length=1024) body = TextField() - available_after = DateTimeField(default=datetime.now, index=True) + available_after = DateTimeField(default=datetime.utcnow, index=True) available = BooleanField(default=True, index=True) processing_expires = DateTimeField(null=True, index=True) retries_remaining = IntegerField(default=5) diff --git a/data/queue.py b/data/queue.py index 44d7ad531..79e645ebf 100644 --- a/data/queue.py +++ b/data/queue.py @@ -68,9 +68,8 @@ class WorkQueue(object): 'retries_remaining': retries_remaining, } - if available_after: - available_date = datetime.utcnow() + timedelta(seconds=available_after) - params['available_after'] = available_date + available_date = datetime.utcnow() + timedelta(seconds=available_after or 0) + params['available_after'] = available_date with self._transaction_factory(db): QueueItem.create(**params) diff --git a/endpoints/notificationmethod.py b/endpoints/notificationmethod.py index 9650e79f6..e6c259cb1 100644 --- a/endpoints/notificationmethod.py +++ b/endpoints/notificationmethod.py @@ -10,6 +10,7 @@ import re from flask.ext.mail import Message from app import mail, app, get_app_url from data import model +from workers.worker import JobException logger = logging.getLogger(__name__) @@ -19,6 +20,9 @@ class InvalidNotificationMethodException(Exception): class CannotValidateNotificationMethodException(Exception): pass +class NotificationMethodPerformException(JobException): + pass + class NotificationMethod(object): def __init__(self): @@ -105,19 +109,18 @@ class QuayNotificationMethod(NotificationMethod): repository = notification.repository if not repository: # Probably deleted. - return True + return # Lookup the target user or team to which we'll send the notification. config_data = json.loads(notification.config_json) status, err_message, target_users = self.find_targets(repository, config_data) if not status: - return False + raise NotificationMethodPerformException(err_message) # For each of the target users, create a notification. for target_user in set(target_users or []): model.create_notification(event_handler.event_name(), target_user, metadata=notification_data['event_data']) - return True class EmailMethod(NotificationMethod): @@ -141,7 +144,7 @@ class EmailMethod(NotificationMethod): config_data = json.loads(notification.config_json) email = config_data.get('email', '') if not email: - return False + return msg = Message(event_handler.get_summary(notification_data['event_data'], notification_data), sender='support@quay.io', @@ -153,9 +156,7 @@ class EmailMethod(NotificationMethod): mail.send(msg) except Exception as ex: logger.exception('Email was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) class WebhookMethod(NotificationMethod): @@ -172,7 +173,7 @@ class WebhookMethod(NotificationMethod): config_data = json.loads(notification.config_json) url = config_data.get('url', '') if not url: - return False + return payload = notification_data['event_data'] headers = {'Content-type': 'application/json'} @@ -180,15 +181,14 @@ class WebhookMethod(NotificationMethod): try: resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: - logger.error('%s response for webhook to url: %s' % (resp.status_code, - url)) - return False + error_message = '%s response for webhook to url: %s' % (resp.status_code, url) + logger.error(error_message) + logger.error(resp.content) + raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Webhook was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) class FlowdockMethod(NotificationMethod): @@ -208,12 +208,12 @@ class FlowdockMethod(NotificationMethod): config_data = json.loads(notification.config_json) token = config_data.get('flow_api_token', '') if not token: - return False + return owner = model.get_user(notification.repository.namespace) if not owner: # Something went wrong. - return False + return url = 'https://api.flowdock.com/v1/messages/team_inbox/%s' % token headers = {'Content-type': 'application/json'} @@ -231,16 +231,14 @@ class FlowdockMethod(NotificationMethod): try: resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: - logger.error('%s response for flowdock to url: %s' % (resp.status_code, - url)) + error_message = '%s response for flowdock to url: %s' % (resp.status_code, url) + logger.error(error_message) logger.error(resp.content) - return False + raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Flowdock method was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) class HipchatMethod(NotificationMethod): @@ -265,12 +263,12 @@ class HipchatMethod(NotificationMethod): room_id = config_data.get('room_id', '') if not token or not room_id: - return False + return owner = model.get_user(notification.repository.namespace) if not owner: # Something went wrong. - return False + return url = 'https://api.hipchat.com/v2/room/%s/notification?auth_token=%s' % (room_id, token) @@ -293,16 +291,14 @@ class HipchatMethod(NotificationMethod): try: resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: - logger.error('%s response for hipchat to url: %s' % (resp.status_code, - url)) + error_message = '%s response for hipchat to url: %s' % (resp.status_code, url) + logger.error(error_message) logger.error(resp.content) - return False + raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Hipchat method was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) class SlackMethod(NotificationMethod): @@ -334,12 +330,12 @@ class SlackMethod(NotificationMethod): subdomain = config_data.get('subdomain', '') if not token or not subdomain: - return False + return owner = model.get_user(notification.repository.namespace) if not owner: # Something went wrong. - return False + return url = 'https://%s.slack.com/services/hooks/incoming-webhook?token=%s' % (subdomain, token) @@ -370,13 +366,11 @@ class SlackMethod(NotificationMethod): try: resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: - logger.error('%s response for Slack to url: %s' % (resp.status_code, - url)) + error_message = '%s response for Slack to url: %s' % (resp.status_code, url) + logger.error(error_message) logger.error(resp.content) - return False + raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Slack method was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) diff --git a/workers/diffsworker.py b/workers/diffsworker.py index 70f74f1db..563c61352 100644 --- a/workers/diffsworker.py +++ b/workers/diffsworker.py @@ -33,7 +33,8 @@ class DiffsWorker(Worker): return True -logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) +if __name__ == "__main__": + logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) -worker = DiffsWorker(image_diff_queue) -worker.start() + worker = DiffsWorker(image_diff_queue) + worker.start() diff --git a/workers/notificationworker.py b/workers/notificationworker.py index 2af0954f9..a176d46c8 100644 --- a/workers/notificationworker.py +++ b/workers/notificationworker.py @@ -8,6 +8,7 @@ from workers.worker import Worker from endpoints.notificationmethod import NotificationMethod, InvalidNotificationMethodException from endpoints.notificationevent import NotificationEvent, InvalidNotificationEventException +from workers.worker import JobException from data import model @@ -29,7 +30,7 @@ class NotificationWorker(Worker): notification = model.get_repo_notification(repo_namespace, repo_name, notification_uuid) if not notification: # Probably deleted. - return True + return event_name = notification.event.name method_name = notification.method.name @@ -39,15 +40,17 @@ class NotificationWorker(Worker): method_handler = NotificationMethod.get_method(method_name) except InvalidNotificationMethodException as ex: logger.exception('Cannot find notification method: %s' % ex.message) - return False + raise JobException('Cannot find notification method: %s' % ex.message) except InvalidNotificationEventException as ex: - logger.exception('Cannot find notification method: %s' % ex.message) - return False + logger.exception('Cannot find notification event: %s' % ex.message) + raise JobException('Cannot find notification event: %s' % ex.message) - return method_handler.perform(notification, event_handler, job_details) + method_handler.perform(notification, event_handler, job_details) -logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) -worker = NotificationWorker(notification_queue, poll_period_seconds=15, - reservation_seconds=3600) -worker.start() +if __name__ == "__main__": + logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) + + worker = NotificationWorker(notification_queue, poll_period_seconds=10, reservation_seconds=30, + retry_after_seconds=30) + worker.start() diff --git a/workers/worker.py b/workers/worker.py index 57d4a02d0..03066dc6e 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -63,11 +63,12 @@ class WorkerStatusHandler(BaseHTTPRequestHandler): class Worker(object): def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300, - watchdog_period_seconds=60): + watchdog_period_seconds=60, retry_after_seconds=300): self._sched = BackgroundScheduler() self._poll_period_seconds = poll_period_seconds self._reservation_seconds = reservation_seconds self._watchdog_period_seconds = watchdog_period_seconds + self._retry_after_seconds = retry_after_seconds self._stop = Event() self._terminated = Event() self._queue = queue @@ -103,7 +104,8 @@ class Worker(object): try: self.watchdog() except WorkerUnhealthyException as exc: - logger.error('The worker has encountered an error via watchdog and will not take new jobs: %s' % exc.message) + logger.error('The worker has encountered an error via watchdog and will not take new jobs') + logger.error(exc.message) self.mark_current_incomplete(restore_retry=True) self._stop.set() @@ -111,7 +113,7 @@ class Worker(object): logger.debug('Getting work item from queue.') with self._current_item_lock: - self.current_queue_item = self._queue.get() + self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds) while True: # Retrieve the current item in the queue over which to operate. We do so under @@ -129,12 +131,14 @@ class Worker(object): self.process_queue_item(job_details) self.mark_current_complete() - except JobException: + except JobException as jex: logger.warning('An error occurred processing request: %s', current_queue_item.body) + logger.warning('Job exception: %s' % jex) self.mark_current_incomplete(restore_retry=False) except WorkerUnhealthyException as exc: - logger.error('The worker has encountered an error via the job and will not take new jobs: %s' % exc.message) + logger.error('The worker has encountered an error via the job and will not take new jobs') + logger.error(exc.message) self.mark_current_incomplete(restore_retry=True) self._stop.set() @@ -190,7 +194,8 @@ class Worker(object): def mark_current_incomplete(self, restore_retry=False): with self._current_item_lock: if self.current_queue_item is not None: - self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry) + self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry, + retry_after=self._retry_after_seconds) self.current_queue_item = None def mark_current_complete(self):