From ac0cca2d9035da55f799a5c2fc591a081f975f81 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 28 Jul 2015 17:25:12 -0400 Subject: [PATCH] Switch to a unified worker system - Handles logging - Handles reporting to Sentry - Removes old code around serving a web endpoint (unused now) --- workers/buildlogsarchiver.py | 63 ++++----- workers/diffsworker.py | 13 +- workers/gcworker.py | 36 +++--- workers/notificationworker.py | 9 +- workers/queueworker.py | 144 +++++++++++++++++++++ workers/repositoryactioncounter.py | 69 +++++----- workers/worker.py | 198 +++++------------------------ 7 files changed, 264 insertions(+), 268 deletions(-) create mode 100644 workers/queueworker.py diff --git a/workers/buildlogsarchiver.py b/workers/buildlogsarchiver.py index ea5043c0b..f4882c453 100644 --- a/workers/buildlogsarchiver.py +++ b/workers/buildlogsarchiver.py @@ -1,6 +1,5 @@ import logging -from apscheduler.schedulers.blocking import BlockingScheduler from peewee import fn from tempfile import SpooledTemporaryFile from gzip import GzipFile @@ -10,47 +9,51 @@ from data.archivedlogs import JSON_MIMETYPE from data.database import RepositoryBuild, db_random_func from app import build_logs, log_archive from util.streamingjsonencoder import StreamingJSONEncoder +from workers.worker import Worker POLL_PERIOD_SECONDS = 30 MEMORY_TEMPFILE_SIZE = 64 * 1024 # Large enough to handle approximately 99% of builds in memory logger = logging.getLogger(__name__) -sched = BlockingScheduler() -@sched.scheduled_job(trigger='interval', seconds=30) -def archive_redis_buildlogs(): - """ Archive a single build, choosing a candidate at random. This process must be idempotent to - avoid needing two-phase commit. """ - try: - # Get a random build to archive - to_archive = model.build.archivable_buildlogs_query().order_by(db_random_func()).get() - logger.debug('Archiving: %s', to_archive.uuid) +class ArchiveBuildLogsWorker(Worker): + def __init__(self): + super(ArchiveBuildLogsWorker, self).__init__() + self.add_operation(self._archive_redis_buildlogs, POLL_PERIOD_SECONDS) - length, entries = build_logs.get_log_entries(to_archive.uuid, 0) - to_encode = { - 'start': 0, - 'total': length, - 'logs': entries, - } + def _archive_redis_buildlogs(self): + """ Archive a single build, choosing a candidate at random. This process must be idempotent to + avoid needing two-phase commit. 
""" + try: + # Get a random build to archive + to_archive = model.build.archivable_buildlogs_query().order_by(db_random_func()).get() + logger.debug('Archiving: %s', to_archive.uuid) - with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile: - with GzipFile('testarchive', fileobj=tempfile) as zipstream: - for chunk in StreamingJSONEncoder().iterencode(to_encode): - zipstream.write(chunk) + length, entries = build_logs.get_log_entries(to_archive.uuid, 0) + to_encode = { + 'start': 0, + 'total': length, + 'logs': entries, + } - tempfile.seek(0) - log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip', - file_id=to_archive.uuid) + with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile: + with GzipFile('testarchive', fileobj=tempfile) as zipstream: + for chunk in StreamingJSONEncoder().iterencode(to_encode): + zipstream.write(chunk) - to_archive.logs_archived = True - to_archive.save() + tempfile.seek(0) + log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip', + file_id=to_archive.uuid) - build_logs.expire_log_entries(to_archive.uuid) + to_archive.logs_archived = True + to_archive.save() - except RepositoryBuild.DoesNotExist: - logger.debug('No more builds to archive') + build_logs.expire_log_entries(to_archive.uuid) + + except RepositoryBuild.DoesNotExist: + logger.debug('No more builds to archive') if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - sched.start() + worker = ArchiveBuildLogsWorker() + worker.start() diff --git a/workers/diffsworker.py b/workers/diffsworker.py index b8b1379cd..073bfbe9c 100644 --- a/workers/diffsworker.py +++ b/workers/diffsworker.py @@ -3,22 +3,17 @@ import logging from app import image_diff_queue from data import model from endpoints.v1.registry import process_image_changes -from workers.worker import Worker +from workers.queueworker import QueueWorker logger = logging.getLogger(__name__) -class DiffsWorker(Worker): +class DiffsWorker(QueueWorker): def process_queue_item(self, job_details): image_id = job_details['image_id'] repository = job_details['repository'] - - # TODO switch to the namespace_user_id branch only once exisiting jobs have all gone through - if 'namespace_user_id' in job_details: - namespace = model.get_namespace_by_user_id(job_details['namespace_user_id']) - else: - namespace = job_details['namespace'] + namespace = model.user.get_namespace_by_user_id(job_details['namespace_user_id']) try: process_image_changes(namespace, repository, image_id) @@ -32,7 +27,5 @@ class DiffsWorker(Worker): return True if __name__ == "__main__": - logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) - worker = DiffsWorker(image_diff_queue) worker.start() diff --git a/workers/gcworker.py b/workers/gcworker.py index d60dd5719..acbcd0a6c 100644 --- a/workers/gcworker.py +++ b/workers/gcworker.py @@ -1,30 +1,30 @@ import logging -from apscheduler.schedulers.blocking import BlockingScheduler - from app import app from data.database import UseThenDisconnect from data.model.repository import find_repository_with_garbage, garbage_collect_repo +from workers.worker import Worker logger = logging.getLogger(__name__) -sched = BlockingScheduler() -@sched.scheduled_job(trigger='interval', seconds=10) -def garbage_collect_repositories(): - """ Performs garbage collection on repositories. 
""" +class GarbageCollectionWorker(Worker): + def __init__(self): + super(GarbageCollectionWorker, self).__init__() + self.add_operation(self._garbage_collection_repos, 10) - with UseThenDisconnect(app.config): - repository = find_repository_with_garbage(app.config.get('EXP_ASYNC_GARBAGE_COLLECTION', [])) - if repository is None: - logger.debug('No repository with garbage found') - return False - - logger.debug('Starting GC of repository #%s (%s)', repository.id, repository.name) - garbage_collect_repo(repository) - logger.debug('Finished GC of repository #%s (%s)', repository.id, repository.name) - return True + def _garbage_collection_repos(self): + """ Performs garbage collection on repositories. """ + with UseThenDisconnect(app.config): + repository = find_repository_with_garbage(app.config.get('EXP_ASYNC_GARBAGE_COLLECTION', [])) + if repository is None: + logger.debug('No repository with garbage found') + return + logger.debug('Starting GC of repository #%s (%s)', repository.id, repository.name) + garbage_collect_repo(repository) + logger.debug('Finished GC of repository #%s (%s)', repository.id, repository.name) + return if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - sched.start() + worker = GarbageCollectionWorker() + worker.start() diff --git a/workers/notificationworker.py b/workers/notificationworker.py index 7371f51dd..7ffe69af0 100644 --- a/workers/notificationworker.py +++ b/workers/notificationworker.py @@ -1,19 +1,16 @@ import logging from app import notification_queue -from workers.worker import Worker from endpoints.notificationmethod import NotificationMethod, InvalidNotificationMethodException from endpoints.notificationevent import NotificationEvent, InvalidNotificationEventException -from workers.worker import JobException +from workers.queueworker import QueueWorker, JobException from data import model - logger = logging.getLogger(__name__) - -class NotificationWorker(Worker): +class NotificationWorker(QueueWorker): def process_queue_item(self, job_details): notification_uuid = job_details['notification_uuid'] @@ -39,8 +36,6 @@ class NotificationWorker(Worker): if __name__ == "__main__": - logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) - worker = NotificationWorker(notification_queue, poll_period_seconds=10, reservation_seconds=30, retry_after_seconds=30) worker.start() diff --git a/workers/queueworker.py b/workers/queueworker.py new file mode 100644 index 000000000..08ba1699d --- /dev/null +++ b/workers/queueworker.py @@ -0,0 +1,144 @@ +import logging +import json +import signal +import sys + +from threading import Event, Lock +from datetime import datetime, timedelta +from threading import Thread +from time import sleep + +from data.model import db +from data.queue import WorkQueue + +from workers.worker import Worker + +logger = logging.getLogger(__name__) + +class JobException(Exception): + """ A job exception is an exception that is caused by something being malformed in the job. When + a worker raises this exception the job will be terminated and the retry will not be returned + to the queue. """ + pass + + +class WorkerUnhealthyException(Exception): + """ When this exception is raised, the worker is no longer healthy and will not accept any more + work. When this is raised while processing a queue item, the item should be returned to the + queue along with another retry. 
""" + pass + +class QueueWorker(Worker): + def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300, + watchdog_period_seconds=60, retry_after_seconds=300): + super(QueueWorker, self).__init__() + + self._poll_period_seconds = poll_period_seconds + self._reservation_seconds = reservation_seconds + self._watchdog_period_seconds = watchdog_period_seconds + self._retry_after_seconds = retry_after_seconds + self._stop = Event() + self._terminated = Event() + self._queue = queue + self._current_item_lock = Lock() + self.current_queue_item = None + + # Add the various operations. + self.add_operation(self.poll_queue, self._poll_period_seconds) + self.add_operation(self.update_queue_metrics, 60) + self.add_operation(self.run_watchdog, self._watchdog_period_seconds) + + def process_queue_item(self, job_details): + """ Return True if complete, False if it should be retried. """ + raise NotImplementedError('Workers must implement run.') + + def watchdog(self): + """ Function that gets run once every watchdog_period_seconds. """ + pass + + def _close_db_handle(self): + if not db.is_closed(): + logger.debug('Disconnecting from database.') + db.close() + + def extend_processing(self, seconds_from_now): + with self._current_item_lock: + if self.current_queue_item is not None: + self._queue.extend_processing(self.current_queue_item, seconds_from_now) + + def run_watchdog(self): + logger.debug('Running watchdog.') + try: + self.watchdog() + except WorkerUnhealthyException as exc: + logger.error('The worker has encountered an error via watchdog and will not take new jobs') + logger.error(exc.message) + self.mark_current_incomplete(restore_retry=True) + self._stop.set() + + def poll_queue(self): + logger.debug('Getting work item from queue.') + + with self._current_item_lock: + self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds) + + while True: + # Retrieve the current item in the queue over which to operate. We do so under + # a lock to make sure we are always retrieving an item when in a healthy state. + current_queue_item = None + with self._current_item_lock: + current_queue_item = self.current_queue_item + if current_queue_item is None: + # Close the db handle. + self._close_db_handle() + break + + logger.debug('Queue gave us some work: %s', current_queue_item.body) + job_details = json.loads(current_queue_item.body) + + try: + self.process_queue_item(job_details) + self.mark_current_complete() + + except JobException as jex: + logger.warning('An error occurred processing request: %s', current_queue_item.body) + logger.warning('Job exception: %s' % jex) + self.mark_current_incomplete(restore_retry=False) + + except WorkerUnhealthyException as exc: + logger.error('The worker has encountered an error via the job and will not take new jobs') + logger.error(exc.message) + self.mark_current_incomplete(restore_retry=True) + self._stop.set() + + finally: + # Close the db handle. 
+ self._close_db_handle() + + if not self._stop.is_set(): + with self._current_item_lock: + self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds) + + if not self._stop.is_set(): + logger.debug('No more work.') + + def update_queue_metrics(self): + self._queue.update_metrics() + + def mark_current_incomplete(self, restore_retry=False): + with self._current_item_lock: + if self.current_queue_item is not None: + self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry, + retry_after=self._retry_after_seconds) + self.current_queue_item = None + + def mark_current_complete(self): + with self._current_item_lock: + if self.current_queue_item is not None: + self._queue.complete(self.current_queue_item) + self.current_queue_item = None + + def ungracefully_terminated(self): + # Give back the retry that we took for this queue item so that if it were down to zero + # retries it will still be picked up by another worker + self.mark_current_incomplete() diff --git a/workers/repositoryactioncounter.py b/workers/repositoryactioncounter.py index eed5f3420..ec7ed105a 100644 --- a/workers/repositoryactioncounter.py +++ b/workers/repositoryactioncounter.py @@ -1,54 +1,51 @@ import logging -from apscheduler.schedulers.blocking import BlockingScheduler - from app import app from data.database import (Repository, LogEntry, RepositoryActionCount, db_random_func, fn, UseThenDisconnect) from datetime import date, datetime, timedelta +from workers.worker import Worker -POLL_PERIOD_SECONDS = 30 +POLL_PERIOD_SECONDS = 10 logger = logging.getLogger(__name__) -sched = BlockingScheduler() -@sched.scheduled_job(trigger='interval', seconds=10) -def count_repository_actions(): - """ Counts actions for a random repository for the previous day. """ +class RepositoryActionCountWorker(Worker): + def __init__(self): + super(RepositoryActionCountWorker, self).__init__() + self.add_operation(self._count_repository_actions, POLL_PERIOD_SECONDS) - with UseThenDisconnect(app.config): - try: - # Get a random repository to count. - today = date.today() - yesterday = today - timedelta(days=1) - has_yesterday_actions = (RepositoryActionCount.select(RepositoryActionCount.repository) - .where(RepositoryActionCount.date == yesterday)) + def _count_repository_actions(self): + """ Counts actions for a random repository for the previous day. """ - to_count = (Repository.select() - .where(~(Repository.id << (has_yesterday_actions))) - .order_by(db_random_func()).get()) - - logger.debug('Counting: %s', to_count.id) - - actions = (LogEntry.select() - .where(LogEntry.repository == to_count, - LogEntry.datetime >= yesterday, - LogEntry.datetime < today) - .count()) - - # Create the row. + with UseThenDisconnect(app.config): try: - RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions) - except: - logger.exception('Exception when writing count') + # Get a random repository to count. 
+ today = date.today() + yesterday = today - timedelta(days=1) + has_yesterday_actions = (RepositoryActionCount.select(RepositoryActionCount.repository) + .where(RepositoryActionCount.date == yesterday)) - return True + to_count = (Repository.select() + .where(~(Repository.id << (has_yesterday_actions))) + .order_by(db_random_func()).get()) - except Repository.DoesNotExist: - logger.debug('No further repositories to count') - return False + logger.debug('Counting: %s', to_count.id) + actions = (LogEntry.select() + .where(LogEntry.repository == to_count, + LogEntry.datetime >= yesterday, + LogEntry.datetime < today) + .count()) + + # Create the row. + try: + RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions) + except: + logger.exception('Exception when writing count') + except Repository.DoesNotExist: + logger.debug('No further repositories to count') if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - sched.start() + worker = RepositoryActionCountWorker() + worker.start() diff --git a/workers/worker.py b/workers/worker.py index 66ab38ba4..71b46ade8 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -1,93 +1,33 @@ import logging -import json import signal import sys +import socket -from threading import Event, Lock +from threading import Event from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime, timedelta -from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from threading import Thread from time import sleep +from raven import Client +from app import app from data.model import db -from data.queue import WorkQueue +from functools import wraps logger = logging.getLogger(__name__) -class JobException(Exception): - """ A job exception is an exception that is caused by something being malformed in the job. When - a worker raises this exception the job will be terminated and the retry will not be returned - to the queue. """ - pass - - -class WorkerUnhealthyException(Exception): - """ When this exception is raised, the worker is no longer healthy and will not accept any more - work. When this is raised while processing a queue item, the item should be returned to the - queue along with another retry. """ - pass - - -class WorkerStatusServer(HTTPServer): - def __init__(self, worker, *args, **kwargs): - HTTPServer.__init__(self, *args, **kwargs) - self.worker = worker - - -class WorkerStatusHandler(BaseHTTPRequestHandler): - def do_GET(self): - if self.path == '/status': - # Return the worker status - code = 200 if self.server.worker.is_healthy() else 503 - self.send_response(code) - self.send_header('Content-Type', 'text/plain') - self.end_headers() - self.wfile.write('OK') - elif self.path == '/terminate': - # Return whether it is safe to terminate the worker process - code = 200 if self.server.worker.is_terminated() else 503 - self.send_response(code) - else: - self.send_error(404) - - def do_POST(self): - if self.path == '/terminate': - try: - self.server.worker.join() - self.send_response(200) - except: - self.send_response(500) - else: - self.send_error(404) - class Worker(object): - def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300, - watchdog_period_seconds=60, retry_after_seconds=300): + """ Base class for workers which perform some work periodically. 
""" + def __init__(self): self._sched = BackgroundScheduler() - self._poll_period_seconds = poll_period_seconds - self._reservation_seconds = reservation_seconds - self._watchdog_period_seconds = watchdog_period_seconds - self._retry_after_seconds = retry_after_seconds + self._operations = [] self._stop = Event() self._terminated = Event() - self._queue = queue - self._current_item_lock = Lock() - self.current_queue_item = None - def process_queue_item(self, job_details): - """ Return True if complete, False if it should be retried. """ - raise NotImplementedError('Workers must implement run.') - - def watchdog(self): - """ Function that gets run once every watchdog_period_seconds. """ - pass - - def _close_db_handle(self): - if not db.is_closed(): - logger.debug('Disconnecting from database.') - db.close() + if app.config.get('EXCEPTION_LOG_TYPE', 'FakeSentry') == 'Sentry': + worker_name = '%s:worker-%s' % (socket.gethostname(), self.__class__.__name__) + self._raven_client = Client(app.config.get('SENTRY_DSN', ''), name=worker_name) def is_healthy(self): return not self._stop.is_set() @@ -95,90 +35,33 @@ class Worker(object): def is_terminated(self): return self._terminated.is_set() - def extend_processing(self, seconds_from_now): - with self._current_item_lock: - if self.current_queue_item is not None: - self._queue.extend_processing(self.current_queue_item, seconds_from_now) - - def run_watchdog(self): - logger.debug('Running watchdog.') - try: - self.watchdog() - except WorkerUnhealthyException as exc: - logger.error('The worker has encountered an error via watchdog and will not take new jobs') - logger.error(exc.message) - self.mark_current_incomplete(restore_retry=True) - self._stop.set() - - def poll_queue(self): - logger.debug('Getting work item from queue.') - - with self._current_item_lock: - self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds) - - while True: - # Retrieve the current item in the queue over which to operate. We do so under - # a lock to make sure we are always retrieving an item when in a healthy state. - current_queue_item = None - with self._current_item_lock: - current_queue_item = self.current_queue_item - if current_queue_item is None: - # Close the db handle. - self._close_db_handle() - break - - logger.debug('Queue gave us some work: %s', current_queue_item.body) - job_details = json.loads(current_queue_item.body) + def ungracefully_terminated(self): + """ Method called when the worker has been terminated in an ungraceful fashion. """ + pass + def add_operation(self, operation_func, operation_sec): + @wraps(operation_func) + def _operation_func(): try: - self.process_queue_item(job_details) - self.mark_current_complete() + return operation_func() + except Exception: + logger.exception('Operation raised exception') + if self._raven_client: + logger.debug('Logging exception to Sentry') + self._raven_client.captureException() - except JobException as jex: - logger.warning('An error occurred processing request: %s', current_queue_item.body) - logger.warning('Job exception: %s' % jex) - self.mark_current_incomplete(restore_retry=False) + self._operations.append((_operation_func, operation_sec)) - except WorkerUnhealthyException as exc: - logger.error('The worker has encountered an error via the job and will not take new jobs') - logger.error(exc.message) - self.mark_current_incomplete(restore_retry=True) - self._stop.set() - - finally: - # Close the db handle. 
- self._close_db_handle() - - if not self._stop.is_set(): - with self._current_item_lock: - self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds) - - if not self._stop.is_set(): - logger.debug('No more work.') - - def update_queue_metrics(self): - self._queue.update_metrics() - - def start(self, start_status_server_port=None): - if start_status_server_port is not None: - # Start a status server on a thread - server_address = ('', start_status_server_port) - httpd = WorkerStatusServer(self, server_address, WorkerStatusHandler) - server_thread = Thread(target=httpd.serve_forever) - server_thread.daemon = True - server_thread.start() - - logger.debug("Scheduling worker.") + def start(self): + logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False) + logger.debug('Scheduling worker.') soon = datetime.now() + timedelta(seconds=.001) self._sched.start() - self._sched.add_job(self.poll_queue, 'interval', seconds=self._poll_period_seconds, - start_date=soon, max_instances=1) - self._sched.add_job(self.update_queue_metrics, 'interval', seconds=60, start_date=soon, - max_instances=1) - self._sched.add_job(self.run_watchdog, 'interval', seconds=self._watchdog_period_seconds, - max_instances=1) + for operation_func, operation_sec in self._operations: + self._sched.add_job(operation_func, 'interval', seconds=operation_sec, + start_date=soon, max_instances=1) signal.signal(signal.SIGTERM, self.terminate) signal.signal(signal.SIGINT, self.terminate) @@ -192,23 +75,6 @@ class Worker(object): self._terminated.set() - # Wait forever if we're running a server - while start_status_server_port is not None: - sleep(60) - - def mark_current_incomplete(self, restore_retry=False): - with self._current_item_lock: - if self.current_queue_item is not None: - self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry, - retry_after=self._retry_after_seconds) - self.current_queue_item = None - - def mark_current_complete(self): - with self._current_item_lock: - if self.current_queue_item is not None: - self._queue.complete(self.current_queue_item) - self.current_queue_item = None - def terminate(self, signal_num=None, stack_frame=None, graceful=False): if self._terminated.is_set(): sys.exit(1) @@ -218,9 +84,7 @@ class Worker(object): self._stop.set() if not graceful: - # Give back the retry that we took for this queue item so that if it were down to zero - # retries it will still be picked up by another worker - self.mark_current_incomplete() + self.ungracefully_terminated() def join(self): self.terminate(graceful=True)
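Usage sketch (illustrative only; ExampleCleanupWorker, _cleanup, and the 60-second period are hypothetical names and values, not part of this patch): with the unified system above, a periodic worker subclasses Worker, registers each operation with add_operation, and calls start().

    import logging

    from workers.worker import Worker

    logger = logging.getLogger(__name__)

    POLL_PERIOD_SECONDS = 60


    class ExampleCleanupWorker(Worker):
      """ Hypothetical periodic worker used only to illustrate the unified API. """
      def __init__(self):
        super(ExampleCleanupWorker, self).__init__()
        # The base class wraps this operation so exceptions are logged and,
        # when Sentry is configured, reported via the Raven client.
        self.add_operation(self._cleanup, POLL_PERIOD_SECONDS)

      def _cleanup(self):
        logger.debug('Running example cleanup pass')


    if __name__ == '__main__':
      worker = ExampleCleanupWorker()
      worker.start()

Queue-driven workers follow the same pattern but subclass QueueWorker and implement process_queue_item(job_details), as DiffsWorker and NotificationWorker do above.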