Update the worker code to better handle exceptions, fix the utcdate issue and make sure we send the proper retry. Also updates notification workers to send JobExceptions rather than returning true or false

This commit is contained in:
Joseph Schorr 2014-09-22 12:52:57 -04:00 committed by Jake Moshenko
parent 8dd2330ce7
commit ba0963a81c
6 changed files with 62 additions and 60 deletions

View file

@ -347,7 +347,7 @@ class RepositoryBuild(BaseModel):
class QueueItem(BaseModel):
queue_name = CharField(index=True, max_length=1024)
body = TextField()
available_after = DateTimeField(default=datetime.now, index=True)
available_after = DateTimeField(default=datetime.utcnow, index=True)
available = BooleanField(default=True, index=True)
processing_expires = DateTimeField(null=True, index=True)
retries_remaining = IntegerField(default=5)

View file

@ -68,9 +68,8 @@ class WorkQueue(object):
'retries_remaining': retries_remaining,
}
if available_after:
available_date = datetime.utcnow() + timedelta(seconds=available_after)
params['available_after'] = available_date
available_date = datetime.utcnow() + timedelta(seconds=available_after or 0)
params['available_after'] = available_date
with self._transaction_factory(db):
QueueItem.create(**params)

View file

@ -10,6 +10,7 @@ import re
from flask.ext.mail import Message
from app import mail, app, get_app_url
from data import model
from workers.worker import JobException
logger = logging.getLogger(__name__)
@ -19,6 +20,9 @@ class InvalidNotificationMethodException(Exception):
class CannotValidateNotificationMethodException(Exception):
pass
class NotificationMethodPerformException(JobException):
pass
class NotificationMethod(object):
def __init__(self):
@ -105,19 +109,18 @@ class QuayNotificationMethod(NotificationMethod):
repository = notification.repository
if not repository:
# Probably deleted.
return True
return
# Lookup the target user or team to which we'll send the notification.
config_data = json.loads(notification.config_json)
status, err_message, target_users = self.find_targets(repository, config_data)
if not status:
return False
raise NotificationMethodPerformException(err_message)
# For each of the target users, create a notification.
for target_user in set(target_users or []):
model.create_notification(event_handler.event_name(), target_user,
metadata=notification_data['event_data'])
return True
class EmailMethod(NotificationMethod):
@ -141,7 +144,7 @@ class EmailMethod(NotificationMethod):
config_data = json.loads(notification.config_json)
email = config_data.get('email', '')
if not email:
return False
return
msg = Message(event_handler.get_summary(notification_data['event_data'], notification_data),
sender='support@quay.io',
@ -153,9 +156,7 @@ class EmailMethod(NotificationMethod):
mail.send(msg)
except Exception as ex:
logger.exception('Email was unable to be sent: %s' % ex.message)
return False
return True
raise NotificationMethodPerformException(ex.message)
class WebhookMethod(NotificationMethod):
@ -172,7 +173,7 @@ class WebhookMethod(NotificationMethod):
config_data = json.loads(notification.config_json)
url = config_data.get('url', '')
if not url:
return False
return
payload = notification_data['event_data']
headers = {'Content-type': 'application/json'}
@ -180,15 +181,14 @@ class WebhookMethod(NotificationMethod):
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
logger.error('%s response for webhook to url: %s' % (resp.status_code,
url))
return False
error_message = '%s response for webhook to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Webhook was unable to be sent: %s' % ex.message)
return False
return True
raise NotificationMethodPerformException(ex.message)
class FlowdockMethod(NotificationMethod):
@ -208,12 +208,12 @@ class FlowdockMethod(NotificationMethod):
config_data = json.loads(notification.config_json)
token = config_data.get('flow_api_token', '')
if not token:
return False
return
owner = model.get_user(notification.repository.namespace)
if not owner:
# Something went wrong.
return False
return
url = 'https://api.flowdock.com/v1/messages/team_inbox/%s' % token
headers = {'Content-type': 'application/json'}
@ -231,16 +231,14 @@ class FlowdockMethod(NotificationMethod):
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
logger.error('%s response for flowdock to url: %s' % (resp.status_code,
url))
error_message = '%s response for flowdock to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
return False
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Flowdock method was unable to be sent: %s' % ex.message)
return False
return True
raise NotificationMethodPerformException(ex.message)
class HipchatMethod(NotificationMethod):
@ -265,12 +263,12 @@ class HipchatMethod(NotificationMethod):
room_id = config_data.get('room_id', '')
if not token or not room_id:
return False
return
owner = model.get_user(notification.repository.namespace)
if not owner:
# Something went wrong.
return False
return
url = 'https://api.hipchat.com/v2/room/%s/notification?auth_token=%s' % (room_id, token)
@ -293,16 +291,14 @@ class HipchatMethod(NotificationMethod):
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
logger.error('%s response for hipchat to url: %s' % (resp.status_code,
url))
error_message = '%s response for hipchat to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
return False
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Hipchat method was unable to be sent: %s' % ex.message)
return False
return True
raise NotificationMethodPerformException(ex.message)
class SlackMethod(NotificationMethod):
@ -334,12 +330,12 @@ class SlackMethod(NotificationMethod):
subdomain = config_data.get('subdomain', '')
if not token or not subdomain:
return False
return
owner = model.get_user(notification.repository.namespace)
if not owner:
# Something went wrong.
return False
return
url = 'https://%s.slack.com/services/hooks/incoming-webhook?token=%s' % (subdomain, token)
@ -370,13 +366,11 @@ class SlackMethod(NotificationMethod):
try:
resp = requests.post(url, data=json.dumps(payload), headers=headers)
if resp.status_code/100 != 2:
logger.error('%s response for Slack to url: %s' % (resp.status_code,
url))
error_message = '%s response for Slack to url: %s' % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
return False
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception('Slack method was unable to be sent: %s' % ex.message)
return False
return True
raise NotificationMethodPerformException(ex.message)

View file

@ -33,7 +33,8 @@ class DiffsWorker(Worker):
return True
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
if __name__ == "__main__":
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
worker = DiffsWorker(image_diff_queue)
worker.start()
worker = DiffsWorker(image_diff_queue)
worker.start()

View file

@ -8,6 +8,7 @@ from workers.worker import Worker
from endpoints.notificationmethod import NotificationMethod, InvalidNotificationMethodException
from endpoints.notificationevent import NotificationEvent, InvalidNotificationEventException
from workers.worker import JobException
from data import model
@ -29,7 +30,7 @@ class NotificationWorker(Worker):
notification = model.get_repo_notification(repo_namespace, repo_name, notification_uuid)
if not notification:
# Probably deleted.
return True
return
event_name = notification.event.name
method_name = notification.method.name
@ -39,15 +40,17 @@ class NotificationWorker(Worker):
method_handler = NotificationMethod.get_method(method_name)
except InvalidNotificationMethodException as ex:
logger.exception('Cannot find notification method: %s' % ex.message)
return False
raise JobException('Cannot find notification method: %s' % ex.message)
except InvalidNotificationEventException as ex:
logger.exception('Cannot find notification method: %s' % ex.message)
return False
logger.exception('Cannot find notification event: %s' % ex.message)
raise JobException('Cannot find notification event: %s' % ex.message)
return method_handler.perform(notification, event_handler, job_details)
method_handler.perform(notification, event_handler, job_details)
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
worker = NotificationWorker(notification_queue, poll_period_seconds=15,
reservation_seconds=3600)
worker.start()
if __name__ == "__main__":
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
worker = NotificationWorker(notification_queue, poll_period_seconds=10, reservation_seconds=30,
retry_after_seconds=30)
worker.start()

View file

@ -63,11 +63,12 @@ class WorkerStatusHandler(BaseHTTPRequestHandler):
class Worker(object):
def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
watchdog_period_seconds=60):
watchdog_period_seconds=60, retry_after_seconds=300):
self._sched = BackgroundScheduler()
self._poll_period_seconds = poll_period_seconds
self._reservation_seconds = reservation_seconds
self._watchdog_period_seconds = watchdog_period_seconds
self._retry_after_seconds = retry_after_seconds
self._stop = Event()
self._terminated = Event()
self._queue = queue
@ -103,7 +104,8 @@ class Worker(object):
try:
self.watchdog()
except WorkerUnhealthyException as exc:
logger.error('The worker has encountered an error via watchdog and will not take new jobs: %s' % exc.message)
logger.error('The worker has encountered an error via watchdog and will not take new jobs')
logger.error(exc.message)
self.mark_current_incomplete(restore_retry=True)
self._stop.set()
@ -111,7 +113,7 @@ class Worker(object):
logger.debug('Getting work item from queue.')
with self._current_item_lock:
self.current_queue_item = self._queue.get()
self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
while True:
# Retrieve the current item in the queue over which to operate. We do so under
@ -129,12 +131,14 @@ class Worker(object):
self.process_queue_item(job_details)
self.mark_current_complete()
except JobException:
except JobException as jex:
logger.warning('An error occurred processing request: %s', current_queue_item.body)
logger.warning('Job exception: %s' % jex)
self.mark_current_incomplete(restore_retry=False)
except WorkerUnhealthyException as exc:
logger.error('The worker has encountered an error via the job and will not take new jobs: %s' % exc.message)
logger.error('The worker has encountered an error via the job and will not take new jobs')
logger.error(exc.message)
self.mark_current_incomplete(restore_retry=True)
self._stop.set()
@ -190,7 +194,8 @@ class Worker(object):
def mark_current_incomplete(self, restore_retry=False):
with self._current_item_lock:
if self.current_queue_item is not None:
self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry)
self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry,
retry_after=self._retry_after_seconds)
self.current_queue_item = None
def mark_current_complete(self):