diff --git a/app.py b/app.py index af325a478..315bb08ca 100644 --- a/app.py +++ b/app.py @@ -23,7 +23,7 @@ from data.billing import Billing from data.buildlogs import BuildLogs from data.archivedlogs import LogArchive from data.userevent import UserEventsBuilderModule -from data.queue import WorkQueue, MetricQueueReporter +from data.queue import WorkQueue, BuildMetricQueueReporter from util import get_app_url from util.saas.prometheus import PrometheusPlugin from util.saas.analytics import Analytics @@ -197,7 +197,7 @@ oauth_apps = [github_login, github_trigger, gitlab_trigger, google_login, dex_lo image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf) dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf, - reporter=MetricQueueReporter(metric_queue)) + reporter=BuildMetricQueueReporter(metric_queue)) notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf) secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf) secscan_api = SecurityScannerAPI(app, app.config, storage) diff --git a/buildman/manager/ephemeral.py b/buildman/manager/ephemeral.py index 14b88ef8c..689d1fbea 100644 --- a/buildman/manager/ephemeral.py +++ b/buildman/manager/ephemeral.py @@ -340,7 +340,8 @@ class EphemeralBuilderManager(BaseManager): try: builder_id = yield From(self._executor.start_builder(realm, token, build_uuid)) - metric_queue.put('EC2BuilderStarted', 1, unit='Count') + metric_queue.put_deprecated('EC2BuilderStarted', 1, unit='Count') + metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid]) except: logger.exception('Exception when starting builder for job: %s', build_uuid) raise Return(False, EC2_API_TIMEOUT) diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py index 30cd0c6e6..557128ac6 100644 --- a/buildman/manager/executor.py +++ b/buildman/manager/executor.py @@ -152,7 +152,8 @@ class EC2Executor(BuilderExecutor): )) except boto.exception.EC2ResponseError as ec2e: logger.exception('Unable to spawn builder instance') - metric_queue.put('EC2BuildStartFailure', 1, unit='Count') + metric_queue.put_deprecated('EC2BuildStartFailure', 1, unit='Count') + metric_queue.ephemeral_build_worker_failure.Inc(labelvalues=[build_uuid]) raise ec2e if not reservation.instances: diff --git a/buildman/server.py b/buildman/server.py index 8f4690c89..fc8dd54e2 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -18,10 +18,8 @@ from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from data import database from app import app, metric_queue from app import app -from util.prometheus import Counter logger = logging.getLogger(__name__) -build_counter = Counter('builds', 'Number of builds', labelnames=['name']) WORK_CHECK_TIMEOUT = 10 TIMEOUT_PERIOD_MINUTES = 20 @@ -240,5 +238,5 @@ def report_completion_status(status): else: return - metric_queue.put(status_name, 1, unit='Count') - build_counter.Inc(labelvalues=[status_name]) + metric_queue.put_deprecated(status_name, 1, unit='Count') + metric_queue.build_counter.Inc(labelvalues=[status_name]) diff --git a/data/queue.py b/data/queue.py index 44e76ea71..dcb031fa5 100644 --- a/data/queue.py +++ b/data/queue.py @@ -15,21 +15,23 @@ class NoopWith: pass -class MetricQueueReporter(object): +class BuildMetricQueueReporter(object): + """ Metric queue reporter for the build system. 
""" def __init__(self, metric_queue): self._metric_queue = metric_queue def __call__(self, currently_processing, running_count, total_count): need_capacity_count = total_count - running_count - self._metric_queue.put('BuildCapacityShortage', need_capacity_count, unit='Count') + self._metric_queue.put_deprecated('BuildCapacityShortage', need_capacity_count, unit='Count') self._metric_queue.build_capacity_shortage.Set(need_capacity_count) building_percent = 100 if currently_processing else 0 - self._metric_queue.put('PercentBuilding', building_percent, unit='Percent') + self._metric_queue.put_deprecated('PercentBuilding', building_percent, unit='Percent') self._metric_queue.percent_building.Set(building_percent) class WorkQueue(object): + """ Work queue defines methods for interacting with a queue backed by the database. """ def __init__(self, queue_name, transaction_factory, canonical_name_match_list=None, reporter=None, metric_queue=None): self._queue_name = queue_name @@ -107,9 +109,13 @@ class WorkQueue(object): if self._metric_queue: dim = {'queue': self._queue_name} - self._metric_queue.put('Running', running_count, dimensions=dim) - self._metric_queue.put('AvailableNotRunning', available_not_running_count, dimensions=dim) - self._metric_queue.put('Available', available_count, dimensions=dim) + self._metric_queue.put_deprecated('Running', running_count, dimensions=dim) + self._metric_queue.put_deprecated('AvailableNotRunning', available_not_running_count, + dimensions=dim) + self._metric_queue.put_deprecated('Available', available_count, dimensions=dim) + + self._metric_queue.work_queue_running.set(running_count, labelvalues=[self._queue_name]) + self._metric_queue.work_queue_available.set(available_count, labelvalues=[self._queue_name]) if self._reporter: self._reporter(self._currently_processing, running_count, @@ -141,8 +147,10 @@ class WorkQueue(object): with self._transaction_factory(db): r = str(QueueItem.create(**params).id) + if self._metric_queue: - self._metric_queue.put('Added', 1, dimensions={'queue': self._queue_name}) + self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name}) + return r def get(self, processing_time=300, ordering_required=False): diff --git a/storage/cloud.py b/storage/cloud.py index 6e7aa0576..5ef0249a8 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -14,7 +14,7 @@ from uuid import uuid4 from collections import namedtuple from util.registry import filelike -from storage.basestorage import BaseStorageV2, InvalidChunkException +from storage.basestorage import BaseStorageV2 logger = logging.getLogger(__name__) @@ -163,7 +163,7 @@ class _CloudStorage(BaseStorageV2): metadata['Content-Encoding'] = content_encoding if self._metric_queue is not None: - self._metric_queue.put('MultipartUploadStart', 1) + self._metric_queue.put_deprecated('MultipartUploadStart', 1) self._metric_queue.multipart_upload_start.Inc() return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, @@ -204,7 +204,7 @@ class _CloudStorage(BaseStorageV2): error = ex if self._metric_queue is not None: - self._metric_queue.put('MultipartUploadFailure', 1) + self._metric_queue.put_deprecated('MultipartUploadFailure', 1) self._metric_queue.multipart_upload_end.Inc(labelvalues=['failure']) if cancel_on_error: @@ -215,7 +215,7 @@ class _CloudStorage(BaseStorageV2): if total_bytes_written > 0: if self._metric_queue is not None: - self._metric_queue.put('MultipartUploadSuccess', 1) + self._metric_queue.put_deprecated('MultipartUploadSuccess', 1) 
self._metric_queue.multipart_upload_end.Inc(labelvalues=['success']) mp.complete_upload() diff --git a/util/saas/cloudwatch.py b/util/saas/cloudwatch.py index 1f2a412dd..7f7fc3ad3 100644 --- a/util/saas/cloudwatch.py +++ b/util/saas/cloudwatch.py @@ -48,7 +48,7 @@ class CloudWatchSender(Thread): connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key) except: logger.exception('Failed to connect to CloudWatch.') - self._metrics.enable() + self._metrics.enable_deprecated() while True: metrics = { @@ -59,12 +59,12 @@ class CloudWatchSender(Thread): 'dimensions': [], } - metric = self._metrics.get() + metric = self._metrics.get_deprecated() append_metric(metrics, metric) while len(metrics['name']) < MAX_BATCH_METRICS: try: - metric = self._metrics.get_nowait() + metric = self._metrics.get_nowait_deprecated() append_metric(metrics, metric) except Empty: break @@ -74,7 +74,7 @@ class CloudWatchSender(Thread): logger.debug('Sent %d CloudWatch metrics', len(metrics['name'])) except: for i in range(len(metrics['name'])): - self._metrics.put(metrics['name'][i], metrics['value'][i], + self._metrics.put_deprecated(metrics['name'][i], metrics['value'][i], unit=metrics['unit'][i], dimensions=metrics['dimensions'][i], timestamp=metrics['timestamp'][i], diff --git a/util/saas/metricqueue.py b/util/saas/metricqueue.py index aa09d848e..303f677ef 100644 --- a/util/saas/metricqueue.py +++ b/util/saas/metricqueue.py @@ -10,15 +10,19 @@ from flask import g, request logger = logging.getLogger(__name__) - -API_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0] +# Buckets for the API response times. +API_RESPONSE_TIME_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0] class MetricQueue(object): + """ Object to which various metrics are written, for distribution to metrics collection + system(s) such Prometheus. + """ def __init__(self, prom): - self._queue = None + # Define the various exported metrics. self.resp_time = prom.create_histogram('response_time', 'HTTP response time in seconds', - labelnames=['endpoint'], buckets=API_BUCKETS) + labelnames=['endpoint'], + buckets=API_RESPONSE_TIME_BUCKETS) self.resp_code = prom.create_counter('response_code', 'HTTP response code', labelnames=['endpoint', 'code']) self.non_200 = prom.create_counter('response_non200', 'Non-200 HTTP response codes', @@ -30,11 +34,26 @@ class MetricQueue(object): self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage', 'Build capacity shortage.') self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.') + self.build_counter = prom.create_counter('builds', 'Number of builds', labelnames=['name']) + self.ephemeral_build_workers = prom.create_counter('ephemeral_build_workers', + 'Number of started ephemeral build workers', labelnames=['name', 'build_uuid']) + self.ephemeral_build_worker_failure = prom.create_counter('ephemeral_build_worker_failure', + 'Number of failed-to-start ephemeral build workers', labelnames=['build_uuid']) - def enable(self, maxsize=10000): + self.work_queue_running = prom.create_gauge('work_queue_running', 'Running items in a queue', + labelnames=['queue_name']) + self.work_queue_available = prom.create_gauge('work_queue_available', + 'Available items in a queue', + labelnames=['queue_name']) + + # Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another + # provider. 
+ self._queue = None + + def enable_deprecated(self, maxsize=10000): self._queue = Queue(maxsize) - def put(self, name, value, **kwargs): + def put_deprecated(self, name, value, **kwargs): if self._queue is None: logger.debug('No metric queue %s %s %s', name, value, kwargs) return @@ -46,23 +65,38 @@ class MetricQueue(object): except Full: logger.error('Metric queue full') - def get(self): + def get_deprecated(self): return self._queue.get() - def get_nowait(self): + def get_nowait_deprecated(self): return self._queue.get_nowait() +def time_decorator(name, metric_queue): + """ Decorates an endpoint method to have its request time logged to the metrics queue. """ + after = _time_after_request(name, metric_queue) + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + _time_before_request() + rv = func(*args, **kwargs) + after(rv) + return rv + return wrapper + return decorator + + def time_blueprint(bp, metric_queue): - bp.before_request(time_before_request) - bp.after_request(time_after_request(bp.name, metric_queue)) + """ Decorates a blueprint to have its request time logged to the metrics queue. """ + bp.before_request(_time_before_request) + bp.after_request(_time_after_request(bp.name, metric_queue)) -def time_before_request(): +def _time_before_request(): g._request_start_time = time.time() -def time_after_request(name, metric_queue): +def _time_after_request(name, metric_queue): def f(r): start = getattr(g, '_request_start_time', None) if start is None: @@ -71,30 +105,18 @@ def time_after_request(name, metric_queue): dur = time.time() - start dims = {'endpoint': request.endpoint} - metric_queue.put('ResponseTime', dur, dimensions=dims, unit='Seconds') - metric_queue.put('ResponseCode', r.status_code, dimensions=dims) + metric_queue.put_deprecated('ResponseTime', dur, dimensions=dims, unit='Seconds') + metric_queue.put_deprecated('ResponseCode', r.status_code, dimensions=dims) metric_queue.resp_time.Observe(dur, labelvalues=[request.endpoint]) metric_queue.resp_code.Inc(labelvalues=[request.endpoint, r.status_code]) if r.status_code >= 500: - metric_queue.put('5XXResponse', 1, dimensions={'name': name}) + metric_queue.put_deprecated('5XXResponse', 1, dimensions={'name': name}) elif r.status_code < 200 or r.status_code >= 300: - metric_queue.put('Non200Response', 1, dimensions={'name': name}) + metric_queue.put_deprecated('Non200Response', 1, dimensions={'name': name}) metric_queue.non_200.Inc(labelvalues=[request.endpoint]) return r return f - -def time_decorator(name, metric_queue): - after = time_after_request(name, metric_queue) - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - time_before_request() - rv = func(*args, **kwargs) - after(rv) - return rv - return wrapper - return decorator diff --git a/util/saas/prometheus.py b/util/saas/prometheus.py index ae5cf264e..80f7f553a 100644 --- a/util/saas/prometheus.py +++ b/util/saas/prometheus.py @@ -14,8 +14,89 @@ QUEUE_MAX = 1000 MAX_BATCH_SIZE = 100 REGISTER_WAIT = datetime.timedelta(hours=1) +class PrometheusPlugin(object): + """ Application plugin for reporting metrics to Prometheus. 
""" + def __init__(self, app=None): + self.app = app + if app is not None: + self.state = self.init_app(app) + else: + self.state = None + + def init_app(self, app): + prom_url = app.config.get('PROMETHEUS_AGGREGATOR_URL') + logger.debug('Initializing prometheus with aggregator url: %s', prom_url) + prometheus = Prometheus(prom_url) + + # register extension with app + app.extensions = getattr(app, 'extensions', {}) + app.extensions['prometheus'] = prometheus + return prometheus + + def __getattr__(self, name): + return getattr(self.state, name, None) + + +class Prometheus(object): + """ Aggregator for collecting stats that are reported to Prometheus. """ + def __init__(self, url=None): + self._registered = [] + self._url = url + + if url is not None: + self._queue = Queue(QUEUE_MAX) + self._sender = _QueueSender(self._queue, url, self._registered) + self._sender.start() + logger.debug('Prometheus aggregator sending to %s', url) + else: + self._queue = None + logger.debug('Prometheus aggregator disabled') + + def enqueue(self, call, data): + if not self._queue: + return + + v = json.dumps({ + 'Call': call, + 'Data': data, + }) + + if call == 'register': + self._registered.append(v) + return + + try: + self._queue.put_nowait(v) + except Full: + # If the queue is full, it is because 1) no aggregator was enabled or 2) + # the aggregator is taking a long time to respond to requests. In the case + # of 1, it's probably enterprise mode and we don't care. In the case of 2, + # the response timeout error is printed at another place. In either case, + # we don't need to print an error here. + pass + + def create_gauge(self, *args, **kwargs): + return self._create_collector('Gauge', args, kwargs) + + def create_counter(self, *args, **kwargs): + return self._create_collector('Counter', args, kwargs) + + def create_summary(self, *args, **kwargs): + return self._create_collector('Summary', args, kwargs) + + def create_histogram(self, *args, **kwargs): + return self._create_collector('Histogram', args, kwargs) + + def create_untyped(self, *args, **kwargs): + return self._create_collector('Untyped', args, kwargs) + + def _create_collector(self, collector_type, args, kwargs): + return _Collector(self.enqueue, collector_type, *args, **kwargs) + class _QueueSender(Thread): + """ Helper class which uses a thread to asynchronously send metrics to the local Prometheus + aggregator. """ def __init__(self, queue, url, registered): Thread.__init__(self) self.daemon = True @@ -52,83 +133,8 @@ class _QueueSender(Thread): logger.exception('Failed to write to prometheus aggregator: %s', reqs) -class Prometheus(object): - def __init__(self, url): - self._registered = [] - self._url = url - - if url is not None: - self._queue = Queue(QUEUE_MAX) - self._sender = _QueueSender(self._queue, url, self._registered) - self._sender.start() - logger.debug('Prometheus aggregator sending to %s', url) - else: - self._queue = None - logger.debug('Prometheus aggregator disabled') - - def enqueue(self, call, data): - if not self._queue: - return - - v = json.dumps({ - 'Call': call, - 'Data': data, - }) - if call == 'register': - self._registered.append(v) - return - try: - self._queue.put_nowait(v) - except Full: - # If the queue is full, it is because 1) no aggregator was enabled or 2) - # the aggregator is taking a long time to respond to requests. In the case - # of 1, it's probably enterprise mode and we don't care. In the case of 2, - # the response timeout error is printed at another place. 
In either case, - # we don't need to print an error here. - pass - - def _create_collector(self, collector_type, args, kwargs): - return _Collector(self.enqueue, collector_type, *args, **kwargs) - - def create_gauge(self, *args, **kwargs): - return self._create_collector('Gauge', args, kwargs) - - def create_counter(self, *args, **kwargs): - return self._create_collector('Counter', args, kwargs) - - def create_summary(self, *args, **kwargs): - return self._create_collector('Summary', args, kwargs) - - def create_histogram(self, *args, **kwargs): - return self._create_collector('Histogram', args, kwargs) - - def create_untyped(self, *args, **kwargs): - return self._create_collector('Untyped', args, kwargs) - - -class PrometheusPlugin(object): - def __init__(self, app=None): - self.app = app - if app is not None: - self.state = self.init_app(app) - else: - self.state = None - - def init_app(self, app): - prom_url = app.config.get('PROMETHEUS_AGGREGATOR_URL') - logger.debug('Initializing prometheus with aggregator url: %s', prom_url) - prometheus = Prometheus(prom_url) - - # register extension with app - app.extensions = getattr(app, 'extensions', {}) - app.extensions['prometheus'] = prometheus - return prometheus - - def __getattr__(self, name): - return getattr(self.state, name, None) - - class _Collector(object): + """ Collector for a Prometheus metric. """ def __init__(self, enqueue_method, c_type, name, c_help, namespace='', subsystem='', **kwargs): self._enqueue_method = enqueue_method @@ -153,5 +159,7 @@ class _Collector(object): 'LabelValues': [str(i) for i in labelvalues], 'Method': method, }) + self._enqueue_method('put', data) + return f
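
# ---------------------------------------------------------------------------
# Reviewer note / usage sketch (not part of the diff above). A minimal example
# of the dual-reporting pattern this change introduces: metrics are written
# both to the deprecated in-memory CloudWatch queue (put_deprecated) and to
# the named Prometheus collectors defined on MetricQueue, whose Go-style
# method names (Inc, Set, Observe) are forwarded as JSON to the local
# aggregator by _QueueSender. Assumes it is run from the repository root at
# this revision with its Python dependencies installed; the metric name and
# label values below ('BuildComplete', 'complete', 'dockerfilebuild') are
# illustrative only.
from util.saas.prometheus import Prometheus
from util.saas.metricqueue import MetricQueue

# With no PROMETHEUS_AGGREGATOR_URL configured, the aggregator queue is
# disabled and collector calls are silently dropped, so this runs standalone.
prom = Prometheus(url=None)
metric_queue = MetricQueue(prom)

# New path: update the named Prometheus collectors directly.
metric_queue.build_counter.Inc(labelvalues=['complete'])
metric_queue.work_queue_running.Set(3, labelvalues=['dockerfilebuild'])

# Deprecated path: still fed until the CloudWatch pipeline is removed. Without
# a prior enable_deprecated() call this only logs a debug line.
metric_queue.put_deprecated('BuildComplete', 1, unit='Count')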