Merge pull request #11 from coreos-inc/nimbus
CloudWatch for build job status
This commit is contained in:
commit
47f8cb77c4
8 changed files with 174 additions and 86 deletions
2
app.py
2
app.py
|
@ -32,6 +32,7 @@ from util.queuemetrics import QueueMetrics
|
|||
from util.config.provider import FileConfigProvider, TestConfigProvider
|
||||
from util.config.configutil import generate_secret_key
|
||||
from util.config.superusermanager import SuperUserManager
|
||||
from buildman.jobutil.buildreporter import BuildMetrics
|
||||
|
||||
OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/'
|
||||
OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml'
|
||||
|
@ -118,6 +119,7 @@ userevents = UserEventsBuilderModule(app)
|
|||
superusers = SuperUserManager(app)
|
||||
signer = Signer(app, OVERRIDE_CONFIG_DIRECTORY)
|
||||
queue_metrics = QueueMetrics(app)
|
||||
build_metrics = BuildMetrics(app)
|
||||
|
||||
tf = app.config['DB_TRANSACTION_FACTORY']
|
||||
|
||||
|
|
12
buildman/enums.py
Normal file
12
buildman/enums.py
Normal file
|
@ -0,0 +1,12 @@
|
|||
class BuildJobResult(object):
|
||||
""" Build job result enum """
|
||||
INCOMPLETE = 'incomplete'
|
||||
COMPLETE = 'complete'
|
||||
ERROR = 'error'
|
||||
|
||||
|
||||
class BuildServerStatus(object):
|
||||
""" Build server status enum """
|
||||
STARTING = 'starting'
|
||||
RUNNING = 'running'
|
||||
SHUTDOWN = 'shutting_down'
|
72
buildman/jobutil/buildreporter.py
Normal file
72
buildman/jobutil/buildreporter.py
Normal file
|
@ -0,0 +1,72 @@
|
|||
from trollius import From
|
||||
|
||||
from buildman.enums import BuildJobResult
|
||||
from util.cloudwatch import get_queue
|
||||
|
||||
|
||||
class BuildReporter(object):
|
||||
"""
|
||||
Base class for reporting build statuses to a metrics service.
|
||||
"""
|
||||
def report_completion_status(self, status):
|
||||
"""
|
||||
Method to invoke the recording of build's completion status to a metric service.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class NullReporter(BuildReporter):
|
||||
"""
|
||||
The /dev/null of BuildReporters.
|
||||
"""
|
||||
def report_completion_status(self, *args):
|
||||
pass
|
||||
|
||||
|
||||
class CloudWatchBuildReporter(BuildReporter):
|
||||
"""
|
||||
Implements a BuildReporter for Amazon's CloudWatch.
|
||||
"""
|
||||
def __init__(self, queue, namespace_name, completed_name, failed_name, incompleted_name):
|
||||
self._queue = queue
|
||||
self._namespace_name = namespace_name
|
||||
self._completed_name = completed_name
|
||||
self._failed_name = failed_name
|
||||
self._incompleted_name = incompleted_name
|
||||
|
||||
def _send_to_queue(self, *args, **kwargs):
|
||||
self._queue.put((args, kwargs))
|
||||
|
||||
def report_completion_status(self, status):
|
||||
if status == BuildJobResult.COMPLETE:
|
||||
status_name = self._completed_name
|
||||
elif status == BuildJobResult.ERROR:
|
||||
status_name = self._failed_name
|
||||
elif status == BuildJobResult.INCOMPLETE:
|
||||
status_name = self._incompleted_name
|
||||
else:
|
||||
return
|
||||
|
||||
self._send_to_queue(self._namespace_name, status_name, 1, unit='Count')
|
||||
|
||||
|
||||
class BuildMetrics(object):
|
||||
"""
|
||||
BuildMetrics initializes a reporter for recording the status of build completions.
|
||||
"""
|
||||
def __init__(self, app=None):
|
||||
self._app = app
|
||||
self._reporter = NullReporter()
|
||||
if app is not None:
|
||||
reporter_type = app.config.get('BUILD_METRICS_TYPE', 'Null')
|
||||
if reporter_type == 'CloudWatch':
|
||||
namespace = app.config['BUILD_METRICS_NAMESPACE']
|
||||
completed_name = app.config['BUILD_METRICS_COMPLETED_NAME']
|
||||
failed_name = app.config['BUILD_METRICS_FAILED_NAME']
|
||||
incompleted_name = app.config['BUILD_METRICS_INCOMPLETED_NAME']
|
||||
request_queue = get_queue(app)
|
||||
self._reporter = CloudWatchBuildReporter(request_queue, namespace, completed_name,
|
||||
failed_name, incompleted_name)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._reporter, name, None)
|
|
@ -9,14 +9,15 @@ from autobahn.wamp import types
|
|||
from aiowsgi import create_server as create_wsgi_server
|
||||
from flask import Flask
|
||||
from threading import Event
|
||||
from trollius.tasks import Task
|
||||
from trollius.coroutines import From
|
||||
from datetime import timedelta
|
||||
|
||||
from buildman.enums import BuildJobResult, BuildServerStatus
|
||||
from buildman.jobutil.buildstatus import StatusHandler
|
||||
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
|
||||
from data import database
|
||||
from data.queue import WorkQueue
|
||||
from app import app
|
||||
from app import app, build_metrics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -27,12 +28,6 @@ MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
|
|||
|
||||
HEARTBEAT_PERIOD_SEC = 30
|
||||
|
||||
class BuildJobResult(object):
|
||||
""" Build job result enum """
|
||||
INCOMPLETE = 'incomplete'
|
||||
COMPLETE = 'complete'
|
||||
ERROR = 'error'
|
||||
|
||||
class BuilderServer(object):
|
||||
""" Server which handles both HTTP and WAMP requests, managing the full state of the build
|
||||
controller.
|
||||
|
@ -40,7 +35,7 @@ class BuilderServer(object):
|
|||
def __init__(self, registry_hostname, queue, build_logs, user_files, lifecycle_manager_klass,
|
||||
lifecycle_manager_config, manager_hostname):
|
||||
self._loop = None
|
||||
self._current_status = 'starting'
|
||||
self._current_status = BuildServerStatus.STARTING
|
||||
self._current_components = []
|
||||
self._job_count = 0
|
||||
|
||||
|
@ -60,7 +55,7 @@ class BuilderServer(object):
|
|||
self._lifecycle_manager_config = lifecycle_manager_config
|
||||
|
||||
self._shutdown_event = Event()
|
||||
self._current_status = 'running'
|
||||
self._current_status = BuildServerStatus.RUNNING
|
||||
|
||||
self._register_controller()
|
||||
|
||||
|
@ -97,8 +92,14 @@ class BuilderServer(object):
|
|||
|
||||
logger.debug('Starting server on port %s, with controller on port %s', websocket_port,
|
||||
controller_port)
|
||||
|
||||
TASKS = [
|
||||
Task(self._initialize(loop, host, websocket_port, controller_port, ssl)),
|
||||
Task(self._queue_metrics_updater()),
|
||||
]
|
||||
|
||||
try:
|
||||
loop.run_until_complete(self._initialize(loop, host, websocket_port, controller_port, ssl))
|
||||
loop.run_until_complete(trollius.wait(TASKS))
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
|
@ -106,7 +107,7 @@ class BuilderServer(object):
|
|||
|
||||
def close(self):
|
||||
logger.debug('Requested server shutdown')
|
||||
self._current_status = 'shutting_down'
|
||||
self._current_status = BuildServerStatus.SHUTDOWN
|
||||
self._lifecycle_manager.shutdown()
|
||||
self._shutdown_event.wait()
|
||||
logger.debug('Shutting down server')
|
||||
|
@ -147,12 +148,14 @@ class BuilderServer(object):
|
|||
|
||||
self._job_count = self._job_count - 1
|
||||
|
||||
if self._current_status == 'shutting_down' and not self._job_count:
|
||||
if self._current_status == BuildServerStatus.SHUTDOWN and not self._job_count:
|
||||
self._shutdown_event.set()
|
||||
|
||||
build_metrics.report_completion_status(job_status)
|
||||
|
||||
@trollius.coroutine
|
||||
def _work_checker(self):
|
||||
while self._current_status == 'running':
|
||||
while self._current_status == BuildServerStatus.RUNNING:
|
||||
with database.CloseForLongOperation(app.config):
|
||||
yield From(trollius.sleep(WORK_CHECK_TIMEOUT))
|
||||
|
||||
|
@ -183,7 +186,11 @@ class BuilderServer(object):
|
|||
logger.debug('All workers are busy. Requeuing.')
|
||||
self._queue.incomplete(job_item, restore_retry=True, retry_after=0)
|
||||
|
||||
|
||||
@trollius.coroutine
|
||||
def _queue_metrics_updater(self):
|
||||
while self._current_status == BuildServerStatus.RUNNING:
|
||||
yield From(trollius.sleep(30))
|
||||
self._queue.update_metrics()
|
||||
|
||||
@trollius.coroutine
|
||||
def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):
|
||||
|
|
|
@ -1 +1 @@
|
|||
TEST=true python -m unittest discover
|
||||
TEST=true TROLLIUSDEBUG=1 python -m unittest discover
|
||||
|
|
47
util/cloudwatch.py
Normal file
47
util/cloudwatch.py
Normal file
|
@ -0,0 +1,47 @@
|
|||
import logging
|
||||
import boto
|
||||
|
||||
from Queue import Queue
|
||||
from threading import Thread
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def get_queue(app):
|
||||
"""
|
||||
Returns a queue to the CloudWatchSender. If a queue/sender do not exist, creates them.
|
||||
"""
|
||||
access_key = app.config['CLOUDWATCH_AWS_ACCESS_KEY']
|
||||
secret_key = app.config['CLOUDWATCH_AWS_SECRET_KEY']
|
||||
|
||||
queue = Queue()
|
||||
sender = CloudWatchSender(queue, access_key, secret_key)
|
||||
sender.start()
|
||||
return queue
|
||||
|
||||
class CloudWatchSender(Thread):
|
||||
"""
|
||||
CloudWatchSender loops indefinitely and pulls metrics off of a queue then sends it to CloudWatch.
|
||||
"""
|
||||
def __init__(self, request_queue, aws_access_key, aws_secret_key):
|
||||
Thread.__init__(self)
|
||||
self.daemon = True
|
||||
|
||||
self._aws_access_key = aws_access_key
|
||||
self._aws_secret_key = aws_secret_key
|
||||
self._put_metrics_queue = request_queue
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
logger.debug('Starting CloudWatch sender process.')
|
||||
connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
|
||||
except:
|
||||
logger.exception('Failed to connect to CloudWatch.')
|
||||
|
||||
while True:
|
||||
put_metric_args, kwargs = self._put_metrics_queue.get()
|
||||
logger.debug('Got queued put metrics request.')
|
||||
try:
|
||||
connection.put_metric_data(*put_metric_args, **kwargs)
|
||||
except:
|
||||
logger.exception('Failed to write to CloudWatch')
|
|
@ -1,8 +1,6 @@
|
|||
import logging
|
||||
import boto
|
||||
|
||||
from Queue import Queue
|
||||
from threading import Thread
|
||||
from util.cloudwatch import get_queue
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -13,8 +11,8 @@ class NullReporter(object):
|
|||
pass
|
||||
|
||||
|
||||
class QueueingCloudWatchReporter(object):
|
||||
""" QueueingCloudWatchReporter reports metrics to the "SendToCloudWatch" process """
|
||||
class CloudWatchReporter(object):
|
||||
""" CloudWatchReporter reports work queue metrics to CloudWatch """
|
||||
def __init__(self, request_queue, namespace, need_capacity_name, build_percent_name):
|
||||
self._namespace = namespace
|
||||
self._need_capacity_name = need_capacity_name
|
||||
|
@ -36,70 +34,23 @@ class QueueingCloudWatchReporter(object):
|
|||
self._send_to_queue(self._namespace, self._build_percent_name, building_percent,
|
||||
unit='Percent')
|
||||
|
||||
|
||||
class SendToCloudWatch(Thread):
|
||||
""" SendToCloudWatch loops indefinitely and pulls metrics off of a queue then sends it to
|
||||
CloudWatch. """
|
||||
def __init__(self, request_queue, aws_access_key, aws_secret_key):
|
||||
Thread.__init__(self)
|
||||
self.daemon = True
|
||||
|
||||
self._aws_access_key = aws_access_key
|
||||
self._aws_secret_key = aws_secret_key
|
||||
self._put_metrics_queue = request_queue
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
logger.debug('Starting CloudWatch sender process.')
|
||||
connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
|
||||
except:
|
||||
logger.exception('Failed to connect to CloudWatch.')
|
||||
|
||||
while True:
|
||||
put_metric_args, kwargs = self._put_metrics_queue.get()
|
||||
logger.debug('Got queued put metrics request.')
|
||||
try:
|
||||
connection.put_metric_data(*put_metric_args, **kwargs)
|
||||
except:
|
||||
logger.exception('Failed to write to CloudWatch')
|
||||
|
||||
|
||||
class QueueMetrics(object):
|
||||
"""
|
||||
QueueMetrics initializes a reporter for recording metrics of work queues.
|
||||
"""
|
||||
def __init__(self, app=None):
|
||||
self.app = app
|
||||
self.sender = None
|
||||
self._app = app
|
||||
self._reporter = NullReporter()
|
||||
if app is not None:
|
||||
self.state = self.init_app(app)
|
||||
else:
|
||||
self.state = None
|
||||
reporter_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
|
||||
if reporter_type == 'CloudWatch':
|
||||
namespace = app.config['QUEUE_METRICS_NAMESPACE']
|
||||
req_capacity_name = app.config['QUEUE_METRICS_CAPACITY_SHORTAGE_NAME']
|
||||
build_percent_name = app.config['QUEUE_METRICS_BUILD_PERCENT_NAME']
|
||||
|
||||
def init_app(self, app):
|
||||
analytics_type = app.config.get('QUEUE_METRICS_TYPE', 'Null')
|
||||
|
||||
if analytics_type == 'CloudWatch':
|
||||
access_key = app.config.get('QUEUE_METRICS_AWS_ACCESS_KEY')
|
||||
secret_key = app.config.get('QUEUE_METRICS_AWS_SECRET_KEY')
|
||||
namespace = app.config.get('QUEUE_METRICS_NAMESPACE')
|
||||
req_capacity_name = app.config.get('QUEUE_METRICS_CAPACITY_SHORTAGE_NAME')
|
||||
build_percent_name = app.config.get('QUEUE_METRICS_BUILD_PERCENT_NAME')
|
||||
|
||||
request_queue = Queue()
|
||||
reporter = QueueingCloudWatchReporter(request_queue, namespace, req_capacity_name,
|
||||
build_percent_name)
|
||||
self.sender = SendToCloudWatch(request_queue, access_key, secret_key)
|
||||
else:
|
||||
reporter = NullReporter()
|
||||
|
||||
# register extension with app
|
||||
app.extensions = getattr(app, 'extensions', {})
|
||||
app.extensions['queuemetrics'] = reporter
|
||||
return reporter
|
||||
|
||||
def run(self):
|
||||
logger.debug('Asked to start CloudWatch reporter')
|
||||
if self.sender is not None:
|
||||
logger.debug('Starting CloudWatch reporter')
|
||||
self.sender.start()
|
||||
request_queue = get_queue(app)
|
||||
self._reporter = CloudWatchReporter(request_queue, namespace, req_capacity_name,
|
||||
build_percent_name)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.state, name, None)
|
||||
return getattr(self._reporter, name, None)
|
||||
|
|
5
web.py
5
web.py
|
@ -1,7 +1,7 @@
|
|||
import logging
|
||||
import logging.config
|
||||
|
||||
from app import app as application, queue_metrics
|
||||
from app import app as application
|
||||
|
||||
from endpoints.api import api_bp
|
||||
from endpoints.web import web
|
||||
|
@ -9,9 +9,6 @@ from endpoints.webhooks import webhooks
|
|||
from endpoints.realtime import realtime
|
||||
from endpoints.callbacks import callback
|
||||
|
||||
# Start the cloudwatch reporting.
|
||||
queue_metrics.run()
|
||||
|
||||
application.register_blueprint(web)
|
||||
application.register_blueprint(callback, url_prefix='/oauth2')
|
||||
application.register_blueprint(api_bp, url_prefix='/api')
|
||||
|
|
Reference in a new issue