From 668a8edc507dc8ce12ecfa22c9cd7eb6fa867fca Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Mon, 1 Feb 2016 15:07:46 -0500
Subject: [PATCH] Refactor prometheus integration

Move prometheus to SaaS and make it a plugin
Move static callers to use metrics_queue plugin
Change local-docker to support different quay clone dirnames
Change prom_aggregator to use logrus

---
 app.py                      |   6 +-
 data/queue.py               |   9 +--
 dev.df                      |  10 ++-
 local-docker.sh             |   4 +-
 prom_aggregator/Dockerfile  |   2 +-
 prom_aggregator/main.go     |  24 ++++--
 storage/cloud.py            |  11 +--
 util/prometheus/__init__.py | 122 ----------------------------
 util/saas/metricqueue.py    |  32 ++++++--
 util/saas/prometheus.py     | 157 ++++++++++++++++++++++++++++++++++++
 10 files changed, 216 insertions(+), 161 deletions(-)
 delete mode 100644 util/prometheus/__init__.py
 create mode 100644 util/saas/prometheus.py

diff --git a/app.py b/app.py
index 2e7e35418..af325a478 100644
--- a/app.py
+++ b/app.py
@@ -25,7 +25,7 @@ from data.archivedlogs import LogArchive
 from data.userevent import UserEventsBuilderModule
 from data.queue import WorkQueue, MetricQueueReporter
 from util import get_app_url
-from util import prometheus
+from util.saas.prometheus import PrometheusPlugin
 from util.saas.analytics import Analytics
 from util.saas.exceptionlog import Sentry
 from util.names import urn_generator
@@ -168,7 +168,8 @@ Principal(app, use_sessions=False)
 avatar = Avatar(app)
 login_manager = LoginManager(app)
 mail = Mail(app)
-metric_queue = MetricQueue()
+prometheus = PrometheusPlugin(app)
+metric_queue = MetricQueue(prometheus)
 storage = Storage(app, metric_queue)
 userfiles = Userfiles(app, storage)
 log_archive = LogArchive(app, storage)
@@ -208,7 +209,6 @@ if os.path.exists(_v2_key_path):
 else:
   docker_v2_signing_key = RSAKey(key=RSA.generate(2048))
 
-prometheus.init(app.config.get('PROMETHEUS_AGGREGATOR_URL'))
 database.configure(app.config)
 model.config.app_config = app.config
 
diff --git a/data/queue.py b/data/queue.py
index 559a48204..44e76ea71 100644
--- a/data/queue.py
+++ b/data/queue.py
@@ -2,13 +2,10 @@ from datetime import datetime, timedelta
 
 from data.database import QueueItem, db, db_for_update, db_random_func
 from util.morecollections import AttrDict
-from util.prometheus import Gauge
 
 MINIMUM_EXTENSION = timedelta(seconds=20)
 
-build_capacity_shortage = Gauge('build_capacity_shortage', 'Build capacity shortage.')
-percent_building = Gauge('build_percent_building', 'Percent building.')
 
 class NoopWith:
   def __enter__(self):
@@ -17,6 +14,7 @@ class NoopWith:
   def __exit__(self, type, value, traceback):
     pass
 
+
 class MetricQueueReporter(object):
   def __init__(self, metric_queue):
@@ -24,11 +22,12 @@ def __call__(self, currently_processing, running_count, total_count):
     need_capacity_count = total_count - running_count
     self._metric_queue.put('BuildCapacityShortage', need_capacity_count, unit='Count')
-    build_capacity_shortage.Set(need_capacity_count)
+    self._metric_queue.build_capacity_shortage.Set(need_capacity_count)
 
     building_percent = 100 if currently_processing else 0
     self._metric_queue.put('PercentBuilding', building_percent, unit='Percent')
-    percent_building.Set(building_percent)
+    self._metric_queue.percent_building.Set(building_percent)
+
 
 class WorkQueue(object):
   def __init__(self, queue_name, transaction_factory,
diff --git a/dev.df b/dev.df
index 13a1a4029..57db0befa 100644
--- a/dev.df
+++ b/dev.df
@@ -1,6 +1,6 @@
 # vim:ft=dockerfile
 
-FROM phusion/baseimage:0.9.16
+FROM phusion/baseimage:0.9.18
 
 ENV DEBIAN_FRONTEND noninteractive
 ENV HOME /root
@@ -16,11 +16,13 @@ ADD requirements.txt requirements.txt
 RUN virtualenv --distribute venv
 RUN venv/bin/pip install -r requirements.txt
 
-WORKDIR /src/quay
-ENV PYTHONPATH=/
-ENV PATH=/venv/bin:$PATH
+ARG src_subdir
 
 RUN apt-key adv --keyserver hkp://pgp.mit.edu:80 --recv-keys 58118E89F3A912897C070ADBF76221572C52609D \
     && echo "deb https://apt.dockerproject.org/repo ubuntu-trusty main" > /etc/apt/sources.list.d/docker.list \
     && apt-get update \
     && apt-get install -y docker-engine
+
+ENV PYTHONPATH=/
+ENV PATH=/venv/bin:$PATH
+WORKDIR /src/$src_subdir
diff --git a/local-docker.sh b/local-docker.sh
index 9568b7084..4545331a7 100755
--- a/local-docker.sh
+++ b/local-docker.sh
@@ -6,7 +6,7 @@ REPO=quay.io/quay/quay-dev
 
 d ()
 {
-	docker build -t $REPO -f dev.df .
+	docker build -t $REPO -f dev.df --build-arg src_subdir=$(basename `pwd`) .
 	docker -- run --rm -v /var/run/docker.sock:/run/docker.sock -it --net=host -v $(pwd)/..:/src $REPO $*
 }
 
@@ -33,7 +33,7 @@ fulldbtest)
 prom)
     R=quay.io/quay/prom-monitor
     docker build -t $R prom_aggregator
-    docker run --rm -it --net=host $R
+    docker run --rm -it --net=host $R -loglevel=debug
     ;;
 *)
     echo "unknown option"
diff --git a/prom_aggregator/Dockerfile b/prom_aggregator/Dockerfile
index 5a3e2a59c..26e62905d 100644
--- a/prom_aggregator/Dockerfile
+++ b/prom_aggregator/Dockerfile
@@ -3,7 +3,7 @@ FROM golang
 RUN mkdir -p /go/src/app
 WORKDIR /go/src/app
 
-CMD ["go-wrapper", "run"]
+ENTRYPOINT ["go-wrapper", "run"]
 
 ENV GO15VENDOREXPERIMENT=1
 COPY . /go/src/app
diff --git a/prom_aggregator/main.go b/prom_aggregator/main.go
index 51ef525ff..edebf3dce 100644
--- a/prom_aggregator/main.go
+++ b/prom_aggregator/main.go
@@ -5,7 +5,6 @@ import (
 	"flag"
 	"fmt"
 	"io"
-	"log"
 	"net/http"
 	"os"
 	"strings"
@@ -14,24 +13,33 @@ import (
 	"github.com/coreos/pkg/flagutil"
 	"github.com/prometheus/client_golang/prometheus"
+	log "github.com/Sirupsen/logrus"
 )
 
-func main() {
-	fs := flag.NewFlagSet("prometheus_aggregator", flag.ExitOnError)
-	listen := fs.String("listen", ":9092", "")
+var listen = flag.String("listen", ":9092", "")
+var level = flag.String("loglevel", "info", "default log level: debug, info, warn, error, fatal, panic")
 
-	if err := fs.Parse(os.Args[1:]); err != nil {
+func main() {
+	if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
 		fmt.Fprintln(os.Stderr, err.Error())
 		os.Exit(1)
 	}
 
-	if err := flagutil.SetFlagsFromEnv(fs, "PROMETHEUS_AGGREGATOR"); err != nil {
+	if err := flagutil.SetFlagsFromEnv(flag.CommandLine, "PROMETHEUS_AGGREGATOR"); err != nil {
 		fmt.Fprintln(os.Stderr, err.Error())
 		os.Exit(1)
 	}
 
+	lvl, err := log.ParseLevel(*level)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, err.Error())
+		os.Exit(1)
+	}
+
+	log.SetLevel(lvl)
+
 	http.Handle("/metrics", prometheus.Handler())
 	http.HandleFunc("/call", wrap(call))
-	log.Println("listening on", *listen)
+	log.Infoln("listening on", *listen)
 	log.Fatal(http.ListenAndServe(*listen, nil))
 }
@@ -139,7 +147,7 @@ func call(w http.ResponseWriter, r *http.Request) error {
 	} else if err != nil {
 		return err
 	}
-	log.Println(m.Call, string(m.Data))
+	log.Debugln(m.Call, string(m.Data))
 	switch m.Call {
 	case "put":
 		err = put(m.Data)
diff --git a/storage/cloud.py b/storage/cloud.py
index 7c541d549..6e7aa0576 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -15,14 +15,10 @@ from collections import namedtuple
 from util.registry import filelike
 
 from storage.basestorage import BaseStorageV2, InvalidChunkException
-from util.prometheus import Counter
 
 
 logger = logging.getLogger(__name__)
 
-multipart_upload_start = Counter('multipart_upload_start', 'Multipart upload startse')
-multipart_upload_end = Counter('multipart_upload_end', 'Multipart upload ends.', labelnames=['type'])
-
 _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
 _CHUNKS_KEY = 'chunks'
@@ -168,8 +164,8 @@ class _CloudStorage(BaseStorageV2):
 
     if self._metric_queue is not None:
       self._metric_queue.put('MultipartUploadStart', 1)
+      self._metric_queue.multipart_upload_start.Inc()
 
-    multipart_upload_start.Inc()
     return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                         **self._upload_params)
@@ -209,8 +205,7 @@
       if self._metric_queue is not None:
         self._metric_queue.put('MultipartUploadFailure', 1)
-
-        multipart_upload_end.Inc(labelvalues=['failure'])
+        self._metric_queue.multipart_upload_end.Inc(labelvalues=['failure'])
 
       if cancel_on_error:
         mp.cancel_upload()
@@ -221,8 +216,8 @@
     if total_bytes_written > 0:
       if self._metric_queue is not None:
         self._metric_queue.put('MultipartUploadSuccess', 1)
+        self._metric_queue.multipart_upload_end.Inc(labelvalues=['success'])
 
-      multipart_upload_end.Inc(labelvalues=['success'])
       mp.complete_upload()
 
     return total_bytes_written, error
diff --git a/util/prometheus/__init__.py b/util/prometheus/__init__.py
deleted file mode 100644
index 1c0b6f4df..000000000
--- a/util/prometheus/__init__.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import datetime
-import json
-import logging
-
-from Queue import Queue, Full, Empty
-from threading import Thread
-
-import requests
-
-
-logger = logging.getLogger(__name__)
-
-URL = None
-QUEUE_MAX = 1000
-MAX_BATCH_SIZE = 100
-REGISTER_WAIT = datetime.timedelta(hours=1)
-
-queue = Queue(QUEUE_MAX)
-registered = []
-
-def init(url):
-  global URL, queue
-  if not url:
-    logger.debug('Prometheus aggregator not started: empty URL')
-    queue = None
-    return
-  URL = url
-  sender = _QueueSender()
-  sender.start()
-  logger.debug('Prometheus aggregator sending to %s', URL)
-
-def enqueue(call, data):
-  if not queue:
-    return
-  v = json.dumps({
-    'Call': call,
-    'Data': data,
-  })
-  if call == 'register':
-    registered.append(v)
-    return
-  try:
-    queue.put_nowait(v)
-  except Full:
-    # If the queue is full, it is because 1) no aggregator was enabled or 2)
-    # the aggregator is taking a long time to respond to requests. In the case
-    # of 1, it's probably enterprise mode and we don't care. In the case of 2,
-    # the response timeout error is printed at another place. In either case,
-    # we don't need to print an error here.
-    pass
-
-class _QueueSender(Thread):
-  def __init__(self):
-    Thread.__init__(self)
-    self.daemon = True
-    self.next_register = datetime.datetime.now()
-
-  def run(self):
-    while True:
-      reqs = []
-      reqs.append(queue.get())
-
-      while len(reqs) < MAX_BATCH_SIZE:
-        try:
-          req = queue.get_nowait()
-          reqs.append(req)
-        except Empty:
-          break
-
-      try:
-        resp = requests.post(URL + '/call', '\n'.join(reqs))
-        if resp.status_code == 500 and self.next_register <= datetime.datetime.now():
-          resp = requests.post(URL + '/call', '\n'.join(registered))
-          self.next_register = datetime.datetime.now() + REGISTER_WAIT
-          logger.debug('Register returned %s for %s metrics; setting next to %s', resp.status_code, len(registered), self.next_register)
-        elif resp.status_code != 200:
-          logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text, ', '.join(reqs))
-        else:
-          logger.debug('Sent %d prometheus metrics', len(reqs))
-      except:
-        logger.exception('Failed to write to prometheus aggregator: %s', reqs)
-
-class _Collector(object):
-  def __init__(self, name, help, namespace='', subsystem='', **kwargs):
-    self._name = name
-    self._namespace = namespace
-    self._subsystem = subsystem
-    kwargs['Name'] = name
-    kwargs['Namespace'] = namespace
-    kwargs['Subsystem'] = subsystem
-    kwargs['Type'] = self.__class__.__name__
-    kwargs['Help'] = help
-    enqueue('register', kwargs)
-
-  def __getattr__(self, method):
-    def f(value=0, labelvalues=()):
-      data = {
-        'Name': self._name,
-        'Subsystem': self._subsystem,
-        'Namespace': self._namespace,
-        'Type': self.__class__.__name__,
-        'Value': value,
-        'LabelValues': [str(i) for i in labelvalues],
-        'Method': method,
-      }
-      enqueue('put', data)
-    return f
-
-class Gauge(_Collector):
-  pass
-
-class Counter(_Collector):
-  pass
-
-class Summary(_Collector):
-  pass
-
-class Histogram(_Collector):
-  pass
-
-class Untyped(_Collector):
-  pass
diff --git a/util/saas/metricqueue.py b/util/saas/metricqueue.py
index 7bb2781f8..aa09d848e 100644
--- a/util/saas/metricqueue.py
+++ b/util/saas/metricqueue.py
@@ -4,20 +4,32 @@ import time
 
 from functools import wraps
 from Queue import Queue, Full
 
-from util.prometheus import Histogram, Counter
 from flask import g, request
 
 logger = logging.getLogger(__name__)
 
-resp_time = Histogram('response_time', 'HTTP response time in seconds', labelnames=['endpoint'])
-resp_code = Counter('response_code', 'HTTP response code', labelnames=['endpoint', 'code'])
-non_200 = Counter('response_non200', 'Non-200 HTTP response codes', labelnames=['endpoint'])
+
+API_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
+
 
 class MetricQueue(object):
-  def __init__(self):
+  def __init__(self, prom):
     self._queue = None
+    self.resp_time = prom.create_histogram('response_time', 'HTTP response time in seconds',
+                                           labelnames=['endpoint'], buckets=API_BUCKETS)
+    self.resp_code = prom.create_counter('response_code', 'HTTP response code',
+                                         labelnames=['endpoint', 'code'])
+    self.non_200 = prom.create_counter('response_non200', 'Non-200 HTTP response codes',
+                                       labelnames=['endpoint'])
+    self.multipart_upload_start = prom.create_counter('multipart_upload_start',
+                                                      'Multipart upload starts.')
+    self.multipart_upload_end = prom.create_counter('multipart_upload_end',
+                                                    'Multipart upload ends.', labelnames=['type'])
+    self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage',
+                                                     'Build capacity shortage.')
+    self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.')
 
   def enable(self, maxsize=10000):
     self._queue = Queue(maxsize)
@@ -40,13 +52,16 @@ class MetricQueue(object):
   def get_nowait(self):
     return self._queue.get_nowait()
 
+
 def time_blueprint(bp, metric_queue):
   bp.before_request(time_before_request)
   bp.after_request(time_after_request(bp.name, metric_queue))
 
+
 def time_before_request():
   g._request_start_time = time.time()
 
+
 def time_after_request(name, metric_queue):
   def f(r):
     start = getattr(g, '_request_start_time', None)
@@ -59,18 +74,19 @@ def time_after_request(name, metric_queue):
       metric_queue.put('ResponseTime', dur, dimensions=dims, unit='Seconds')
       metric_queue.put('ResponseCode', r.status_code, dimensions=dims)
 
-      resp_time.Observe(dur, labelvalues=[request.endpoint])
-      resp_code.Inc(labelvalues=[request.endpoint, r.status_code])
+      metric_queue.resp_time.Observe(dur, labelvalues=[request.endpoint])
+      metric_queue.resp_code.Inc(labelvalues=[request.endpoint, r.status_code])
 
       if r.status_code >= 500:
         metric_queue.put('5XXResponse', 1, dimensions={'name': name})
       elif r.status_code < 200 or r.status_code >= 300:
         metric_queue.put('Non200Response', 1, dimensions={'name': name})
-        non_200.Inc(labelvalues=[request.endpoint])
+        metric_queue.non_200.Inc(labelvalues=[request.endpoint])
 
     return r
   return f
 
+
 def time_decorator(name, metric_queue):
   after = time_after_request(name, metric_queue)
   def decorator(func):
diff --git a/util/saas/prometheus.py b/util/saas/prometheus.py
new file mode 100644
index 000000000..ae5cf264e
--- /dev/null
+++ b/util/saas/prometheus.py
@@ -0,0 +1,157 @@
+import datetime
+import json
+import logging
+
+from Queue import Queue, Full, Empty
+from threading import Thread
+
+import requests
+
+
+logger = logging.getLogger(__name__)
+
+QUEUE_MAX = 1000
+MAX_BATCH_SIZE = 100
+REGISTER_WAIT = datetime.timedelta(hours=1)
+
+
+class _QueueSender(Thread):
+  def __init__(self, queue, url, registered):
+    Thread.__init__(self)
+    self.daemon = True
+    self.next_register = datetime.datetime.now()
+    self._queue = queue
+    self._url = url
+    self._registered = registered
+
+  def run(self):
+    while True:
+      reqs = []
+      reqs.append(self._queue.get())
+
+      while len(reqs) < MAX_BATCH_SIZE:
+        try:
+          req = self._queue.get_nowait()
+          reqs.append(req)
+        except Empty:
+          break
+
+      try:
+        resp = requests.post(self._url + '/call', '\n'.join(reqs))
+        if resp.status_code == 500 and self.next_register <= datetime.datetime.now():
+          resp = requests.post(self._url + '/call', '\n'.join(self._registered))
+          self.next_register = datetime.datetime.now() + REGISTER_WAIT
+          logger.debug('Register returned %s for %s metrics; setting next to %s', resp.status_code,
+                       len(self._registered), self.next_register)
+        elif resp.status_code != 200:
+          logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text,
+                       ', '.join(reqs))
+        else:
+          logger.debug('Sent %d prometheus metrics', len(reqs))
+      except:
+        logger.exception('Failed to write to prometheus aggregator: %s', reqs)
+
+
+class Prometheus(object):
+  def __init__(self, url):
+    self._registered = []
+    self._url = url
+
+    if url is not None:
+      self._queue = Queue(QUEUE_MAX)
+      self._sender = _QueueSender(self._queue, url, self._registered)
+      self._sender.start()
+      logger.debug('Prometheus aggregator sending to %s', url)
+    else:
+      self._queue = None
+      logger.debug('Prometheus aggregator disabled')
+
+  def enqueue(self, call, data):
+    if not self._queue:
+      return
+
+    v = json.dumps({
+      'Call': call,
+      'Data': data,
+    })
+    if call == 'register':
+      self._registered.append(v)
+      return
+    try:
+      self._queue.put_nowait(v)
+    except Full:
+      # If the queue is full, it is because 1) no aggregator was enabled or 2)
+      # the aggregator is taking a long time to respond to requests. In the case
+      # of 1, it's probably enterprise mode and we don't care. In the case of 2,
+      # the response timeout error is printed at another place. In either case,
+      # we don't need to print an error here.
+      pass
+
+  def _create_collector(self, collector_type, args, kwargs):
+    return _Collector(self.enqueue, collector_type, *args, **kwargs)
+
+  def create_gauge(self, *args, **kwargs):
+    return self._create_collector('Gauge', args, kwargs)
+
+  def create_counter(self, *args, **kwargs):
+    return self._create_collector('Counter', args, kwargs)
+
+  def create_summary(self, *args, **kwargs):
+    return self._create_collector('Summary', args, kwargs)
+
+  def create_histogram(self, *args, **kwargs):
+    return self._create_collector('Histogram', args, kwargs)
+
+  def create_untyped(self, *args, **kwargs):
+    return self._create_collector('Untyped', args, kwargs)
+
+
+class PrometheusPlugin(object):
+  def __init__(self, app=None):
+    self.app = app
+    if app is not None:
+      self.state = self.init_app(app)
+    else:
+      self.state = None
+
+  def init_app(self, app):
+    prom_url = app.config.get('PROMETHEUS_AGGREGATOR_URL')
+    logger.debug('Initializing prometheus with aggregator url: %s', prom_url)
+    prometheus = Prometheus(prom_url)
+
+    # register extension with app
+    app.extensions = getattr(app, 'extensions', {})
+    app.extensions['prometheus'] = prometheus
+    return prometheus
+
+  def __getattr__(self, name):
+    return getattr(self.state, name, None)
+
+
+class _Collector(object):
+  def __init__(self, enqueue_method, c_type, name, c_help, namespace='', subsystem='', **kwargs):
+    self._enqueue_method = enqueue_method
+
+    self._base_args = {
+      'Name': name,
+      'Namespace': namespace,
+      'Subsystem': subsystem,
+      'Type': c_type,
+    }
+
+    registration_params = dict(kwargs)
+    registration_params.update(self._base_args)
+    registration_params['Help'] = c_help
+
+    self._enqueue_method('register', registration_params)
+
+  def __getattr__(self, method):
+    def f(value=0, labelvalues=()):
+      data = dict(self._base_args)
+      data.update({
+        'Value': value,
+        'LabelValues': [str(i) for i in labelvalues],
+        'Method': method,
+      })
+      self._enqueue_method('put', data)
+    return f
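
A minimal usage sketch of the refactored wiring, for reviewers. It mirrors app.py and
util/saas/metricqueue.py above; the Flask app and config value here are illustrative, and when
PROMETHEUS_AGGREGATOR_URL is unset the Prometheus object keeps no queue, so every enqueue()
call is a no-op.

    # Sketch: wire the plugin and the metric queue the way app.py does above.
    from flask import Flask

    from util.saas.metricqueue import MetricQueue
    from util.saas.prometheus import PrometheusPlugin

    app = Flask(__name__)
    app.config['PROMETHEUS_AGGREGATOR_URL'] = 'http://localhost:9092'  # illustrative

    prometheus = PrometheusPlugin(app)      # stored under app.extensions['prometheus']
    metric_queue = MetricQueue(prometheus)  # creates and registers all collectors

    # Collectors hang off the queue; whatever attribute is accessed on a
    # collector (Set, Inc, Observe, ...) is forwarded by _Collector.__getattr__
    # as the 'Method' field of a 'put' call.
    metric_queue.percent_building.Set(100)
    metric_queue.resp_time.Observe(0.25, labelvalues=['api.get_repo'])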
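
The wire format between these clients and prom_aggregator is unchanged by the refactor: each
newline-separated item POSTed to /call is a JSON envelope whose 'Call' is 'register' or 'put',
dispatched on m.Call on the Go side. Below is a sketch of the two envelopes _Collector produces;
the values shown are for the response_time histogram above, and note that extra registration
kwargs (labelnames, buckets) pass through into 'Data' with their Python spelling.

    import json

    # Sent once, when MetricQueue constructs the collector:
    register = json.dumps({
      'Call': 'register',
      'Data': {
        'Name': 'response_time',
        'Namespace': '',
        'Subsystem': '',
        'Type': 'Histogram',
        'Help': 'HTTP response time in seconds',
        'labelnames': ['endpoint'],
        'buckets': [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0],
      },
    })

    # Produced by resp_time.Observe(0.25, labelvalues=['api.get_repo']):
    put = json.dumps({
      'Call': 'put',
      'Data': {
        'Name': 'response_time',
        'Namespace': '',
        'Subsystem': '',
        'Type': 'Histogram',
        'Value': 0.25,
        'LabelValues': ['api.get_repo'],
        'Method': 'Observe',
      },
    })

    # _QueueSender batches up to MAX_BATCH_SIZE envelopes, newline-joined, per POST;
    # on a 500 it replays the 'register' envelopes at most once per REGISTER_WAIT.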