Switch to a unified worker system

- Handles logging
- Handles reporting to Sentry
- Removes old code around serving a web endpoint (unused now)
Joseph Schorr 2015-07-28 17:25:12 -04:00
parent dbd9a32c85
commit ac0cca2d90
7 changed files with 264 additions and 268 deletions
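For orientation, the pattern the diffs below converge on is: each worker subclasses Worker, registers its periodic operations with add_operation(), and calls start(); the base class schedules the operations, logs any exception they raise, and reports it to Sentry when configured. A minimal sketch, assuming the illustrative names ExampleWorker and _do_work (not part of this commit):

import logging

from workers.worker import Worker

logger = logging.getLogger(__name__)

POLL_PERIOD_SECONDS = 30

class ExampleWorker(Worker):
  def __init__(self):
    super(ExampleWorker, self).__init__()
    # Run _do_work every POLL_PERIOD_SECONDS; exceptions are caught, logged and
    # forwarded to Sentry by the Worker base class.
    self.add_operation(self._do_work, POLL_PERIOD_SECONDS)

  def _do_work(self):
    # Illustrative placeholder for the periodic task body.
    logger.debug('Doing periodic work')

if __name__ == "__main__":
  worker = ExampleWorker()
  worker.start()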


@@ -1,6 +1,5 @@
 import logging
 
-from apscheduler.schedulers.blocking import BlockingScheduler
 from peewee import fn
 from tempfile import SpooledTemporaryFile
 from gzip import GzipFile
@@ -10,47 +9,51 @@ from data.archivedlogs import JSON_MIMETYPE
 from data.database import RepositoryBuild, db_random_func
 from app import build_logs, log_archive
 from util.streamingjsonencoder import StreamingJSONEncoder
+from workers.worker import Worker
 
+POLL_PERIOD_SECONDS = 30
 MEMORY_TEMPFILE_SIZE = 64 * 1024  # Large enough to handle approximately 99% of builds in memory
 
 logger = logging.getLogger(__name__)
-sched = BlockingScheduler()
 
-@sched.scheduled_job(trigger='interval', seconds=30)
-def archive_redis_buildlogs():
-  """ Archive a single build, choosing a candidate at random. This process must be idempotent to
-      avoid needing two-phase commit. """
-  try:
-    # Get a random build to archive
-    to_archive = model.build.archivable_buildlogs_query().order_by(db_random_func()).get()
-    logger.debug('Archiving: %s', to_archive.uuid)
+class ArchiveBuildLogsWorker(Worker):
+  def __init__(self):
+    super(ArchiveBuildLogsWorker, self).__init__()
+    self.add_operation(self._archive_redis_buildlogs, POLL_PERIOD_SECONDS)
 
-    length, entries = build_logs.get_log_entries(to_archive.uuid, 0)
-    to_encode = {
-      'start': 0,
-      'total': length,
-      'logs': entries,
-    }
+  def _archive_redis_buildlogs(self):
+    """ Archive a single build, choosing a candidate at random. This process must be idempotent to
+        avoid needing two-phase commit. """
+    try:
+      # Get a random build to archive
+      to_archive = model.build.archivable_buildlogs_query().order_by(db_random_func()).get()
+      logger.debug('Archiving: %s', to_archive.uuid)
 
-    with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
-      with GzipFile('testarchive', fileobj=tempfile) as zipstream:
-        for chunk in StreamingJSONEncoder().iterencode(to_encode):
-          zipstream.write(chunk)
+      length, entries = build_logs.get_log_entries(to_archive.uuid, 0)
+      to_encode = {
+        'start': 0,
+        'total': length,
+        'logs': entries,
+      }
 
-      tempfile.seek(0)
-      log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
-                             file_id=to_archive.uuid)
+      with SpooledTemporaryFile(MEMORY_TEMPFILE_SIZE) as tempfile:
+        with GzipFile('testarchive', fileobj=tempfile) as zipstream:
+          for chunk in StreamingJSONEncoder().iterencode(to_encode):
+            zipstream.write(chunk)
 
-    to_archive.logs_archived = True
-    to_archive.save()
+        tempfile.seek(0)
+        log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding='gzip',
+                               file_id=to_archive.uuid)
 
-    build_logs.expire_log_entries(to_archive.uuid)
+      to_archive.logs_archived = True
+      to_archive.save()
 
-  except RepositoryBuild.DoesNotExist:
-    logger.debug('No more builds to archive')
+      build_logs.expire_log_entries(to_archive.uuid)
+
+    except RepositoryBuild.DoesNotExist:
+      logger.debug('No more builds to archive')
 
 if __name__ == "__main__":
   logging.basicConfig(level=logging.DEBUG)
-  sched.start()
+  worker = ArchiveBuildLogsWorker()
+  worker.start()


@@ -3,22 +3,17 @@ import logging
 from app import image_diff_queue
 from data import model
 from endpoints.v1.registry import process_image_changes
-from workers.worker import Worker
+from workers.queueworker import QueueWorker
 
 logger = logging.getLogger(__name__)
 
-class DiffsWorker(Worker):
+class DiffsWorker(QueueWorker):
   def process_queue_item(self, job_details):
     image_id = job_details['image_id']
     repository = job_details['repository']
-
-    # TODO switch to the namespace_user_id branch only once exisiting jobs have all gone through
-    if 'namespace_user_id' in job_details:
-      namespace = model.get_namespace_by_user_id(job_details['namespace_user_id'])
-    else:
-      namespace = job_details['namespace']
+    namespace = model.user.get_namespace_by_user_id(job_details['namespace_user_id'])
 
     try:
       process_image_changes(namespace, repository, image_id)
@@ -32,7 +27,5 @@ class DiffsWorker(Worker):
     return True
 
 if __name__ == "__main__":
-  logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
-
   worker = DiffsWorker(image_diff_queue)
   worker.start()


@@ -1,30 +1,30 @@
 import logging
 
-from apscheduler.schedulers.blocking import BlockingScheduler
-
 from app import app
 from data.database import UseThenDisconnect
 from data.model.repository import find_repository_with_garbage, garbage_collect_repo
+from workers.worker import Worker
 
 logger = logging.getLogger(__name__)
 
-sched = BlockingScheduler()
-
-@sched.scheduled_job(trigger='interval', seconds=10)
-def garbage_collect_repositories():
-  """ Performs garbage collection on repositories. """
+class GarbageCollectionWorker(Worker):
+  def __init__(self):
+    super(GarbageCollectionWorker, self).__init__()
+    self.add_operation(self._garbage_collection_repos, 10)
 
-  with UseThenDisconnect(app.config):
-    repository = find_repository_with_garbage(app.config.get('EXP_ASYNC_GARBAGE_COLLECTION', []))
-    if repository is None:
-      logger.debug('No repository with garbage found')
-      return False
-
-    logger.debug('Starting GC of repository #%s (%s)', repository.id, repository.name)
-    garbage_collect_repo(repository)
-    logger.debug('Finished GC of repository #%s (%s)', repository.id, repository.name)
-    return True
+  def _garbage_collection_repos(self):
+    """ Performs garbage collection on repositories. """
+    with UseThenDisconnect(app.config):
+      repository = find_repository_with_garbage(app.config.get('EXP_ASYNC_GARBAGE_COLLECTION', []))
+      if repository is None:
+        logger.debug('No repository with garbage found')
+        return
+
+      logger.debug('Starting GC of repository #%s (%s)', repository.id, repository.name)
+      garbage_collect_repo(repository)
+      logger.debug('Finished GC of repository #%s (%s)', repository.id, repository.name)
+      return
 
 if __name__ == "__main__":
   logging.basicConfig(level=logging.DEBUG)
-  sched.start()
+  worker = GarbageCollectionWorker()
+  worker.start()


@@ -1,19 +1,16 @@
 import logging
 
 from app import notification_queue
-from workers.worker import Worker
 from endpoints.notificationmethod import NotificationMethod, InvalidNotificationMethodException
 from endpoints.notificationevent import NotificationEvent, InvalidNotificationEventException
-from workers.worker import JobException
+from workers.queueworker import QueueWorker, JobException
 from data import model
 
 logger = logging.getLogger(__name__)
 
-class NotificationWorker(Worker):
+class NotificationWorker(QueueWorker):
   def process_queue_item(self, job_details):
     notification_uuid = job_details['notification_uuid']
@@ -39,8 +36,6 @@ class NotificationWorker(Worker):
 
 if __name__ == "__main__":
-  logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
-
   worker = NotificationWorker(notification_queue, poll_period_seconds=10, reservation_seconds=30,
                               retry_after_seconds=30)
   worker.start()

workers/queueworker.py (new file, 144 lines)

@@ -0,0 +1,144 @@
+import logging
+import json
+import signal
+import sys
+
+from threading import Event, Lock
+from datetime import datetime, timedelta
+from threading import Thread
+from time import sleep
+
+from data.model import db
+from data.queue import WorkQueue
+from workers.worker import Worker
+
+logger = logging.getLogger(__name__)
+
+class JobException(Exception):
+  """ A job exception is an exception that is caused by something being malformed in the job. When
+      a worker raises this exception the job will be terminated and the retry will not be returned
+      to the queue. """
+  pass
+
+class WorkerUnhealthyException(Exception):
+  """ When this exception is raised, the worker is no longer healthy and will not accept any more
+      work. When this is raised while processing a queue item, the item should be returned to the
+      queue along with another retry. """
+  pass
+
+class QueueWorker(Worker):
+  def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
+               watchdog_period_seconds=60, retry_after_seconds=300):
+    super(QueueWorker, self).__init__()
+
+    self._poll_period_seconds = poll_period_seconds
+    self._reservation_seconds = reservation_seconds
+    self._watchdog_period_seconds = watchdog_period_seconds
+    self._retry_after_seconds = retry_after_seconds
+
+    self._stop = Event()
+    self._terminated = Event()
+    self._queue = queue
+    self._current_item_lock = Lock()
+    self.current_queue_item = None
+
+    # Add the various operations.
+    self.add_operation(self.poll_queue, self._poll_period_seconds)
+    self.add_operation(self.update_queue_metrics, 60)
+    self.add_operation(self.run_watchdog, self._watchdog_period_seconds)
+
+  def process_queue_item(self, job_details):
+    """ Return True if complete, False if it should be retried. """
+    raise NotImplementedError('Workers must implement run.')
+
+  def watchdog(self):
+    """ Function that gets run once every watchdog_period_seconds. """
+    pass
+
+  def _close_db_handle(self):
+    if not db.is_closed():
+      logger.debug('Disconnecting from database.')
+      db.close()
+
+  def extend_processing(self, seconds_from_now):
+    with self._current_item_lock:
+      if self.current_queue_item is not None:
+        self._queue.extend_processing(self.current_queue_item, seconds_from_now)
+
+  def run_watchdog(self):
+    logger.debug('Running watchdog.')
+    try:
+      self.watchdog()
+    except WorkerUnhealthyException as exc:
+      logger.error('The worker has encountered an error via watchdog and will not take new jobs')
+      logger.error(exc.message)
+      self.mark_current_incomplete(restore_retry=True)
+      self._stop.set()
+
+  def poll_queue(self):
+    logger.debug('Getting work item from queue.')
+
+    with self._current_item_lock:
+      self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
+
+    while True:
+      # Retrieve the current item in the queue over which to operate. We do so under
+      # a lock to make sure we are always retrieving an item when in a healthy state.
+      current_queue_item = None
+      with self._current_item_lock:
+        current_queue_item = self.current_queue_item
+        if current_queue_item is None:
+          # Close the db handle.
+          self._close_db_handle()
+          break
+
+      logger.debug('Queue gave us some work: %s', current_queue_item.body)
+      job_details = json.loads(current_queue_item.body)
+
+      try:
+        self.process_queue_item(job_details)
+        self.mark_current_complete()
+
+      except JobException as jex:
+        logger.warning('An error occurred processing request: %s', current_queue_item.body)
+        logger.warning('Job exception: %s' % jex)
+        self.mark_current_incomplete(restore_retry=False)
+
+      except WorkerUnhealthyException as exc:
+        logger.error('The worker has encountered an error via the job and will not take new jobs')
+        logger.error(exc.message)
+        self.mark_current_incomplete(restore_retry=True)
+        self._stop.set()
+
+      finally:
+        # Close the db handle.
+        self._close_db_handle()
+
+      if not self._stop.is_set():
+        with self._current_item_lock:
+          self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
+
+    if not self._stop.is_set():
+      logger.debug('No more work.')
+
+  def update_queue_metrics(self):
+    self._queue.update_metrics()
+
+  def mark_current_incomplete(self, restore_retry=False):
+    with self._current_item_lock:
+      if self.current_queue_item is not None:
+        self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry,
+                               retry_after=self._retry_after_seconds)
+        self.current_queue_item = None
+
+  def mark_current_complete(self):
+    with self._current_item_lock:
+      if self.current_queue_item is not None:
+        self._queue.complete(self.current_queue_item)
+        self.current_queue_item = None
+
+  def ungracefully_terminated(self):
+    # Give back the retry that we took for this queue item so that if it were down to zero
+    # retries it will still be picked up by another worker
+    self.mark_current_incomplete()
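Queue-fed workers build on this base class by implementing process_queue_item and passing a WorkQueue plus timing parameters to the constructor; raising JobException discards the item without returning its retry, while raising WorkerUnhealthyException returns the item to the queue and stops the worker from taking new jobs. A rough sketch under those assumptions (ExampleQueueWorker and example_queue are illustrative placeholders, not part of this commit):

from workers.queueworker import QueueWorker, JobException

class ExampleQueueWorker(QueueWorker):
  def process_queue_item(self, job_details):
    # job_details is the JSON-decoded body of the queue item.
    if 'required_field' not in job_details:
      # Malformed job: terminate it and do not return the retry to the queue.
      raise JobException('Missing required_field')
    # ... perform the actual work here ...
    return True

if __name__ == "__main__":
  # example_queue stands in for a WorkQueue instance exported by app.py.
  worker = ExampleQueueWorker(example_queue, poll_period_seconds=10, reservation_seconds=30,
                              retry_after_seconds=30)
  worker.start()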


@@ -1,54 +1,51 @@
 import logging
 
-from apscheduler.schedulers.blocking import BlockingScheduler
-
 from app import app
 from data.database import (Repository, LogEntry, RepositoryActionCount, db_random_func, fn,
                            UseThenDisconnect)
 from datetime import date, datetime, timedelta
+from workers.worker import Worker
 
-POLL_PERIOD_SECONDS = 30
+POLL_PERIOD_SECONDS = 10
 
 logger = logging.getLogger(__name__)
 
-sched = BlockingScheduler()
-
-@sched.scheduled_job(trigger='interval', seconds=10)
-def count_repository_actions():
-  """ Counts actions for a random repository for the previous day. """
+class RepositoryActionCountWorker(Worker):
+  def __init__(self):
+    super(RepositoryActionCountWorker, self).__init__()
+    self.add_operation(self._count_repository_actions, POLL_PERIOD_SECONDS)
 
-  with UseThenDisconnect(app.config):
-    try:
-      # Get a random repository to count.
-      today = date.today()
-      yesterday = today - timedelta(days=1)
-      has_yesterday_actions = (RepositoryActionCount.select(RepositoryActionCount.repository)
-                               .where(RepositoryActionCount.date == yesterday))
-
-      to_count = (Repository.select()
-                  .where(~(Repository.id << (has_yesterday_actions)))
-                  .order_by(db_random_func()).get())
-
-      logger.debug('Counting: %s', to_count.id)
-
-      actions = (LogEntry.select()
-                 .where(LogEntry.repository == to_count,
-                        LogEntry.datetime >= yesterday,
-                        LogEntry.datetime < today)
-                 .count())
-
-      # Create the row.
-      try:
-        RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions)
-      except:
-        logger.exception('Exception when writing count')
-
-      return True
-    except Repository.DoesNotExist:
-      logger.debug('No further repositories to count')
-      return False
+  def _count_repository_actions(self):
+    """ Counts actions for a random repository for the previous day. """
+    with UseThenDisconnect(app.config):
+      try:
+        # Get a random repository to count.
+        today = date.today()
+        yesterday = today - timedelta(days=1)
+        has_yesterday_actions = (RepositoryActionCount.select(RepositoryActionCount.repository)
+                                 .where(RepositoryActionCount.date == yesterday))
+
+        to_count = (Repository.select()
+                    .where(~(Repository.id << (has_yesterday_actions)))
+                    .order_by(db_random_func()).get())
+
+        logger.debug('Counting: %s', to_count.id)
+
+        actions = (LogEntry.select()
+                   .where(LogEntry.repository == to_count,
+                          LogEntry.datetime >= yesterday,
+                          LogEntry.datetime < today)
+                   .count())
+
+        # Create the row.
+        try:
+          RepositoryActionCount.create(repository=to_count, date=yesterday, count=actions)
+        except:
+          logger.exception('Exception when writing count')
+
+      except Repository.DoesNotExist:
+        logger.debug('No further repositories to count')
 
 if __name__ == "__main__":
   logging.basicConfig(level=logging.DEBUG)
-  sched.start()
+  worker = RepositoryActionCountWorker()
+  worker.start()


@@ -1,93 +1,33 @@
 import logging
-import json
 import signal
 import sys
+import socket
 
-from threading import Event, Lock
+from threading import Event
 from apscheduler.schedulers.background import BackgroundScheduler
 from datetime import datetime, timedelta
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from threading import Thread
-from time import sleep
+from raven import Client
 
-from data.model import db
-from data.queue import WorkQueue
+from app import app
+from functools import wraps
 
 logger = logging.getLogger(__name__)
 
-class JobException(Exception):
-  """ A job exception is an exception that is caused by something being malformed in the job. When
-      a worker raises this exception the job will be terminated and the retry will not be returned
-      to the queue. """
-  pass
-
-class WorkerUnhealthyException(Exception):
-  """ When this exception is raised, the worker is no longer healthy and will not accept any more
-      work. When this is raised while processing a queue item, the item should be returned to the
-      queue along with another retry. """
-  pass
-
-class WorkerStatusServer(HTTPServer):
-  def __init__(self, worker, *args, **kwargs):
-    HTTPServer.__init__(self, *args, **kwargs)
-    self.worker = worker
-
-class WorkerStatusHandler(BaseHTTPRequestHandler):
-  def do_GET(self):
-    if self.path == '/status':
-      # Return the worker status
-      code = 200 if self.server.worker.is_healthy() else 503
-      self.send_response(code)
-      self.send_header('Content-Type', 'text/plain')
-      self.end_headers()
-      self.wfile.write('OK')
-    elif self.path == '/terminate':
-      # Return whether it is safe to terminate the worker process
-      code = 200 if self.server.worker.is_terminated() else 503
-      self.send_response(code)
-    else:
-      self.send_error(404)
-
-  def do_POST(self):
-    if self.path == '/terminate':
-      try:
-        self.server.worker.join()
-        self.send_response(200)
-      except:
-        self.send_response(500)
-    else:
-      self.send_error(404)
-
 class Worker(object):
-  def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
-               watchdog_period_seconds=60, retry_after_seconds=300):
+  """ Base class for workers which perform some work periodically. """
+  def __init__(self):
     self._sched = BackgroundScheduler()
-
-    self._poll_period_seconds = poll_period_seconds
-    self._reservation_seconds = reservation_seconds
-    self._watchdog_period_seconds = watchdog_period_seconds
-    self._retry_after_seconds = retry_after_seconds
-
+    self._operations = []
     self._stop = Event()
    self._terminated = Event()
-    self._queue = queue
-    self._current_item_lock = Lock()
-    self.current_queue_item = None
 
-  def process_queue_item(self, job_details):
-    """ Return True if complete, False if it should be retried. """
-    raise NotImplementedError('Workers must implement run.')
-
-  def watchdog(self):
-    """ Function that gets run once every watchdog_period_seconds. """
-    pass
-
-  def _close_db_handle(self):
-    if not db.is_closed():
-      logger.debug('Disconnecting from database.')
-      db.close()
+    if app.config.get('EXCEPTION_LOG_TYPE', 'FakeSentry') == 'Sentry':
+      worker_name = '%s:worker-%s' % (socket.gethostname(), self.__class__.__name__)
+      self._raven_client = Client(app.config.get('SENTRY_DSN', ''), name=worker_name)
 
   def is_healthy(self):
     return not self._stop.is_set()
@@ -95,90 +35,33 @@ class Worker(object):
   def is_terminated(self):
     return self._terminated.is_set()
 
-  def extend_processing(self, seconds_from_now):
-    with self._current_item_lock:
-      if self.current_queue_item is not None:
-        self._queue.extend_processing(self.current_queue_item, seconds_from_now)
-
-  def run_watchdog(self):
-    logger.debug('Running watchdog.')
-    try:
-      self.watchdog()
-    except WorkerUnhealthyException as exc:
-      logger.error('The worker has encountered an error via watchdog and will not take new jobs')
-      logger.error(exc.message)
-      self.mark_current_incomplete(restore_retry=True)
-      self._stop.set()
-
-  def poll_queue(self):
-    logger.debug('Getting work item from queue.')
-
-    with self._current_item_lock:
-      self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
-
-    while True:
-      # Retrieve the current item in the queue over which to operate. We do so under
-      # a lock to make sure we are always retrieving an item when in a healthy state.
-      current_queue_item = None
-      with self._current_item_lock:
-        current_queue_item = self.current_queue_item
-        if current_queue_item is None:
-          # Close the db handle.
-          self._close_db_handle()
-          break
-
-      logger.debug('Queue gave us some work: %s', current_queue_item.body)
-      job_details = json.loads(current_queue_item.body)
-
+  def ungracefully_terminated(self):
+    """ Method called when the worker has been terminated in an ungraceful fashion. """
+    pass
+
+  def add_operation(self, operation_func, operation_sec):
+    @wraps(operation_func)
+    def _operation_func():
       try:
-        self.process_queue_item(job_details)
-        self.mark_current_complete()
+        return operation_func()
+      except Exception:
+        logger.exception('Operation raised exception')
+        if self._raven_client:
+          logger.debug('Logging exception to Sentry')
+          self._raven_client.captureException()
 
-      except JobException as jex:
-        logger.warning('An error occurred processing request: %s', current_queue_item.body)
-        logger.warning('Job exception: %s' % jex)
-        self.mark_current_incomplete(restore_retry=False)
+    self._operations.append((_operation_func, operation_sec))
 
-      except WorkerUnhealthyException as exc:
-        logger.error('The worker has encountered an error via the job and will not take new jobs')
-        logger.error(exc.message)
-        self.mark_current_incomplete(restore_retry=True)
-        self._stop.set()
-
-      finally:
-        # Close the db handle.
-        self._close_db_handle()
-
-      if not self._stop.is_set():
-        with self._current_item_lock:
-          self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
-
-    if not self._stop.is_set():
-      logger.debug('No more work.')
-
-  def update_queue_metrics(self):
-    self._queue.update_metrics()
-
-  def start(self, start_status_server_port=None):
-    if start_status_server_port is not None:
-      # Start a status server on a thread
-      server_address = ('', start_status_server_port)
-      httpd = WorkerStatusServer(self, server_address, WorkerStatusHandler)
-      server_thread = Thread(target=httpd.serve_forever)
-      server_thread.daemon = True
-      server_thread.start()
-
-    logger.debug("Scheduling worker.")
+  def start(self):
+    logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
+
+    logger.debug('Scheduling worker.')
 
     soon = datetime.now() + timedelta(seconds=.001)
 
     self._sched.start()
-    self._sched.add_job(self.poll_queue, 'interval', seconds=self._poll_period_seconds,
-                        start_date=soon, max_instances=1)
-    self._sched.add_job(self.update_queue_metrics, 'interval', seconds=60, start_date=soon,
-                        max_instances=1)
-    self._sched.add_job(self.run_watchdog, 'interval', seconds=self._watchdog_period_seconds,
-                        max_instances=1)
+    for operation_func, operation_sec in self._operations:
+      self._sched.add_job(operation_func, 'interval', seconds=operation_sec,
+                          start_date=soon, max_instances=1)
 
     signal.signal(signal.SIGTERM, self.terminate)
     signal.signal(signal.SIGINT, self.terminate)
@@ -192,23 +75,6 @@ class Worker(object):
     self._terminated.set()
 
-    # Wait forever if we're running a server
-    while start_status_server_port is not None:
-      sleep(60)
-
-  def mark_current_incomplete(self, restore_retry=False):
-    with self._current_item_lock:
-      if self.current_queue_item is not None:
-        self._queue.incomplete(self.current_queue_item, restore_retry=restore_retry,
-                               retry_after=self._retry_after_seconds)
-        self.current_queue_item = None
-
-  def mark_current_complete(self):
-    with self._current_item_lock:
-      if self.current_queue_item is not None:
-        self._queue.complete(self.current_queue_item)
-        self.current_queue_item = None
-
   def terminate(self, signal_num=None, stack_frame=None, graceful=False):
     if self._terminated.is_set():
       sys.exit(1)
@@ -218,9 +84,7 @@ class Worker(object):
       self._stop.set()
 
       if not graceful:
-        # Give back the retry that we took for this queue item so that if it were down to zero
-        # retries it will still be picked up by another worker
-        self.mark_current_incomplete()
+        self.ungracefully_terminated()
 
   def join(self):
     self.terminate(graceful=True)