Merge pull request #1596 from coreos-inc/prometheus-client

Prometheus support
josephschorr, 2016-07-05 15:45:40 -04:00, committed by GitHub
commit c90878ff04
20 changed files with 369 additions and 106 deletions

@@ -64,6 +64,10 @@ RUN curl -O https://storage.googleapis.com/golang/go1.6.linux-amd64.tar.gz && \
 RUN curl -L -o /usr/local/bin/jwtproxy https://github.com/coreos/jwtproxy/releases/download/v0.0.1/jwtproxy-linux-x64
 RUN chmod +x /usr/local/bin/jwtproxy

+# Install prometheus-aggregator
+RUN curl -L -o /usr/local/bin/prometheus-aggregator https://github.com/coreos/prometheus-aggregator/releases/download/v0.0.1-alpha/prometheus-aggregator
+RUN chmod +x /usr/local/bin/prometheus-aggregator
+
 # Install Grunt
 RUN ln -s /usr/bin/nodejs /usr/bin/node
 RUN npm install -g grunt-cli

app.py

@@ -23,7 +23,7 @@ from data.billing import Billing
 from data.buildlogs import BuildLogs
 from data.archivedlogs import LogArchive
 from data.userevent import UserEventsBuilderModule
-from data.queue import WorkQueue, MetricQueueReporter
+from data.queue import WorkQueue, BuildMetricQueueReporter
 from util import get_app_url
 from util.saas.analytics import Analytics
 from util.saas.exceptionlog import Sentry
@@ -34,11 +34,12 @@ from util.config.oauth import (GoogleOAuthConfig, GithubOAuthConfig, GitLabOAuth
 from util.security.signing import Signer
 from util.security.instancekeys import InstanceKeys
 from util.saas.cloudwatch import start_cloudwatch_sender
-from util.saas.metricqueue import MetricQueue
 from util.config.provider import get_config_provider
 from util.config.configutil import generate_secret_key
 from util.config.superusermanager import SuperUserManager
 from util.secscan.api import SecurityScannerAPI
+from util.metrics.metricqueue import MetricQueue
+from util.metrics.prometheus import PrometheusPlugin

 OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/'
 OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml'
@@ -167,7 +168,8 @@ Principal(app, use_sessions=False)
 avatar = Avatar(app)
 login_manager = LoginManager(app)
 mail = Mail(app)
-metric_queue = MetricQueue()
+prometheus = PrometheusPlugin(app)
+metric_queue = MetricQueue(prometheus)
 storage = Storage(app, metric_queue)
 userfiles = Userfiles(app, storage)
 log_archive = LogArchive(app, storage)
@@ -195,7 +197,7 @@ oauth_apps = [github_login, github_trigger, gitlab_trigger, google_login, dex_lo
 image_replication_queue = WorkQueue(app.config['REPLICATION_QUEUE_NAME'], tf)
 dockerfile_build_queue = WorkQueue(app.config['DOCKERFILE_BUILD_QUEUE_NAME'], tf,
-                                   reporter=MetricQueueReporter(metric_queue))
+                                   reporter=BuildMetricQueueReporter(metric_queue))
 notification_queue = WorkQueue(app.config['NOTIFICATION_QUEUE_NAME'], tf)
 secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NAME'], tf)
 secscan_api = SecurityScannerAPI(app, app.config, storage)
@@ -207,6 +209,7 @@ if os.path.exists(_v2_key_path):
 else:
   docker_v2_signing_key = RSAKey(key=RSA.generate(2048))

 database.configure(app.config)
 model.config.app_config = app.config
 model.config.store = storage

@@ -340,7 +340,8 @@ class EphemeralBuilderManager(BaseManager):
     try:
       builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
-      metric_queue.put('EC2BuilderStarted', 1, unit='Count')
+      metric_queue.put_deprecated('EC2BuilderStarted', 1, unit='Count')
+      metric_queue.ephemeral_build_workers.Inc(labelvalues=[builder_id, build_uuid])
     except:
       logger.exception('Exception when starting builder for job: %s', build_uuid)
       raise Return(False, EC2_API_TIMEOUT)

@@ -152,7 +152,8 @@ class EC2Executor(BuilderExecutor):
       ))
     except boto.exception.EC2ResponseError as ec2e:
       logger.exception('Unable to spawn builder instance')
-      metric_queue.put('EC2BuildStartFailure', 1, unit='Count')
+      metric_queue.put_deprecated('EC2BuildStartFailure', 1, unit='Count')
+      metric_queue.ephemeral_build_worker_failure.Inc(labelvalues=[build_uuid])
       raise ec2e

     if not reservation.instances:

@@ -17,6 +17,7 @@ from buildman.jobutil.buildstatus import StatusHandler
 from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
 from data import database
-from app import app
+from app import app, metric_queue

 logger = logging.getLogger(__name__)
@@ -237,4 +238,5 @@ def report_completion_status(status):
   else:
     return

-  metric_queue.put(status_name, 1, unit='Count')
+  metric_queue.put_deprecated(status_name, 1, unit='Count')
+  metric_queue.build_counter.Inc(labelvalues=[status_name])

@@ -0,0 +1,7 @@
#!/bin/sh

# Ensure dependencies start before the logger
sv check syslog-ng > /dev/null || exit 1

# Start the logger
exec logger -i -t prometheus-aggregator

@@ -0,0 +1,7 @@
#! /bin/bash

echo 'Starting prometheus aggregator'
/usr/local/bin/prometheus-aggregator
echo 'Prometheus aggregator exited'

@@ -355,3 +355,5 @@ class DefaultConfig(object):
   # The whitelist of client IDs for OAuth applications that allow for direct login.
   DIRECT_OAUTH_CLIENTID_WHITELIST = []
+
+  # URL that specifies the location of the prometheus stats aggregator.
+  PROMETHEUS_AGGREGATOR_URL = 'http://localhost:9092'
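
Note for reviewers: `init_app` reads this value with `app.config.get('PROMETHEUS_AGGREGATOR_URL')`, and `Prometheus(url=None)` runs in disabled mode (see `util/metrics/prometheus.py` below), so a deployment can opt out by overriding the key. A minimal sketch, using a hypothetical `TestConfig` subclass:

```python
# Hypothetical override; a None URL disables the aggregator, making every
# metric enqueue() a silent no-op.
class TestConfig(DefaultConfig):
  PROMETHEUS_AGGREGATOR_URL = None
```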

@@ -6,6 +6,7 @@ from util.morecollections import AttrDict
 MINIMUM_EXTENSION = timedelta(seconds=20)

 class NoopWith:
   def __enter__(self):
     pass
@@ -13,18 +14,24 @@ class NoopWith:
   def __exit__(self, type, value, traceback):
     pass

-class MetricQueueReporter(object):
+class BuildMetricQueueReporter(object):
+  """ Metric queue reporter for the build system. """
   def __init__(self, metric_queue):
     self._metric_queue = metric_queue

   def __call__(self, currently_processing, running_count, total_count):
     need_capacity_count = total_count - running_count
-    self._metric_queue.put('BuildCapacityShortage', need_capacity_count, unit='Count')
+    self._metric_queue.put_deprecated('BuildCapacityShortage', need_capacity_count, unit='Count')
+    self._metric_queue.build_capacity_shortage.Set(need_capacity_count)

     building_percent = 100 if currently_processing else 0
-    self._metric_queue.put('PercentBuilding', building_percent, unit='Percent')
+    self._metric_queue.put_deprecated('PercentBuilding', building_percent, unit='Percent')
+    self._metric_queue.percent_building.Set(building_percent)

 class WorkQueue(object):
+  """ Work queue defines methods for interacting with a queue backed by the database. """
   def __init__(self, queue_name, transaction_factory,
                canonical_name_match_list=None, reporter=None, metric_queue=None):
     self._queue_name = queue_name
@@ -102,9 +109,13 @@ class WorkQueue(object):
     if self._metric_queue:
       dim = {'queue': self._queue_name}
-      self._metric_queue.put('Running', running_count, dimensions=dim)
-      self._metric_queue.put('AvailableNotRunning', available_not_running_count, dimensions=dim)
-      self._metric_queue.put('Available', available_count, dimensions=dim)
+      self._metric_queue.put_deprecated('Running', running_count, dimensions=dim)
+      self._metric_queue.put_deprecated('AvailableNotRunning', available_not_running_count,
+                                        dimensions=dim)
+      self._metric_queue.put_deprecated('Available', available_count, dimensions=dim)
+
+      self._metric_queue.work_queue_running.Set(running_count, labelvalues=[self._queue_name])
+      self._metric_queue.work_queue_available.Set(available_count, labelvalues=[self._queue_name])

     if self._reporter:
       self._reporter(self._currently_processing, running_count,
@@ -136,8 +147,10 @@ class WorkQueue(object):
     with self._transaction_factory(db):
       r = str(QueueItem.create(**params).id)
     if self._metric_queue:
-      self._metric_queue.put('Added', 1, dimensions={'queue': self._queue_name})
+      self._metric_queue.put_deprecated('Added', 1, dimensions={'queue': self._queue_name})
     return r

   def get(self, processing_time=300, ordering_required=False):

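For context, `WorkQueue` invokes its reporter with the current processing flag and counts; a small illustrative sketch of what `BuildMetricQueueReporter` then emits for one update (values invented):

```python
reporter = BuildMetricQueueReporter(metric_queue)

# 5 items total, 3 running, and a worker is currently processing:
reporter(currently_processing=True, running_count=3, total_count=5)
# -> 'BuildCapacityShortage' = 2 on the deprecated queue, the
#    build_capacity_shortage gauge set to 2, and percent_building set to 100.
```
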
dev.df

@@ -1,6 +1,6 @@
 # vim:ft=dockerfile
-FROM phusion/baseimage:0.9.16
+FROM phusion/baseimage:0.9.18

 ENV DEBIAN_FRONTEND noninteractive
 ENV HOME /root
@@ -16,11 +16,13 @@ ADD requirements.txt requirements.txt
 RUN virtualenv --distribute venv
 RUN venv/bin/pip install -r requirements.txt

-WORKDIR /src/quay
-ENV PYTHONPATH=/
-ENV PATH=/venv/bin:$PATH
+ARG src_subdir

 RUN apt-key adv --keyserver hkp://pgp.mit.edu:80 --recv-keys 58118E89F3A912897C070ADBF76221572C52609D \
  && echo "deb https://apt.dockerproject.org/repo ubuntu-trusty main" > /etc/apt/sources.list.d/docker.list \
  && apt-get update \
  && apt-get install -y docker-engine

+ENV PYTHONPATH=/
+ENV PATH=/venv/bin:$PATH
+WORKDIR /src/$src_subdir

@@ -23,7 +23,7 @@ from auth.auth import process_oauth
 from endpoints.csrf import csrf_protect
 from endpoints.exception import ApiException, Unauthorized, InvalidRequest, InvalidResponse, FreshLoginRequired
 from endpoints.decorators import check_anon_protection
-from util.saas.metricqueue import time_decorator
+from util.metrics.metricqueue import time_decorator
 from util.pagination import encrypt_page_token, decrypt_page_token

 logger = logging.getLogger(__name__)

@@ -2,7 +2,7 @@ from flask import Blueprint, make_response
 from app import metric_queue
 from endpoints.decorators import anon_protect, anon_allowed
-from util.saas.metricqueue import time_blueprint
+from util.metrics.metricqueue import time_blueprint

 v1_bp = Blueprint('v1', __name__)

@@ -18,7 +18,7 @@ from endpoints.decorators import anon_protect, anon_allowed
 from endpoints.v2.errors import V2RegistryException, Unauthorized
 from util.http import abort
 from util.registry.dockerver import docker_version
-from util.saas.metricqueue import time_blueprint
+from util.metrics.metricqueue import time_blueprint

 logger = logging.getLogger(__name__)

 v2_bp = Blueprint('v2', __name__)

@@ -6,7 +6,7 @@ REPO=quay.io/quay/quay-dev
 d ()
 {
-  docker build -t $REPO -f dev.df .
+  docker build -t $REPO -f dev.df --build-arg src_subdir=$(basename `pwd`) .
   docker -- run --rm -v /var/run/docker.sock:/run/docker.sock -it --net=host -v $(pwd)/..:/src $REPO $*
 }
@@ -30,6 +30,11 @@ initdb)
 fulldbtest)
   d bash /src/quay/test/fulldbtest.sh
   ;;
+prom)
+  R=quay.io/quay/prom-monitor
+  docker build -t $R prom_aggregator
+  docker run --rm -it --net=host $R -loglevel=debug
+  ;;
 *)
   echo "unknown option"
   exit 1

@@ -14,12 +14,11 @@ from uuid import uuid4
 from collections import namedtuple

 from util.registry import filelike
-from storage.basestorage import BaseStorageV2, InvalidChunkException
+from storage.basestorage import BaseStorageV2

 logger = logging.getLogger(__name__)

 _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
 _CHUNKS_KEY = 'chunks'
@@ -164,7 +163,8 @@ class _CloudStorage(BaseStorageV2):
       metadata['Content-Encoding'] = content_encoding

     if self._metric_queue is not None:
-      self._metric_queue.put('MultipartUploadStart', 1)
+      self._metric_queue.put_deprecated('MultipartUploadStart', 1)
+      self._metric_queue.multipart_upload_start.Inc()

     return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                         **self._upload_params)
@@ -204,7 +204,8 @@ class _CloudStorage(BaseStorageV2):
           error = ex

           if self._metric_queue is not None:
-            self._metric_queue.put('MultipartUploadFailure', 1)
+            self._metric_queue.put_deprecated('MultipartUploadFailure', 1)
+            self._metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])

           if cancel_on_error:
             mp.cancel_upload()
@@ -214,9 +215,11 @@ class _CloudStorage(BaseStorageV2):
     if total_bytes_written > 0:
       if self._metric_queue is not None:
-        self._metric_queue.put('MultipartUploadSuccess', 1)
+        self._metric_queue.put_deprecated('MultipartUploadSuccess', 1)
+        self._metric_queue.multipart_upload_end.Inc(labelvalues=['success'])

       mp.complete_upload()

     return total_bytes_written, error

   def exists(self, path):

util/metrics/__init__.py (new, empty file)

util/metrics/metricqueue.py (new file)

@@ -0,0 +1,122 @@
import datetime
import logging
import time

from functools import wraps
from Queue import Queue, Full

from flask import g, request

logger = logging.getLogger(__name__)

# Buckets for the API response times.
API_RESPONSE_TIME_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]


class MetricQueue(object):
  """ Object to which various metrics are written, for distribution to metrics collection
      system(s) such as Prometheus.
  """
  def __init__(self, prom):
    # Define the various exported metrics.
    self.resp_time = prom.create_histogram('response_time', 'HTTP response time in seconds',
                                           labelnames=['endpoint'],
                                           buckets=API_RESPONSE_TIME_BUCKETS)
    self.resp_code = prom.create_counter('response_code', 'HTTP response code',
                                         labelnames=['endpoint', 'code'])
    self.non_200 = prom.create_counter('response_non200', 'Non-200 HTTP response codes',
                                       labelnames=['endpoint'])
    self.multipart_upload_start = prom.create_counter('multipart_upload_start',
                                                      'Multipart upload starts.')
    self.multipart_upload_end = prom.create_counter('multipart_upload_end',
                                                    'Multipart upload ends.', labelnames=['type'])
    self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage',
                                                     'Build capacity shortage.')
    self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.')
    self.build_counter = prom.create_counter('builds', 'Number of builds', labelnames=['name'])
    self.ephemeral_build_workers = prom.create_counter('ephemeral_build_workers',
        'Number of started ephemeral build workers', labelnames=['name', 'build_uuid'])
    self.ephemeral_build_worker_failure = prom.create_counter('ephemeral_build_worker_failure',
        'Number of failed-to-start ephemeral build workers', labelnames=['build_uuid'])
    self.work_queue_running = prom.create_gauge('work_queue_running', 'Running items in a queue',
                                                labelnames=['queue_name'])
    self.work_queue_available = prom.create_gauge('work_queue_available',
                                                  'Available items in a queue',
                                                  labelnames=['queue_name'])

    # Deprecated: Define an in-memory queue for reporting metrics to CloudWatch or another
    # provider.
    self._queue = None

  def enable_deprecated(self, maxsize=10000):
    self._queue = Queue(maxsize)

  def put_deprecated(self, name, value, **kwargs):
    if self._queue is None:
      logger.debug('No metric queue %s %s %s', name, value, kwargs)
      return

    try:
      kwargs.setdefault('timestamp', datetime.datetime.now())
      kwargs.setdefault('dimensions', {})
      self._queue.put_nowait((name, value, kwargs))
    except Full:
      logger.error('Metric queue full')

  def get_deprecated(self):
    return self._queue.get()

  def get_nowait_deprecated(self):
    return self._queue.get_nowait()


def time_decorator(name, metric_queue):
  """ Decorates an endpoint method to have its request time logged to the metrics queue. """
  after = _time_after_request(name, metric_queue)

  def decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
      _time_before_request()
      rv = func(*args, **kwargs)
      after(rv)
      return rv
    return wrapper
  return decorator


def time_blueprint(bp, metric_queue):
  """ Decorates a blueprint to have its request time logged to the metrics queue. """
  bp.before_request(_time_before_request)
  bp.after_request(_time_after_request(bp.name, metric_queue))


def _time_before_request():
  g._request_start_time = time.time()


def _time_after_request(name, metric_queue):
  def f(r):
    start = getattr(g, '_request_start_time', None)
    if start is None:
      return r

    dur = time.time() - start
    dims = {'endpoint': request.endpoint}
    metric_queue.put_deprecated('ResponseTime', dur, dimensions=dims, unit='Seconds')
    metric_queue.put_deprecated('ResponseCode', r.status_code, dimensions=dims)

    metric_queue.resp_time.Observe(dur, labelvalues=[request.endpoint])
    metric_queue.resp_code.Inc(labelvalues=[request.endpoint, r.status_code])

    if r.status_code >= 500:
      metric_queue.put_deprecated('5XXResponse', 1, dimensions={'name': name})
    elif r.status_code < 200 or r.status_code >= 300:
      metric_queue.put_deprecated('Non200Response', 1, dimensions={'name': name})
      metric_queue.non_200.Inc(labelvalues=[request.endpoint])

    return r
  return f
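
Both `time_decorator` and `time_blueprint` share the same before/after hooks; a sketch of the blueprint wiring, mirroring the import changes in `endpoints/v1` above (the `time_blueprint` call itself is outside this diff):

```python
from flask import Blueprint

from app import metric_queue
from util.metrics.metricqueue import time_blueprint

v1_bp = Blueprint('v1', __name__)

# Every request through the blueprint now reports its duration to the
# resp_time histogram and its status code to the resp_code counter.
time_blueprint(v1_bp, metric_queue)
```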

util/metrics/prometheus.py (new file)

@@ -0,0 +1,166 @@
import datetime
import json
import logging

from Queue import Queue, Full, Empty
from threading import Thread

import requests

logger = logging.getLogger(__name__)

QUEUE_MAX = 1000
MAX_BATCH_SIZE = 100
REGISTER_WAIT = datetime.timedelta(hours=1)


class PrometheusPlugin(object):
  """ Application plugin for reporting metrics to Prometheus. """
  def __init__(self, app=None):
    self.app = app
    if app is not None:
      self.state = self.init_app(app)
    else:
      self.state = None

  def init_app(self, app):
    prom_url = app.config.get('PROMETHEUS_AGGREGATOR_URL')
    logger.debug('Initializing prometheus with aggregator url: %s', prom_url)
    prometheus = Prometheus(prom_url)

    # Register the extension with the app.
    app.extensions = getattr(app, 'extensions', {})
    app.extensions['prometheus'] = prometheus
    return prometheus

  def __getattr__(self, name):
    return getattr(self.state, name, None)


class Prometheus(object):
  """ Aggregator for collecting stats that are reported to Prometheus. """
  def __init__(self, url=None):
    self._metric_collectors = []
    self._url = url

    if url is not None:
      self._queue = Queue(QUEUE_MAX)
      self._sender = _QueueSender(self._queue, url, self._metric_collectors)
      self._sender.start()
      logger.debug('Prometheus aggregator sending to %s', url)
    else:
      self._queue = None
      logger.debug('Prometheus aggregator disabled')

  def enqueue(self, call, data):
    if not self._queue:
      return

    v = json.dumps({
      'Call': call,
      'Data': data,
    })

    if call == 'register':
      self._metric_collectors.append(v)
      return

    try:
      self._queue.put_nowait(v)
    except Full:
      # If the queue is full, it is because 1) no aggregator was enabled or 2)
      # the aggregator is taking a long time to respond to requests. In the case
      # of 1, it's probably enterprise mode and we don't care. In the case of 2,
      # the response timeout error is printed inside the queue handler. In either case,
      # we don't need to print an error here.
      pass

  def create_gauge(self, *args, **kwargs):
    return self._create_collector('Gauge', args, kwargs)

  def create_counter(self, *args, **kwargs):
    return self._create_collector('Counter', args, kwargs)

  def create_summary(self, *args, **kwargs):
    return self._create_collector('Summary', args, kwargs)

  def create_histogram(self, *args, **kwargs):
    return self._create_collector('Histogram', args, kwargs)

  def create_untyped(self, *args, **kwargs):
    return self._create_collector('Untyped', args, kwargs)

  def _create_collector(self, collector_type, args, kwargs):
    return _Collector(self.enqueue, collector_type, *args, **kwargs)


class _QueueSender(Thread):
  """ Helper class which uses a thread to asynchronously send metrics to the local Prometheus
      aggregator.
  """
  def __init__(self, queue, url, metric_collectors):
    Thread.__init__(self)
    self.daemon = True
    self.next_register = datetime.datetime.now()
    self._queue = queue
    self._url = url
    self._metric_collectors = metric_collectors

  def run(self):
    while True:
      reqs = []
      reqs.append(self._queue.get())

      while len(reqs) < MAX_BATCH_SIZE:
        try:
          req = self._queue.get_nowait()
          reqs.append(req)
        except Empty:
          break

      try:
        resp = requests.post(self._url + '/call', '\n'.join(reqs))
        if resp.status_code == 500 and self.next_register <= datetime.datetime.now():
          resp = requests.post(self._url + '/call', '\n'.join(self._metric_collectors))
          self.next_register = datetime.datetime.now() + REGISTER_WAIT
          logger.debug('Register returned %s for %s metrics; setting next to %s',
                       resp.status_code, len(self._metric_collectors), self.next_register)
        elif resp.status_code != 200:
          logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text,
                       ', '.join(reqs))
        else:
          logger.debug('Sent %d prometheus metrics', len(reqs))
      except:
        logger.exception('Failed to write to prometheus aggregator: %s', reqs)


class _Collector(object):
  """ Collector for a Prometheus metric. """
  def __init__(self, enqueue_method, collector_type, collector_name, collector_help,
               namespace='', subsystem='', **kwargs):
    self._enqueue_method = enqueue_method
    self._base_args = {
      'Name': collector_name,
      'Namespace': namespace,
      'Subsystem': subsystem,
      'Type': collector_type,
    }

    registration_params = dict(kwargs)
    registration_params.update(self._base_args)
    registration_params['Help'] = collector_help

    self._enqueue_method('register', registration_params)

  def __getattr__(self, method):
    def f(value=0, labelvalues=()):
      data = dict(self._base_args)
      data.update({
        'Value': value,
        'LabelValues': [str(i) for i in labelvalues],
        'Method': method,
      })
      self._enqueue_method('put', data)
    return f
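
Putting `_Collector` and `Prometheus.enqueue` together: every metric update becomes one line of JSON in a newline-delimited batch POSTed to the aggregator's `/call` endpoint. A sketch of a single payload (metric and values illustrative):

```python
import json

# What resp_time.Observe(0.042, labelvalues=['api.repository']) would enqueue,
# per _Collector.__getattr__ above:
payload = json.dumps({
  'Call': 'put',
  'Data': {
    'Name': 'response_time',
    'Namespace': '',
    'Subsystem': '',
    'Type': 'Histogram',
    'Value': 0.042,
    'LabelValues': ['api.repository'],
    'Method': 'Observe',
  },
})
```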

@@ -48,7 +48,7 @@ class CloudWatchSender(Thread):
       connection = boto.connect_cloudwatch(self._aws_access_key, self._aws_secret_key)
     except:
       logger.exception('Failed to connect to CloudWatch.')
-    self._metrics.enable()
+    self._metrics.enable_deprecated()

     while True:
       metrics = {
@@ -59,12 +59,12 @@ class CloudWatchSender(Thread):
         'dimensions': [],
       }

-      metric = self._metrics.get()
+      metric = self._metrics.get_deprecated()
       append_metric(metrics, metric)

       while len(metrics['name']) < MAX_BATCH_METRICS:
         try:
-          metric = self._metrics.get_nowait()
+          metric = self._metrics.get_nowait_deprecated()
           append_metric(metrics, metric)
         except Empty:
           break
@@ -74,7 +74,7 @@ class CloudWatchSender(Thread):
         logger.debug('Sent %d CloudWatch metrics', len(metrics['name']))
       except:
         for i in range(len(metrics['name'])):
-          self._metrics.put(metrics['name'][i], metrics['value'][i],
+          self._metrics.put_deprecated(metrics['name'][i], metrics['value'][i],
                             unit=metrics['unit'][i],
                             dimensions=metrics['dimensions'][i],
                             timestamp=metrics['timestamp'][i],

util/saas/metricqueue.py (deleted file)

@@ -1,75 +0,0 @@
import datetime
import logging
import time

from functools import wraps
from Queue import Queue, Full

from flask import g, request

logger = logging.getLogger(__name__)

class MetricQueue(object):
  def __init__(self):
    self._queue = None

  def enable(self, maxsize=10000):
    self._queue = Queue(maxsize)

  def put(self, name, value, **kwargs):
    if self._queue is None:
      logger.debug('No metric queue %s %s %s', name, value, kwargs)
      return

    try:
      kwargs.setdefault('timestamp', datetime.datetime.now())
      kwargs.setdefault('dimensions', {})
      self._queue.put_nowait((name, value, kwargs))
    except Full:
      logger.error('Metric queue full')

  def get(self):
    return self._queue.get()

  def get_nowait(self):
    return self._queue.get_nowait()

def time_blueprint(bp, metric_queue):
  bp.before_request(time_before_request)
  bp.after_request(time_after_request(bp.name, metric_queue))

def time_before_request():
  g._request_start_time = time.time()

def time_after_request(name, metric_queue):
  def f(r):
    start = getattr(g, '_request_start_time', None)
    if start is None:
      return r

    dur = time.time() - start
    dims = {'endpoint': request.endpoint}
    metric_queue.put('ResponseTime', dur, dimensions=dims, unit='Seconds')
    metric_queue.put('ResponseCode', r.status_code, dimensions=dims)

    if r.status_code >= 500:
      metric_queue.put('5XXResponse', 1, dimensions={'name': name})
    elif r.status_code < 200 or r.status_code >= 300:
      metric_queue.put('Non200Response', 1, dimensions={'name': name})

    return r
  return f

def time_decorator(name, metric_queue):
  after = time_after_request(name, metric_queue)
  def decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
      time_before_request()
      rv = func(*args, **kwargs)
      after(rv)
      return rv
    return wrapper
  return decorator