diff --git a/app.py b/app.py index c3b15d7aa..78243de75 100644 --- a/app.py +++ b/app.py @@ -32,6 +32,7 @@ from util.queuemetrics import QueueMetrics from util.config.provider import FileConfigProvider, TestConfigProvider from util.config.configutil import generate_secret_key from util.config.superusermanager import SuperUserManager +from buildman.jobutil.buildreporter import BuildMetrics OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/' OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml' @@ -118,6 +119,7 @@ userevents = UserEventsBuilderModule(app) superusers = SuperUserManager(app) signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY) queue_metrics = QueueMetrics(app) +build_metrics = BuildMetrics(app) tf = app.config['DB_TRANSACTION_FACTORY'] diff --git a/buildman/enums.py b/buildman/enums.py new file mode 100644 index 000000000..3d38217fe --- /dev/null +++ b/buildman/enums.py @@ -0,0 +1,12 @@ +class BuildJobResult(object): + """ Build job result enum """ + INCOMPLETE = 'incomplete' + COMPLETE = 'complete' + ERROR = 'error' + + +class BuildServerStatus(object): + """ Build server status enum """ + STARTING = 'starting' + RUNNING = 'running' + SHUTDOWN = 'shutting_down' diff --git a/buildman/jobutil/buildreporter.py b/buildman/jobutil/buildreporter.py new file mode 100644 index 000000000..16dd0ca5b --- /dev/null +++ b/buildman/jobutil/buildreporter.py @@ -0,0 +1,72 @@ +from trollius import From + +from buildman.enums import BuildJobResult +from util.cloudwatch import get_queue + + +class BuildReporter(object): + """ + Base class for reporting build statuses to a metrics service. + """ + def report_completion_status(self, status): + """ + Method to invoke the recording of build's completion status to a metric service. + """ + raise NotImplementedError + + +class NullReporter(BuildReporter): + """ + The /dev/null of BuildReporters. + """ + def report_completion_status(self, *args): + pass + + +class CloudWatchBuildReporter(BuildReporter): + """ + Implements a BuildReporter for Amazon's CloudWatch. + """ + def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name): + self._queue = queue + self._namespace_name = namespace_name + self._completed_name = completed_name + self._failed_name = failed_name + self._incompleted_name = incompleted_name + + def _send_to_queue(self, *args, **kwargs): + self._queue.put((args, kwargs)) + + def report_completion_status(self, status): + if status == BuildJobResult.COMPLETE: + status_name = self._completed_name + elif status == BuildJobResult.ERROR: + status_name = self._failed_name + elif status == BuildJobResult.INCOMPLETE: + status_name = self._incompleted_name + else: + return + + self._send_to_queue(self._namespace_name, status_name, 1, unit='Count') + + +class BuildMetrics(object): + """ + BuildMetrics initializes a reporter for recording the status of build completions. + """ + def __init__(self, app=None): + self._app = app + self._reporter = NullReporter() + if app is not None: + reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null') + if reporter_type == 'CloudWatch': + namespace = app.config['BUILD_METRICS_NAMESPACE'] + completed_name = app.config['BUILD_METRICS_COMPLETED_NAME'] + failed_name = app.config['BUILD_METRICS_FAILED_NAME'] + incompleted_name = app.config['BUILD_METRICS_INCOMPLETED_NAME'] + request_queue = get_queue(app) + self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name, + failed_name, incompleted_name) + + def __getattr__(self, name): + return getattr(self._reporter, name, None) diff --git a/buildman/server.py b/buildman/server.py index f6ba9b4bc..855afc212 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -9,14 +9,15 @@ from autobahn.wamp import types from aiowsgi import create_server as create_wsgi_server from flask import Flask from threading import Event +from trollius.tasks import Task from trollius.coroutines import From from datetime import timedelta +from buildman.enums import BuildJobResult, BuildServerStatus from buildman.jobutil.buildstatus import StatusHandler from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database -from data.queue import WorkQueue -from app import app +from app import app, build_metrics logger = logging.getLogger(__name__) @@ -27,12 +28,6 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2) HEARTBEAT_PERIOD_SEC = 30 -class BuildJobResult(object): - """ Build job result enum """ - INCOMPLETE = 'incomplete' - COMPLETE = 'complete' - ERROR = 'error' - class BuilderServer(object): """ Server which handles both HTTP and WAMP requests, managing the full state of the build controller. @@ -40,7 +35,7 @@ class BuilderServer(object): def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass, lifecycle_manager_config, manager_hostname): self._loop = None - self._current_status = 'starting' + self._current_status = BuildServerStatus.STARTING self._current_components = [] self._job_count = 0 @@ -60,7 +55,7 @@ class BuilderServer(object): self._lifecycle_manager_config = lifecycle_manager_config self._shutdown_event = Event() - self._current_status = 'running' + self._current_status = BuildServerStatus.RUNNING self._register_controller() @@ -97,8 +92,14 @@ class BuilderServer(object): logger.debug('Starting server on port %s, with controller on port %s', websocket_port, controller_port) + + TASKS = [ + Task(self._initialize(loop, host, websocket_port, controller_port, ssl)), + Task(self._queue_metrics_updater()), + ] + try: - loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl)) + loop.run_until_complete(trollius.wait(TASKS)) except KeyboardInterrupt: pass finally: @@ -106,7 +107,7 @@ class BuilderServer(object): def close(self): logger.debug('Requested server shutdown') - self._current_status = 'shutting_down' + self._current_status = BuildServerStatus.SHUTDOWN self._lifecycle_manager.shutdown() self._shutdown_event.wait() logger.debug('Shutting down server') @@ -147,12 +148,14 @@ class BuilderServer(object): self._job_count = self._job_count - 1 - if self._current_status == 'shutting_down' and not self._job_count: + if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count: self._shutdown_event.set() + build_metrics.report_completion_status(job_status) + @trollius.coroutine def _work_checker(self): - while self._current_status == 'running': + while self._current_status == BuildServerStatus.RUNNING: with database.CloseForLongOperation(app.config): yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) @@ -183,7 +186,11 @@ class BuilderServer(object): logger.debug('All workers are busy. Requeuing.') self._queue.incomplete(job_item, restore_retry=True, retry_after=0) - + @trollius.coroutine + def _queue_metrics_updater(self): + while self._current_status == BuildServerStatus.RUNNING: + yield From(trollius.sleep(30)) + self._queue.update_metrics() @trollius.coroutine def _initialize(self, loop, host, websocket_port, controller_port, ssl=None): diff --git a/local-test.sh b/local-test.sh index a54491969..2a85148bf 100755 --- a/local-test.sh +++ b/local-test.sh @@ -1 +1 @@ -TEST=true python -m unittest discover +TEST=true TROLLIUSDEBUG=1 python -m unittest discover diff --git a/util/cloudwatch.py b/util/cloudwatch.py new file mode 100644 index 000000000..b75dadf31 --- /dev/null +++ b/util/cloudwatch.py @@ -0,0 +1,47 @@ +import logging +import boto + +from Queue import Queue +from threading import Thread + + +logger = logging.getLogger(__name__) + +def get_queue(app): + """ + Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them. + """ + access_key = app.config['CLOUDWATCH_AWS_ACCESS_KEY'] + secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY'] + + queue = Queue() + sender = CloudWatchSender(queue, access_key, secret_key) + sender.start() + return queue + +class CloudWatchSender(Thread): + """ + CloudWatchSender loops indefinitely and pulls metrics off of a queue then sends it to CloudWatch. + """ + def __init__(self, request_queue, aws_access_key, aws_secret_key): + Thread.__init__(self) + self.daemon = True + + self._aws_access_key = aws_access_key + self._aws_secret_key = aws_secret_key + self._put_metrics_queue = request_queue + + def run(self): + try: + logger.debug('Starting CloudWatch sender process.') + connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) + except: + logger.exception('Failed to connect to CloudWatch.') + + while True: + put_metric_args, kwargs = self._put_metrics_queue.get() + logger.debug('Got queued put metrics request.') + try: + connection.put_metric_data(*put_metric_args, **kwargs) + except: + logger.exception('Failed to write to CloudWatch') diff --git a/util/queuemetrics.py b/util/queuemetrics.py index 1bebfa3a6..9e0a549f4 100644 --- a/util/queuemetrics.py +++ b/util/queuemetrics.py @@ -1,8 +1,6 @@ import logging -import boto -from Queue import Queue -from threading import Thread +from util.cloudwatch import get_queue logger = logging.getLogger(__name__) @@ -13,8 +11,8 @@ class NullReporter(object): pass -class QueueingCloudWatchReporter(object): - """ QueueingCloudWatchReporter reports metrics to the "SendToCloudWatch" process """ +class CloudWatchReporter(object): + """ CloudWatchReporter reports work queue metrics to CloudWatch """ def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name): self._namespace = namespace self._need_capacity_name = need_capacity_name @@ -36,70 +34,23 @@ class QueueingCloudWatchReporter(object): self._send_to_queue(self._namespace, self._build_percent_name, building_percent, unit='Percent') - -class SendToCloudWatch(Thread): - """ SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to - CloudWatch. """ - def __init__(self, request_queue, aws_access_key, aws_secret_key): - Thread.__init__(self) - self.daemon = True - - self._aws_access_key = aws_access_key - self._aws_secret_key = aws_secret_key - self._put_metrics_queue = request_queue - - def run(self): - try: - logger.debug('Starting CloudWatch sender process.') - connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) - except: - logger.exception('Failed to connect to CloudWatch.') - - while True: - put_metric_args, kwargs = self._put_metrics_queue.get() - logger.debug('Got queued put metrics request.') - try: - connection.put_metric_data(*put_metric_args, **kwargs) - except: - logger.exception('Failed to write to CloudWatch') - - class QueueMetrics(object): + """ + QueueMetrics initializes a reporter for recording metrics of work queues. + """ def __init__(self, app=None): - self.app = app - self.sender = None + self._app = app + self._reporter = NullReporter() if app is not None: - self.state = self.init_app(app) - else: - self.state = None + reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') + if reporter_type == 'CloudWatch': + namespace = app.config['QUEUE_METRICS_NAMESPACE'] + req_capacity_name = app.config['QUEUE_METRICS_CAPACITY_SHORTAGE_NAME'] + build_percent_name = app.config['QUEUE_METRICS_BUILD_PERCENT_NAME'] - def init_app(self, app): - analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null') - - if analytics_type == 'CloudWatch': - access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY') - secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY') - namespace = app.config.get('QUEUE_METRICS_NAMESPACE') - req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME') - build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME') - - request_queue = Queue() - reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name, - build_percent_name) - self.sender = SendToCloudWatch(request_queue, access_key, secret_key) - else: - reporter = NullReporter() - - # register extension with app - app.extensions = getattr(app, 'extensions', {}) - app.extensions['queuemetrics'] = reporter - return reporter - - def run(self): - logger.debug('Asked to start CloudWatch reporter') - if self.sender is not None: - logger.debug('Starting CloudWatch reporter') - self.sender.start() + request_queue = get_queue(app) + self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name, + build_percent_name) def __getattr__(self, name): - return getattr(self.state, name, None) + return getattr(self._reporter, name, None) diff --git a/web.py b/web.py index 92b3d6758..7c945cc45 100644 --- a/web.py +++ b/web.py @@ -1,7 +1,7 @@ import logging import logging.config -from app import app as application, queue_metrics +from app import app as application from endpoints.api import api_bp from endpoints.web import web @@ -9,9 +9,6 @@ from endpoints.webhooks import webhooks from endpoints.realtime import realtime from endpoints.callbacks import callback -# Start the cloudwatch reporting. -queue_metrics.run() - application.register_blueprint(web) application.register_blueprint(callback, url_prefix='/oauth2') application.register_blueprint(api_bp, url_prefix='/api')