Further updates to the Prometheus client code

This commit is contained in:
Joseph Schorr 2016-06-28 14:36:17 -04:00
parent 668a8edc50
commit 713ba3abaf
9 changed files with 165 additions and 127 deletions

4
app.py
View file

@ -23,7 +23,7 @@ from data.billing import Billing
from data.buildlogs import BuildLogs from data.buildlogs import BuildLogs
from data.archivedlogs import LogArchive from data.archivedlogs import LogArchive
from data.userevent import UserEventsBuilderModule from data.userevent import UserEventsBuilderModule
from data.queue import WorkQueue, MetricQueueReporter from data.queue import WorkQueue, BuildMetricQueueReporter
from util import get_app_url from util import get_app_url
from util.saas.prometheus import PrometheusPlugin from util.saas.prometheus import PrometheusPlugin
from util.saas.analytics import Analytics from util.saas.analytics import Analytics
@ -197,7 +197,7 @@ oauth_apps = [github_login, github_trigger, gitlab_trigger, google_login, dex_lo
image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf) image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf)
dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf, dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf,
reporter=MetricQueueReporter(metric_queue)) reporter=BuildMetricQueueReporter(metric_queue))
notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf) notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)
secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf) secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf)
secscan_api = SecurityScannerAPI(app, app.config, storage) secscan_api = SecurityScannerAPI(app, app.config, storage)

View file

@ -340,7 +340,8 @@ class EphemeralBuilderManager(BaseManager):
try: try:
builder_id = yield From(self._executor.start_builder(realm, token, build_uuid)) builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
metric_queue.put('EC2BuilderStarted', 1, unit='Count') metric_queue.put_deprecated('EC2BuilderStarted', 1, unit='Count')
metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
except: except:
logger.exception('Exception when starting builder for job: %s', build_uuid) logger.exception('Exception when starting builder for job: %s', build_uuid)
raise Return(False, EC2_API_TIMEOUT) raise Return(False, EC2_API_TIMEOUT)

View file

@ -152,7 +152,8 @@ class EC2Executor(BuilderExecutor):
)) ))
except boto.exception.EC2ResponseError as ec2e: except boto.exception.EC2ResponseError as ec2e:
logger.exception('Unable to spawn builder instance') logger.exception('Unable to spawn builder instance')
metric_queue.put('EC2BuildStartFailure', 1, unit='Count') metric_queue.put_deprecated('EC2BuildStartFailure', 1, unit='Count')
metric_queue.ephemeral_build_worker_failure.Inc(labelvalues=[build_uuid])
raise ec2e raise ec2e
if not reservation.instances: if not reservation.instances:

View file

@ -18,10 +18,8 @@ from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data import database from data import database
from app import app, metric_queue from app import app, metric_queue
from app import app from app import app
from util.prometheus import Counter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
build_counter = Counter('builds', 'Number of builds', labelnames=['name'])
WORK_CHECK_TIMEOUT = 10 WORK_CHECK_TIMEOUT = 10
TIMEOUT_PERIOD_MINUTES = 20 TIMEOUT_PERIOD_MINUTES = 20
@ -240,5 +238,5 @@ def report_completion_status(status):
else: else:
return return
metric_queue.put(status_name, 1, unit='Count') metric_queue.put_deprecated(status_name, 1, unit='Count')
build_counter.Inc(labelvalues=[status_name]) metric_queue.build_counter.Inc(labelvalues=[status_name])

View file

@ -15,21 +15,23 @@ class NoopWith:
pass pass
class MetricQueueReporter(object): class BuildMetricQueueReporter(object):
""" Metric queue reporter for the build system. """
def __init__(self, metric_queue): def __init__(self, metric_queue):
self._metric_queue = metric_queue self._metric_queue = metric_queue
def __call__(self, currently_processing, running_count, total_count): def __call__(self, currently_processing, running_count, total_count):
need_capacity_count = total_count - running_count need_capacity_count = total_count - running_count
self._metric_queue.put('BuildCapacityShortage', need_capacity_count, unit='Count') self._metric_queue.put_deprecated('BuildCapacityShortage', need_capacity_count, unit='Count')
self._metric_queue.build_capacity_shortage.Set(need_capacity_count) self._metric_queue.build_capacity_shortage.Set(need_capacity_count)
building_percent = 100 if currently_processing else 0 building_percent = 100 if currently_processing else 0
self._metric_queue.put('PercentBuilding', building_percent, unit='Percent') self._metric_queue.put_deprecated('PercentBuilding', building_percent, unit='Percent')
self._metric_queue.percent_building.Set(building_percent) self._metric_queue.percent_building.Set(building_percent)
class WorkQueue(object): class WorkQueue(object):
""" Work queue defines methods for interacting with a queue backed by the database. """
def __init__(self, queue_name, transaction_factory, def __init__(self, queue_name, transaction_factory,
canonical_name_match_list=None, reporter=None, metric_queue=None): canonical_name_match_list=None, reporter=None, metric_queue=None):
self._queue_name = queue_name self._queue_name = queue_name
@ -107,9 +109,13 @@ class WorkQueue(object):
if self._metric_queue: if self._metric_queue:
dim = {'queue': self._queue_name} dim = {'queue': self._queue_name}
self._metric_queue.put('Running', running_count, dimensions=dim) self._metric_queue.put_deprecated('Running', running_count, dimensions=dim)
self._metric_queue.put('AvailableNotRunning', available_not_running_count, dimensions=dim) self._metric_queue.put_deprecated('AvailableNotRunning', available_not_running_count,
self._metric_queue.put('Available', available_count, dimensions=dim) dimensions=dim)
self._metric_queue.put_deprecated('Available', available_count, dimensions=dim)
self._metric_queue.work_queue_running.set(running_count, labelvalues=[self._queue_name])
self._metric_queue.work_queue_available.set(available_count, labelvalues=[self._queue_name])
if self._reporter: if self._reporter:
self._reporter(self._currently_processing, running_count, self._reporter(self._currently_processing, running_count,
@ -141,8 +147,10 @@ class WorkQueue(object):
with self._transaction_factory(db): with self._transaction_factory(db):
r = str(QueueItem.create(**params).id) r = str(QueueItem.create(**params).id)
if self._metric_queue: if self._metric_queue:
self._metric_queue.put('Added', 1, dimensions={'queue': self._queue_name}) self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})
return r return r
def get(self, processing_time=300, ordering_required=False): def get(self, processing_time=300, ordering_required=False):

View file

@ -14,7 +14,7 @@ from uuid import uuid4
from collections import namedtuple from collections import namedtuple
from util.registry import filelike from util.registry import filelike
from storage.basestorage import BaseStorageV2, InvalidChunkException from storage.basestorage import BaseStorageV2
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -163,7 +163,7 @@ class _CloudStorage(BaseStorageV2):
metadata['Content-Encoding'] = content_encoding metadata['Content-Encoding'] = content_encoding
if self._metric_queue is not None: if self._metric_queue is not None:
self._metric_queue.put('MultipartUploadStart', 1) self._metric_queue.put_deprecated('MultipartUploadStart', 1)
self._metric_queue.multipart_upload_start.Inc() self._metric_queue.multipart_upload_start.Inc()
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
@ -204,7 +204,7 @@ class _CloudStorage(BaseStorageV2):
error = ex error = ex
if self._metric_queue is not None: if self._metric_queue is not None:
self._metric_queue.put('MultipartUploadFailure', 1) self._metric_queue.put_deprecated('MultipartUploadFailure', 1)
self._metric_queue.multipart_upload_end.Inc(labelvalues=['failure']) self._metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
if cancel_on_error: if cancel_on_error:
@ -215,7 +215,7 @@ class _CloudStorage(BaseStorageV2):
if total_bytes_written > 0: if total_bytes_written > 0:
if self._metric_queue is not None: if self._metric_queue is not None:
self._metric_queue.put('MultipartUploadSuccess', 1) self._metric_queue.put_deprecated('MultipartUploadSuccess', 1)
self._metric_queue.multipart_upload_end.Inc(labelvalues=['success']) self._metric_queue.multipart_upload_end.Inc(labelvalues=['success'])
mp.complete_upload() mp.complete_upload()

View file

@ -48,7 +48,7 @@ class CloudWatchSender(Thread):
connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
except: except:
logger.exception('Failed to connect to CloudWatch.') logger.exception('Failed to connect to CloudWatch.')
self._metrics.enable() self._metrics.enable_deprecated()
while True: while True:
metrics = { metrics = {
@ -59,12 +59,12 @@ class CloudWatchSender(Thread):
'dimensions': [], 'dimensions': [],
} }
metric = self._metrics.get() metric = self._metrics.get_deprecated()
append_metric(metrics, metric) append_metric(metrics, metric)
while len(metrics['name']) < MAX_BATCH_METRICS: while len(metrics['name']) < MAX_BATCH_METRICS:
try: try:
metric = self._metrics.get_nowait() metric = self._metrics.get_nowait_deprecated()
append_metric(metrics, metric) append_metric(metrics, metric)
except Empty: except Empty:
break break
@ -74,7 +74,7 @@ class CloudWatchSender(Thread):
logger.debug('Sent %d CloudWatch metrics', len(metrics['name'])) logger.debug('Sent %d CloudWatch metrics', len(metrics['name']))
except: except:
for i in range(len(metrics['name'])): for i in range(len(metrics['name'])):
self._metrics.put(metrics['name'][i], metrics['value'][i], self._metrics.put_deprecated(metrics['name'][i], metrics['value'][i],
unit=metrics['unit'][i], unit=metrics['unit'][i],
dimensions=metrics['dimensions'][i], dimensions=metrics['dimensions'][i],
timestamp=metrics['timestamp'][i], timestamp=metrics['timestamp'][i],

View file

@ -10,15 +10,19 @@ from flask import g, request
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Buckets for the API response times.
API_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0] API_RESPONSE_TIME_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
class MetricQueue(object): class MetricQueue(object):
""" Object to which various metrics are written, for distribution to metrics collection
system(s) such as Prometheus.
"""
def __init__(self, prom): def __init__(self, prom):
self._queue = None # Define the various exported metrics.
self.resp_time = prom.create_histogram('response_time', 'HTTP response time in seconds', self.resp_time = prom.create_histogram('response_time', 'HTTP response time in seconds',
labelnames=['endpoint'], buckets=API_BUCKETS) labelnames=['endpoint'],
buckets=API_RESPONSE_TIME_BUCKETS)
self.resp_code = prom.create_counter('response_code', 'HTTP response code', self.resp_code = prom.create_counter('response_code', 'HTTP response code',
labelnames=['endpoint', 'code']) labelnames=['endpoint', 'code'])
self.non_200 = prom.create_counter('response_non200', 'Non-200 HTTP response codes', self.non_200 = prom.create_counter('response_non200', 'Non-200 HTTP response codes',
@ -30,11 +34,26 @@ class MetricQueue(object):
self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage', self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage',
'Build capacity shortage.') 'Build capacity shortage.')
self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.') self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.')
self.build_counter = prom.create_counter('builds', 'Number of builds', labelnames=['name'])
self.ephemeral_build_workers = prom.create_counter('ephemeral_build_workers',
'Number of started ephemeral build workers', labelnames=['name', 'build_uuid'])
self.ephemeral_build_worker_failure = prom.create_counter('ephemeral_build_worker_failure',
'Number of failed-to-start ephemeral build workers', labelnames=['build_uuid'])
def enable(self, maxsize=10000): self.work_queue_running = prom.create_gauge('work_queue_running', 'Running items in a queue',
labelnames=['queue_name'])
self.work_queue_available = prom.create_gauge('work_queue_available',
'Available items in a queue',
labelnames=['queue_name'])
# Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another
# provider.
self._queue = None
def enable_deprecated(self, maxsize=10000):
self._queue = Queue(maxsize) self._queue = Queue(maxsize)
def put(self, name, value, **kwargs): def put_deprecated(self, name, value, **kwargs):
if self._queue is None: if self._queue is None:
logger.debug('No metric queue %s %s %s', name, value, kwargs) logger.debug('No metric queue %s %s %s', name, value, kwargs)
return return
@ -46,23 +65,38 @@ class MetricQueue(object):
except Full: except Full:
logger.error('Metric queue full') logger.error('Metric queue full')
def get(self): def get_deprecated(self):
return self._queue.get() return self._queue.get()
def get_nowait(self): def get_nowait_deprecated(self):
return self._queue.get_nowait() return self._queue.get_nowait()
def time_decorator(name, metric_queue):
  """ Returns a decorator which reports the wrapped endpoint's request time to the
      metrics queue, exactly as the blueprint-level hooks do.
  """
  record_after = _time_after_request(name, metric_queue)

  def decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
      # Mark the start time, invoke the endpoint, then report the elapsed time.
      _time_before_request()
      result = func(*args, **kwargs)
      record_after(result)
      return result

    return wrapper

  return decorator
def time_blueprint(bp, metric_queue): def time_blueprint(bp, metric_queue):
bp.before_request(time_before_request) """ Decorates a blueprint to have its request time logged to the metrics queue. """
bp.after_request(time_after_request(bp.name, metric_queue)) bp.before_request(_time_before_request)
bp.after_request(_time_after_request(bp.name, metric_queue))
def time_before_request(): def _time_before_request():
g._request_start_time = time.time() g._request_start_time = time.time()
def time_after_request(name, metric_queue): def _time_after_request(name, metric_queue):
def f(r): def f(r):
start = getattr(g, '_request_start_time', None) start = getattr(g, '_request_start_time', None)
if start is None: if start is None:
@ -71,30 +105,18 @@ def time_after_request(name, metric_queue):
dur = time.time() - start dur = time.time() - start
dims = {'endpoint': request.endpoint} dims = {'endpoint': request.endpoint}
metric_queue.put('ResponseTime', dur, dimensions=dims, unit='Seconds') metric_queue.put_deprecated('ResponseTime', dur, dimensions=dims, unit='Seconds')
metric_queue.put('ResponseCode', r.status_code, dimensions=dims) metric_queue.put_deprecated('ResponseCode', r.status_code, dimensions=dims)
metric_queue.resp_time.Observe(dur, labelvalues=[request.endpoint]) metric_queue.resp_time.Observe(dur, labelvalues=[request.endpoint])
metric_queue.resp_code.Inc(labelvalues=[request.endpoint, r.status_code]) metric_queue.resp_code.Inc(labelvalues=[request.endpoint, r.status_code])
if r.status_code >= 500: if r.status_code >= 500:
metric_queue.put('5XXResponse', 1, dimensions={'name': name}) metric_queue.put_deprecated('5XXResponse', 1, dimensions={'name': name})
elif r.status_code < 200 or r.status_code >= 300: elif r.status_code < 200 or r.status_code >= 300:
metric_queue.put('Non200Response', 1, dimensions={'name': name}) metric_queue.put_deprecated('Non200Response', 1, dimensions={'name': name})
metric_queue.non_200.Inc(labelvalues=[request.endpoint]) metric_queue.non_200.Inc(labelvalues=[request.endpoint])
return r return r
return f return f
def time_decorator(name, metric_queue):
after = time_after_request(name, metric_queue)
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
time_before_request()
rv = func(*args, **kwargs)
after(rv)
return rv
return wrapper
return decorator

View file

@ -14,8 +14,89 @@ QUEUE_MAX = 1000
MAX_BATCH_SIZE = 100 MAX_BATCH_SIZE = 100
REGISTER_WAIT = datetime.timedelta(hours=1) REGISTER_WAIT = datetime.timedelta(hours=1)
class PrometheusPlugin(object):
  """ Flask-style application plugin that exposes a Prometheus metrics aggregator. """

  def __init__(self, app=None):
    self.app = app
    # Defer aggregator creation until an application is supplied via init_app.
    self.state = self.init_app(app) if app is not None else None

  def init_app(self, app):
    """ Builds a Prometheus aggregator from the app's config and registers it as a
        Flask extension; returns the aggregator so callers can keep a handle on it.
    """
    aggregator_url = app.config.get('PROMETHEUS_AGGREGATOR_URL')
    logger.debug('Initializing prometheus with aggregator url: %s', aggregator_url)
    aggregator = Prometheus(aggregator_url)

    # register extension with app
    app.extensions = getattr(app, 'extensions', {})
    app.extensions['prometheus'] = aggregator
    return aggregator

  def __getattr__(self, name):
    # Proxy unknown attribute reads to the underlying aggregator state; yields None
    # for any attribute when the plugin was never initialized with an app.
    return getattr(self.state, name, None)
class Prometheus(object):
  """ Aggregator for collecting stats that are reported to Prometheus. """

  def __init__(self, url=None):
    self._registered = []
    self._url = url

    if url is None:
      # No aggregator endpoint configured: enqueued metrics are silently dropped.
      self._queue = None
      logger.debug('Prometheus aggregator disabled')
    else:
      self._queue = Queue(QUEUE_MAX)
      self._sender = _QueueSender(self._queue, url, self._registered)
      self._sender.start()
      logger.debug('Prometheus aggregator sending to %s', url)

  def enqueue(self, call, data):
    """ Serializes the given call and queues it for delivery to the aggregator. """
    if not self._queue:
      return

    payload = json.dumps({
      'Call': call,
      'Data': data,
    })

    # 'register' calls are kept aside; they were handed to the queue sender at
    # construction time (presumably for replay — confirm in _QueueSender).
    if call == 'register':
      self._registered.append(payload)
      return

    try:
      self._queue.put_nowait(payload)
    except Full:
      # If the queue is full, it is because 1) no aggregator was enabled or 2)
      # the aggregator is taking a long time to respond to requests. In the case
      # of 1, it's probably enterprise mode and we don't care. In the case of 2,
      # the response timeout error is printed at another place. In either case,
      # we don't need to print an error here.
      pass

  def create_gauge(self, *args, **kwargs):
    return self._create_collector('Gauge', args, kwargs)

  def create_counter(self, *args, **kwargs):
    return self._create_collector('Counter', args, kwargs)

  def create_summary(self, *args, **kwargs):
    return self._create_collector('Summary', args, kwargs)

  def create_histogram(self, *args, **kwargs):
    return self._create_collector('Histogram', args, kwargs)

  def create_untyped(self, *args, **kwargs):
    return self._create_collector('Untyped', args, kwargs)

  def _create_collector(self, collector_type, args, kwargs):
    # All collector factories funnel through here; the collector reports back via enqueue.
    return _Collector(self.enqueue, collector_type, *args, **kwargs)
class _QueueSender(Thread): class _QueueSender(Thread):
""" Helper class which uses a thread to asynchronously send metrics to the local Prometheus
aggregator. """
def __init__(self, queue, url, registered): def __init__(self, queue, url, registered):
Thread.__init__(self) Thread.__init__(self)
self.daemon = True self.daemon = True
@ -52,83 +133,8 @@ class _QueueSender(Thread):
logger.exception('Failed to write to prometheus aggregator: %s', reqs) logger.exception('Failed to write to prometheus aggregator: %s', reqs)
class Prometheus(object):
  """ Aggregator for collecting stats that are reported to Prometheus.

  NOTE(review): this is the pre-refactor version of the class (removed side of the
  diff); the replacement adds a default for `url` and docstrings.
  """
  def __init__(self, url):
    # 'register' calls are collected here and shared with the sender thread
    # (presumably for replay — confirm in _QueueSender).
    self._registered = []
    self._url = url

    if url is not None:
      self._queue = Queue(QUEUE_MAX)
      self._sender = _QueueSender(self._queue, url, self._registered)
      self._sender.start()
      logger.debug('Prometheus aggregator sending to %s', url)
    else:
      # No aggregator endpoint configured: enqueued metrics are dropped.
      self._queue = None
      logger.debug('Prometheus aggregator disabled')

  def enqueue(self, call, data):
    # Serialize the call and queue it for delivery to the aggregator; no-op
    # when no aggregator is configured.
    if not self._queue:
      return

    v = json.dumps({
      'Call': call,
      'Data': data,
    })

    if call == 'register':
      self._registered.append(v)
      return

    try:
      self._queue.put_nowait(v)
    except Full:
      # If the queue is full, it is because 1) no aggregator was enabled or 2)
      # the aggregator is taking a long time to respond to requests. In the case
      # of 1, it's probably enterprise mode and we don't care. In the case of 2,
      # the response timeout error is printed at another place. In either case,
      # we don't need to print an error here.
      pass

  def _create_collector(self, collector_type, args, kwargs):
    # Shared factory: the collector reports its updates back through enqueue.
    return _Collector(self.enqueue, collector_type, *args, **kwargs)

  def create_gauge(self, *args, **kwargs):
    return self._create_collector('Gauge', args, kwargs)

  def create_counter(self, *args, **kwargs):
    return self._create_collector('Counter', args, kwargs)

  def create_summary(self, *args, **kwargs):
    return self._create_collector('Summary', args, kwargs)

  def create_histogram(self, *args, **kwargs):
    return self._create_collector('Histogram', args, kwargs)

  def create_untyped(self, *args, **kwargs):
    return self._create_collector('Untyped', args, kwargs)
class PrometheusPlugin(object):
  """ Flask-style application plugin wrapping a Prometheus aggregator.

  NOTE(review): this is the pre-refactor version of the class (removed side of the
  diff); the replacement is identical apart from added docstrings.
  """
  def __init__(self, app=None):
    self.app = app
    if app is not None:
      self.state = self.init_app(app)
    else:
      # No app yet; attribute access proxies to None until init_app is called.
      self.state = None

  def init_app(self, app):
    # Build the aggregator from the app config and return it as this plugin's state.
    prom_url = app.config.get('PROMETHEUS_AGGREGATOR_URL')
    logger.debug('Initializing prometheus with aggregator url: %s', prom_url)
    prometheus = Prometheus(prom_url)

    # register extension with app
    app.extensions = getattr(app, 'extensions', {})
    app.extensions['prometheus'] = prometheus
    return prometheus

  def __getattr__(self, name):
    # Proxy unknown attribute reads to the aggregator; None when uninitialized.
    return getattr(self.state, name, None)
class _Collector(object): class _Collector(object):
""" Collector for a Prometheus metric. """
def __init__(self, enqueue_method, c_type, name, c_help, namespace='', subsystem='', **kwargs): def __init__(self, enqueue_method, c_type, name, c_help, namespace='', subsystem='', **kwargs):
self._enqueue_method = enqueue_method self._enqueue_method = enqueue_method
@ -153,5 +159,7 @@ class _Collector(object):
'LabelValues': [str(i) for i in labelvalues], 'LabelValues': [str(i) for i in labelvalues],
'Method': method, 'Method': method,
}) })
self._enqueue_method('put', data) self._enqueue_method('put', data)
return f return f