diff --git a/data/database.py b/data/database.py index 2a824e1da..fea62286a 100644 --- a/data/database.py +++ b/data/database.py @@ -347,7 +347,7 @@ class RepositoryBuild(BaseModel): class QueueItem(BaseModel): queue_name = CharField(index=True, max_length=1024) body = TextField() - available_after = DateTimeField(default=datetime.now, index=True) + available_after = DateTimeField(default=datetime.utcnow, index=True) available = BooleanField(default=True, index=True) processing_expires = DateTimeField(null=True, index=True) retries_remaining = IntegerField(default=5) diff --git a/data/queue.py b/data/queue.py index 44d7ad531..79e645ebf 100644 --- a/data/queue.py +++ b/data/queue.py @@ -68,9 +68,8 @@ class WorkQueue(object): 'retries_remaining': retries_remaining, } - if available_after: - available_date = datetime.utcnow() + timedelta(seconds=available_after) - params['available_after'] = available_date + available_date = datetime.utcnow() + timedelta(seconds=available_after or 0) + params['available_after'] = available_date with self._transaction_factory(db): QueueItem.create(**params) diff --git a/endpoints/notificationmethod.py b/endpoints/notificationmethod.py index 9650e79f6..e6c259cb1 100644 --- a/endpoints/notificationmethod.py +++ b/endpoints/notificationmethod.py @@ -10,6 +10,7 @@ import re from flask.ext.mail import Message from app import mail, app, get_app_url from data import model +from workers.worker import JobException logger = logging.getLogger(__name__) @@ -19,6 +20,9 @@ class InvalidNotificationMethodException(Exception): class CannotValidateNotificationMethodException(Exception): pass +class NotificationMethodPerformException(JobException): + pass + class NotificationMethod(object): def __init__(self): @@ -105,19 +109,18 @@ class QuayNotificationMethod(NotificationMethod): repository = notification.repository if not repository: # Probably deleted. - return True + return # Lookup the target user or team to which we'll send the notification. config_data = json.loads(notification.config_json) status, err_message, target_users = self.find_targets(repository, config_data) if not status: - return False + raise NotificationMethodPerformException(err_message) # For each of the target users, create a notification. for target_user in set(target_users or []): model.create_notification(event_handler.event_name(), target_user, metadata=notification_data['event_data']) - return True class EmailMethod(NotificationMethod): @@ -141,7 +144,7 @@ class EmailMethod(NotificationMethod): config_data = json.loads(notification.config_json) email = config_data.get('email', '') if not email: - return False + return msg = Message(event_handler.get_summary(notification_data['event_data'], notification_data), sender='support@quay.io', @@ -153,9 +156,7 @@ class EmailMethod(NotificationMethod): mail.send(msg) except Exception as ex: logger.exception('Email was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) class WebhookMethod(NotificationMethod): @@ -172,7 +173,7 @@ class WebhookMethod(NotificationMethod): config_data = json.loads(notification.config_json) url = config_data.get('url', '') if not url: - return False + return payload = notification_data['event_data'] headers = {'Content-type': 'application/json'} @@ -180,15 +181,14 @@ class WebhookMethod(NotificationMethod): try: resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: - logger.error('%s response for webhook to url: %s' % (resp.status_code, - url)) - return False + error_message = '%s response for webhook to url: %s' % (resp.status_code, url) + logger.error(error_message) + logger.error(resp.content) + raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Webhook was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) class FlowdockMethod(NotificationMethod): @@ -208,12 +208,12 @@ class FlowdockMethod(NotificationMethod): config_data = json.loads(notification.config_json) token = config_data.get('flow_api_token', '') if not token: - return False + return owner = model.get_user(notification.repository.namespace) if not owner: # Something went wrong. - return False + return url = 'https://api.flowdock.com/v1/messages/team_inbox/%s' % token headers = {'Content-type': 'application/json'} @@ -231,16 +231,14 @@ class FlowdockMethod(NotificationMethod): try: resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: - logger.error('%s response for flowdock to url: %s' % (resp.status_code, - url)) + error_message = '%s response for flowdock to url: %s' % (resp.status_code, url) + logger.error(error_message) logger.error(resp.content) - return False + raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Flowdock method was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) class HipchatMethod(NotificationMethod): @@ -265,12 +263,12 @@ class HipchatMethod(NotificationMethod): room_id = config_data.get('room_id', '') if not token or not room_id: - return False + return owner = model.get_user(notification.repository.namespace) if not owner: # Something went wrong. - return False + return url = 'https://api.hipchat.com/v2/room/%s/notification?auth_token=%s' % (room_id, token) @@ -293,16 +291,14 @@ class HipchatMethod(NotificationMethod): try: resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: - logger.error('%s response for hipchat to url: %s' % (resp.status_code, - url)) + error_message = '%s response for hipchat to url: %s' % (resp.status_code, url) + logger.error(error_message) logger.error(resp.content) - return False + raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Hipchat method was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) class SlackMethod(NotificationMethod): @@ -334,12 +330,12 @@ class SlackMethod(NotificationMethod): subdomain = config_data.get('subdomain', '') if not token or not subdomain: - return False + return owner = model.get_user(notification.repository.namespace) if not owner: # Something went wrong. - return False + return url = 'https://%s.slack.com/services/hooks/incoming-webhook?token=%s' % (subdomain, token) @@ -370,13 +366,11 @@ class SlackMethod(NotificationMethod): try: resp = requests.post(url, data=json.dumps(payload), headers=headers) if resp.status_code/100 != 2: - logger.error('%s response for Slack to url: %s' % (resp.status_code, - url)) + error_message = '%s response for Slack to url: %s' % (resp.status_code, url) + logger.error(error_message) logger.error(resp.content) - return False + raise NotificationMethodPerformException(error_message) except requests.exceptions.RequestException as ex: logger.exception('Slack method was unable to be sent: %s' % ex.message) - return False - - return True + raise NotificationMethodPerformException(ex.message) diff --git a/workers/diffsworker.py b/workers/diffsworker.py index 70f74f1db..563c61352 100644 --- a/workers/diffsworker.py +++ b/workers/diffsworker.py @@ -33,7 +33,8 @@ class DiffsWorker(Worker): return True -logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) +if __name__ == "__main__": + logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) -worker = DiffsWorker(image_diff_queue) -worker.start() + worker = DiffsWorker(image_diff_queue) + worker.start() diff --git a/workers/notificationworker.py b/workers/notificationworker.py index 2af0954f9..a176d46c8 100644 --- a/workers/notificationworker.py +++ b/workers/notificationworker.py @@ -8,6 +8,7 @@ from workers.worker import Worker from endpoints.notificationmethod import NotificationMethod, InvalidNotificationMethodException from endpoints.notificationevent import NotificationEvent, InvalidNotificationEventException +from workers.worker import JobException from data import model @@ -29,7 +30,7 @@ class NotificationWorker(Worker): notification = model.get_repo_notification(repo_namespace, repo_name, notification_uuid) if not notification: # Probably deleted. - return True + return event_name = notification.event.name method_name = notification.method.name @@ -39,15 +40,17 @@ class NotificationWorker(Worker): method_handler = NotificationMethod.get_method(method_name) except InvalidNotificationMethodException as ex: logger.exception('Cannot find notification method: %s' % ex.message) - return False + raise JobException('Cannot find notification method: %s' % ex.message) except InvalidNotificationEventException as ex: - logger.exception('Cannot find notification method: %s' % ex.message) - return False + logger.exception('Cannot find notification event: %s' % ex.message) + raise JobException('Cannot find notification event: %s' % ex.message) - return method_handler.perform(notification, event_handler, job_details) + method_handler.perform(notification, event_handler, job_details) -logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) -worker = NotificationWorker(notification_queue, poll_period_seconds=15, - reservation_seconds=3600) -worker.start() +if __name__ == "__main__": + logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) + + worker = NotificationWorker(notification_queue, poll_period_seconds=10, reservation_seconds=30, + retry_after_seconds=30) + worker.start() diff --git a/workers/worker.py b/workers/worker.py index 57d4a02d0..03066dc6e 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -63,11 +63,12 @@ class WorkerStatusHandler(BaseHTTPRequestHandler): class Worker(object): def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300, - watchdog_period_seconds=60): + watchdog_period_seconds=60, retry_after_seconds=300): self._sched = BackgroundScheduler() self._poll_period_seconds = poll_period_seconds self._reservation_seconds = reservation_seconds self._watchdog_period_seconds = watchdog_period_seconds + self._retry_after_seconds = retry_after_seconds self._stop = Event() self._terminated = Event() self._queue = queue @@ -103,7 +104,8 @@ class Worker(object): try: self.watchdog() except WorkerUnhealthyException as exc: - logger.error('The worker has encountered an error via watchdog and will not take new jobs: %s' % exc.message) + logger.error('The worker has encountered an error via watchdog and will not take new jobs') + logger.error(exc.message) self.mark_current_incomplete(restore_retry=True) self._stop.set() @@ -111,7 +113,7 @@ class Worker(object): logger.debug('Getting work item from queue.') with self._current_item_lock: - self.current_queue_item = self._queue.get() + self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds) while True: # Retrieve the current item in the queue over which to operate. We do so under @@ -129,12 +131,14 @@ class Worker(object): self.process_queue_item(job_details) self.mark_current_complete() - except JobException: + except JobException as jex: logger.warning('An error occurred processing request: %s', current_queue_item.body) + logger.warning('Job exception: %s' % jex) self.mark_current_incomplete(restore_retry=False) except WorkerUnhealthyException as exc: - logger.error('The worker has encountered an error via the job and will not take new jobs: %s' % exc.message) + logger.error('The worker has encountered an error via the job and will not take new jobs') + logger.error(exc.message) self.mark_current_incomplete(restore_retry=True) self._stop.set() @@ -190,7 +194,8 @@ class Worker(object): def mark_current_incomplete(self, restore_retry=False): with self._current_item_lock: if self.current_queue_item is not None: - self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry) + self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry, + retry_after=self._retry_after_seconds) self.current_queue_item = None def mark_current_complete(self):