From 3d9acf2fff71e714af601218937dd1459eb3aced Mon Sep 17 00:00:00 2001
From: Matt Jibson
Date: Fri, 20 Nov 2015 15:32:17 -0500
Subject: [PATCH] Use Prometheus as a metric backend

This entails writing a metric aggregation program: under gunicorn, each
Python worker has its own memory and therefore its own metrics, so a
single process must aggregate them for Prometheus to scrape. The Python
client is a simple wrapper that reports metrics to the aggregator over
HTTP.
---
 app.py                      |   3 +
 buildman/server.py          |   3 +
 data/queue.py               |   6 +
 local-docker.sh             |   5 +
 prom_aggregator/.godir      |   1 +
 prom_aggregator/Dockerfile  |  14 ++
 prom_aggregator/main.go     | 330 ++++++++++++++++++++++++++++++++++++
 storage/cloud.py            |   8 +
 util/prometheus/__init__.py | 122 +++++++++++++
 util/saas/metricqueue.py    |   9 +
 10 files changed, 501 insertions(+)
 create mode 100644 prom_aggregator/.godir
 create mode 100644 prom_aggregator/Dockerfile
 create mode 100644 prom_aggregator/main.go
 create mode 100644 util/prometheus/__init__.py

diff --git a/app.py b/app.py
index d01a7ada9..2e7e35418 100644
--- a/app.py
+++ b/app.py
@@ -25,6 +25,7 @@ from data.archivedlogs import LogArchive
 from data.userevent import UserEventsBuilderModule
 from data.queue import WorkQueue, MetricQueueReporter
 from util import get_app_url
+from util import prometheus
 from util.saas.analytics import Analytics
 from util.saas.exceptionlog import Sentry
 from util.names import urn_generator
@@ -207,6 +208,8 @@ if os.path.exists(_v2_key_path):
 else:
   docker_v2_signing_key = RSAKey(key=RSA.generate(2048))
 
+prometheus.init(app.config.get('PROMETHEUS_AGGREGATOR_URL'))
+
 database.configure(app.config)
 model.config.app_config = app.config
 model.config.store = storage
diff --git a/buildman/server.py b/buildman/server.py
index 0403ce810..8f4690c89 100644
--- a/buildman/server.py
+++ b/buildman/server.py
@@ -17,8 +17,10 @@ from buildman.jobutil.buildstatus import StatusHandler
 from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
 from data import database
 from app import app, metric_queue
+from util.prometheus import Counter
 
 logger = logging.getLogger(__name__)
+build_counter = Counter('builds', 'Number of builds', labelnames=['name'])
 
 WORK_CHECK_TIMEOUT = 10
 TIMEOUT_PERIOD_MINUTES = 20
@@ -238,3 +240,4 @@ def report_completion_status(status):
     return
 
   metric_queue.put(status_name, 1, unit='Count')
+  build_counter.Inc(labelvalues=[status_name])
diff --git a/data/queue.py b/data/queue.py
index 720f84beb..559a48204 100644
--- a/data/queue.py
+++ b/data/queue.py
@@ -2,10 +2,14 @@ from datetime import datetime, timedelta
 
 from data.database import QueueItem, db, db_for_update, db_random_func
 from util.morecollections import AttrDict
+from util.prometheus import Gauge
 
 MINIMUM_EXTENSION = timedelta(seconds=20)
 
+build_capacity_shortage = Gauge('build_capacity_shortage', 'Build capacity shortage.')
+percent_building = Gauge('build_percent_building', 'Percent building.')
+
 
 class NoopWith:
   def __enter__(self):
     pass
@@ -20,9 +24,11 @@ class MetricQueueReporter(object):
   def __call__(self, currently_processing, running_count, total_count):
     need_capacity_count = total_count - running_count
     self._metric_queue.put('BuildCapacityShortage', need_capacity_count, unit='Count')
+    build_capacity_shortage.Set(need_capacity_count)
 
     building_percent = 100 if currently_processing else 0
     self._metric_queue.put('PercentBuilding', building_percent, unit='Percent')
+    percent_building.Set(building_percent)
 
 class WorkQueue(object):
   def __init__(self, queue_name, transaction_factory,
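For reference, the aggregator's wire protocol (implemented in prom_aggregator/main.go
below) is a stream of JSON messages POSTed to /call: each message carries a Call
("register" or "put") and a Data payload matching the Go options and metric structs,
and several messages can be batched into one request body. A minimal hand-rolled
client sketch; the URL and the "complete" label value are illustrative:

    import json
    import requests

    # Hypothetical aggregator address; in a deployment this would come from
    # PROMETHEUS_AGGREGATOR_URL.
    url = 'http://localhost:9092'

    # "register" creates a collector on the aggregator, keyed by
    # (Namespace, Subsystem, Name, Type); repeat registrations are no-ops.
    register = json.dumps({'Call': 'register', 'Data': {
        'Namespace': '', 'Subsystem': '', 'Name': 'builds', 'Type': 'Counter',
        'Help': 'Number of builds', 'LabelNames': ['name'],
    }})

    # "put" invokes a method on the collector; Value is unused for Inc.
    put = json.dumps({'Call': 'put', 'Data': {
        'Namespace': '', 'Subsystem': '', 'Name': 'builds', 'Type': 'Counter',
        'Method': 'Inc', 'Value': 0, 'LabelValues': ['complete'],
    }})

    requests.post(url + '/call', data='\n'.join([register, put]))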
diff --git a/local-docker.sh b/local-docker.sh
index d6fd55cf1..9568b7084 100755
--- a/local-docker.sh
+++ b/local-docker.sh
@@ -30,6 +30,11 @@ initdb)
 fulldbtest)
   d bash /src/quay/test/fulldbtest.sh
   ;;
+prom)
+  R=quay.io/quay/prom-monitor
+  docker build -t $R prom_aggregator
+  docker run --rm -it --net=host $R
+  ;;
 *)
   echo "unknown option"
   exit 1
diff --git a/prom_aggregator/.godir b/prom_aggregator/.godir
new file mode 100644
index 000000000..ec82fbf32
--- /dev/null
+++ b/prom_aggregator/.godir
@@ -0,0 +1 @@
+github.com/coreos-inc/quay/prom_aggregator
diff --git a/prom_aggregator/Dockerfile b/prom_aggregator/Dockerfile
new file mode 100644
index 000000000..5a3e2a59c
--- /dev/null
+++ b/prom_aggregator/Dockerfile
@@ -0,0 +1,14 @@
+FROM golang
+
+RUN mkdir -p /go/src/app
+WORKDIR /go/src/app
+
+CMD ["go-wrapper", "run"]
+ENV GO15VENDOREXPERIMENT=1
+
+COPY . /go/src/app
+
+RUN go-wrapper download
+RUN go-wrapper install
+
+EXPOSE 9092
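With the prom target above, `./local-docker.sh prom` builds the image and runs the
aggregator on the host network, so it answers on localhost:9092. A quick smoke-test
sketch, assuming the container is up:

    import requests

    # The aggregator serves the standard Prometheus exposition format at /metrics.
    resp = requests.get('http://localhost:9092/metrics')
    print(resp.status_code)  # expect 200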
diff --git a/prom_aggregator/main.go b/prom_aggregator/main.go
new file mode 100644
index 000000000..51ef525ff
--- /dev/null
+++ b/prom_aggregator/main.go
@@ -0,0 +1,330 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io"
+	"log"
+	"net/http"
+	"os"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/coreos/pkg/flagutil"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+func main() {
+	fs := flag.NewFlagSet("prometheus_aggregator", flag.ExitOnError)
+	listen := fs.String("listen", ":9092", "")
+
+	if err := fs.Parse(os.Args[1:]); err != nil {
+		fmt.Fprintln(os.Stderr, err.Error())
+		os.Exit(1)
+	}
+	if err := flagutil.SetFlagsFromEnv(fs, "PROMETHEUS_AGGREGATOR"); err != nil {
+		fmt.Fprintln(os.Stderr, err.Error())
+		os.Exit(1)
+	}
+
+	http.Handle("/metrics", prometheus.Handler())
+	http.HandleFunc("/call", wrap(call))
+	log.Println("listening on", *listen)
+	log.Fatal(http.ListenAndServe(*listen, nil))
+}
+
+func wrap(f func(http.ResponseWriter, *http.Request) error) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		err := f(w, r)
+		if err != nil {
+			http.Error(w, err.Error(), 500)
+		}
+	}
+}
+
+type options struct {
+	// common
+	identifier
+	Help       string
+	LabelNames []string
+
+	// summary
+	Objectives map[float64]float64
+	MaxAge     time.Duration
+	AgeBuckets uint32
+	BufCap     uint32
+
+	// histogram
+	Buckets []float64
+}
+
+func (o *options) register() error {
+	lock.RLock()
+	mv, ok := metrics[o.identifier]
+	lock.RUnlock()
+
+	if ok {
+		return nil
+	}
+	switch o.Type {
+	case "Gauge":
+		mv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: o.Namespace,
+			Subsystem: o.Subsystem,
+			Name:      o.Name,
+			Help:      o.Help,
+		}, o.LabelNames)
+	case "Counter":
+		mv = prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: o.Namespace,
+			Subsystem: o.Subsystem,
+			Name:      o.Name,
+			Help:      o.Help,
+		}, o.LabelNames)
+	case "Untyped":
+		mv = prometheus.NewUntypedVec(prometheus.UntypedOpts{
+			Namespace: o.Namespace,
+			Subsystem: o.Subsystem,
+			Name:      o.Name,
+			Help:      o.Help,
+		}, o.LabelNames)
+	case "Summary":
+		mv = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+			Namespace:  o.Namespace,
+			Subsystem:  o.Subsystem,
+			Name:       o.Name,
+			Help:       o.Help,
+			Objectives: o.Objectives,
+			MaxAge:     o.MaxAge,
+			AgeBuckets: o.AgeBuckets,
+			BufCap:     o.BufCap,
+		}, o.LabelNames)
+	case "Histogram":
+		mv = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: o.Namespace,
+			Subsystem: o.Subsystem,
+			Name:      o.Name,
+			Help:      o.Help,
+			Buckets:   o.Buckets,
+		}, o.LabelNames)
+	default:
+		return fmt.Errorf("unknown type: %s", o.Type)
+	}
+	err := prometheus.Register(mv)
+	if err != nil {
+		// TODO(mjibson): cache the error instead of creating new collectors each time
+		return err
+	}
+	lock.Lock()
+	metrics[o.identifier] = mv
+	lock.Unlock()
+	return nil
+}
+
+type message struct {
+	Call string
+	Data json.RawMessage
+}
+
+func call(w http.ResponseWriter, r *http.Request) error {
+	dec := json.NewDecoder(r.Body)
+	var me MultiError
+	for {
+		var m message
+		var err error
+		if err = dec.Decode(&m); err == io.EOF {
+			break
+		} else if err != nil {
+			return err
+		}
+		log.Println(m.Call, string(m.Data))
+		switch m.Call {
+		case "put":
+			err = put(m.Data)
+		case "register":
+			err = register(m.Data)
+		default:
+			err = fmt.Errorf("unknown call: %s", m.Call)
+		}
+		me = append(me, err)
+	}
+	return me.AsError()
+}
+
+func register(msg json.RawMessage) error {
+	var o options
+	if err := json.Unmarshal(msg, &o); err != nil {
+		return err
+	}
+	return o.register()
+}
+
+func put(msg json.RawMessage) error {
+	var m metric
+	if err := json.Unmarshal(msg, &m); err != nil {
+		return err
+	}
+	return m.put()
+}
+
+var (
+	lock    sync.RWMutex
+	metrics = make(map[identifier]prometheus.Collector)
+)
+
+type identifier struct {
+	Namespace string
+	Subsystem string
+	Name      string
+	Type      string
+}
+
+type metric struct {
+	identifier
+
+	LabelValues []string
+	// Method is the method name: Set, Inc, Dec, Add, Sub, Observe (dependent on the Type).
+	Method string
+	// Value is the parameter to Method. Unused for Inc and Dec.
+	Value float64
+}
+
+const (
+	errUnexpectedType = "unknown type %T: expected %s"
+	errUnknownMethod  = "unknown method %s on type %s"
+)
+
+func (m *metric) put() error {
+	lock.RLock()
+	mv, ok := metrics[m.identifier]
+	lock.RUnlock()
+	if !ok {
+		return fmt.Errorf("unknown collector: %v", m.identifier)
+	}
+	switch m.Type {
+	case "Gauge":
+		v, ok := mv.(*prometheus.GaugeVec)
+		if !ok {
+			return fmt.Errorf(errUnexpectedType, mv, m.Type)
+		}
+		c, err := v.GetMetricWithLabelValues(m.LabelValues...)
+		if err != nil {
+			return err
+		}
+		switch m.Method {
+		case "Set":
+			c.Set(m.Value)
+		case "Inc":
+			c.Inc()
+		case "Dec":
+			c.Dec()
+		case "Add":
+			c.Add(m.Value)
+		case "Sub":
+			c.Sub(m.Value)
+		default:
+			return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
+		}
+	case "Untyped":
+		v, ok := mv.(*prometheus.UntypedVec)
+		if !ok {
+			return fmt.Errorf(errUnexpectedType, mv, m.Type)
+		}
+		c, err := v.GetMetricWithLabelValues(m.LabelValues...)
+		if err != nil {
+			return err
+		}
+		switch m.Method {
+		case "Set":
+			c.Set(m.Value)
+		case "Inc":
+			c.Inc()
+		case "Dec":
+			c.Dec()
+		case "Add":
+			c.Add(m.Value)
+		case "Sub":
+			c.Sub(m.Value)
+		default:
+			return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
+		}
+	case "Counter":
+		v, ok := mv.(*prometheus.CounterVec)
+		if !ok {
+			return fmt.Errorf(errUnexpectedType, mv, m.Type)
+		}
+		c, err := v.GetMetricWithLabelValues(m.LabelValues...)
+		if err != nil {
+			return err
+		}
+		switch m.Method {
+		case "Set":
+			c.Set(m.Value)
+		case "Inc":
+			c.Inc()
+		case "Add":
+			c.Add(m.Value)
+		default:
+			return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
+		}
+	case "Summary":
+		v, ok := mv.(*prometheus.SummaryVec)
+		if !ok {
+			return fmt.Errorf(errUnexpectedType, mv, m.Type)
+		}
+		c, err := v.GetMetricWithLabelValues(m.LabelValues...)
+		if err != nil {
+			return err
+		}
+		switch m.Method {
+		case "Observe":
+			c.Observe(m.Value)
+		default:
+			return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
+		}
+	case "Histogram":
+		v, ok := mv.(*prometheus.HistogramVec)
+		if !ok {
+			return fmt.Errorf(errUnexpectedType, mv, m.Type)
+		}
+		c, err := v.GetMetricWithLabelValues(m.LabelValues...)
+		if err != nil {
+			return err
+		}
+		switch m.Method {
+		case "Observe":
+			c.Observe(m.Value)
+		default:
+			return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
+		}
+	default:
+		return fmt.Errorf("unknown type: %s", m.Type)
+	}
+	return nil
+}
+
+// MultiError is returned by batch operations when there are errors with
+// particular elements. Errors will be in a one-to-one correspondence with
+// the input elements; successful elements will have a nil entry.
+type MultiError []error
+
+func (m MultiError) Error() string {
+	var strs []string
+	for i, err := range m {
+		if err != nil {
+			strs = append(strs, fmt.Sprintf("[%d] %v", i, err))
+		}
+	}
+	return strings.Join(strs, " ")
+}
+
+func (m MultiError) AsError() error {
+	for _, e := range m {
+		if e != nil {
+			return m
+		}
+	}
+	return nil
+}
diff --git a/storage/cloud.py b/storage/cloud.py
index 1d8ff4673..7c541d549 100644
--- a/storage/cloud.py
+++ b/storage/cloud.py
@@ -15,10 +15,13 @@ from collections import namedtuple
 
 from util.registry import filelike
 from storage.basestorage import BaseStorageV2, InvalidChunkException
+from util.prometheus import Counter
 
 logger = logging.getLogger(__name__)
 
+multipart_upload_start = Counter('multipart_upload_start', 'Multipart upload starts.')
+multipart_upload_end = Counter('multipart_upload_end', 'Multipart upload ends.', labelnames=['type'])
 
 _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
 _CHUNKS_KEY = 'chunks'
 
@@ -166,6 +169,7 @@ class _CloudStorage(BaseStorageV2):
 
     if self._metric_queue is not None:
       self._metric_queue.put('MultipartUploadStart', 1)
+    multipart_upload_start.Inc()
 
     return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
                                                         **self._upload_params)
@@ -206,6 +210,8 @@ class _CloudStorage(BaseStorageV2):
 
           if self._metric_queue is not None:
             self._metric_queue.put('MultipartUploadFailure', 1)
+          multipart_upload_end.Inc(labelvalues=['failure'])
+
           if cancel_on_error:
             mp.cancel_upload()
             return 0, error
@@ -216,7 +222,9 @@ class _CloudStorage(BaseStorageV2):
 
     if self._metric_queue is not None:
       self._metric_queue.put('MultipartUploadSuccess', 1)
+    multipart_upload_end.Inc(labelvalues=['success'])
 
     mp.complete_upload()
+
     return total_bytes_written, error
 
   def exists(self, path):
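The util/prometheus client that follows registers each collector on construction
and turns any method call (Inc, Set, Observe, ...) into a queued "put" message for
the aggregator. A usage sketch; the metric name, label, and URL here are
illustrative, not taken from the patch:

    from util.prometheus import init, Counter

    init('http://localhost:9092')  # a falsy URL disables sending entirely

    # Hypothetical metric, mirroring the counters added in this patch.
    pushes = Counter('registry_pushes', 'Number of image pushes.', labelnames=['status'])
    pushes.Inc(labelvalues=['success'])  # proxied into a 'put' with Method='Inc'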
diff --git a/util/prometheus/__init__.py b/util/prometheus/__init__.py
new file mode 100644
index 000000000..1c0b6f4df
--- /dev/null
+++ b/util/prometheus/__init__.py
@@ -0,0 +1,122 @@
+import datetime
+import json
+import logging
+
+from Queue import Queue, Full, Empty
+from threading import Thread
+
+import requests
+
+
+logger = logging.getLogger(__name__)
+
+URL = None
+QUEUE_MAX = 1000
+MAX_BATCH_SIZE = 100
+REGISTER_WAIT = datetime.timedelta(hours=1)
+
+queue = Queue(QUEUE_MAX)
+registered = []
+
+def init(url):
+  global URL, queue
+  if not url:
+    logger.debug('Prometheus aggregator not started: empty URL')
+    queue = None
+    return
+  URL = url
+  sender = _QueueSender()
+  sender.start()
+  logger.debug('Prometheus aggregator sending to %s', URL)
+
+def enqueue(call, data):
+  if not queue:
+    return
+  v = json.dumps({
+    'Call': call,
+    'Data': data,
+  })
+  if call == 'register':
+    registered.append(v)
+    return
+  try:
+    queue.put_nowait(v)
+  except Full:
+    # If the queue is full, it is because 1) no aggregator was enabled or 2)
+    # the aggregator is taking a long time to respond to requests. In the case
+    # of 1, it's probably enterprise mode and we don't care. In the case of 2,
+    # the response timeout error is printed at another place. In either case,
+    # we don't need to print an error here.
+    pass
+
+class _QueueSender(Thread):
+  def __init__(self):
+    Thread.__init__(self)
+    self.daemon = True
+    self.next_register = datetime.datetime.now()
+
+  def run(self):
+    while True:
+      reqs = []
+      reqs.append(queue.get())
+
+      while len(reqs) < MAX_BATCH_SIZE:
+        try:
+          req = queue.get_nowait()
+          reqs.append(req)
+        except Empty:
+          break
+
+      try:
+        resp = requests.post(URL + '/call', '\n'.join(reqs))
+        if resp.status_code == 500 and self.next_register <= datetime.datetime.now():
+          resp = requests.post(URL + '/call', '\n'.join(registered))
+          self.next_register = datetime.datetime.now() + REGISTER_WAIT
+          logger.debug('Register returned %s for %s metrics; setting next to %s', resp.status_code, len(registered), self.next_register)
+        elif resp.status_code != 200:
+          logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text, ', '.join(reqs))
+        else:
+          logger.debug('Sent %d prometheus metrics', len(reqs))
+      except:
+        logger.exception('Failed to write to prometheus aggregator: %s', reqs)
+
+class _Collector(object):
+  def __init__(self, name, help, namespace='', subsystem='', **kwargs):
+    self._name = name
+    self._namespace = namespace
+    self._subsystem = subsystem
+    kwargs['Name'] = name
+    kwargs['Namespace'] = namespace
+    kwargs['Subsystem'] = subsystem
+    kwargs['Type'] = self.__class__.__name__
+    kwargs['Help'] = help
+    enqueue('register', kwargs)
+
+  def __getattr__(self, method):
+    def f(value=0, labelvalues=()):
+      data = {
+        'Name': self._name,
+        'Subsystem': self._subsystem,
+        'Namespace': self._namespace,
+        'Type': self.__class__.__name__,
+        'Value': value,
+        'LabelValues': [str(i) for i in labelvalues],
+        'Method': method,
+      }
+      enqueue('put', data)
+    return f
+
+class Gauge(_Collector):
+  pass
+
+class Counter(_Collector):
+  pass
+
+class Summary(_Collector):
+  pass
+
+class Histogram(_Collector):
+  pass
+
+class Untyped(_Collector):
+  pass
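Summary and Histogram collectors go through the same proxy via Observe, the only
method the aggregator accepts for those types; since the client never sends
Buckets, the aggregator presumably falls back to client_golang's default
histogram buckets. A sketch mirroring the resp_time metric in the next file; the
observed value and label are illustrative:

    from util.prometheus import Histogram

    latency = Histogram('response_time', 'HTTP response time in seconds',
                        labelnames=['endpoint'])
    latency.Observe(0.125, labelvalues=['api.repository'])  # seconds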
diff --git a/util/saas/metricqueue.py b/util/saas/metricqueue.py
index ab0a6c3cc..7bb2781f8 100644
--- a/util/saas/metricqueue.py
+++ b/util/saas/metricqueue.py
@@ -4,12 +4,17 @@ import time
 
 from functools import wraps
 from Queue import Queue, Full
+from util.prometheus import Histogram, Counter
 
 from flask import g, request
 
 logger = logging.getLogger(__name__)
 
+resp_time = Histogram('response_time', 'HTTP response time in seconds', labelnames=['endpoint'])
+resp_code = Counter('response_code', 'HTTP response code', labelnames=['endpoint', 'code'])
+non_200 = Counter('response_non200', 'Non-200 HTTP response codes', labelnames=['endpoint'])
+
 
 class MetricQueue(object):
   def __init__(self):
     self._queue = None
@@ -54,10 +59,14 @@ def time_after_request(name, metric_queue):
       metric_queue.put('ResponseTime', dur, dimensions=dims, unit='Seconds')
       metric_queue.put('ResponseCode', r.status_code, dimensions=dims)
 
+      resp_time.Observe(dur, labelvalues=[request.endpoint])
+      resp_code.Inc(labelvalues=[request.endpoint, r.status_code])
+
       if r.status_code >= 500:
         metric_queue.put('5XXResponse', 1, dimensions={'name': name})
       elif r.status_code < 200 or r.status_code >= 300:
         metric_queue.put('Non200Response', 1, dimensions={'name': name})
+        non_200.Inc(labelvalues=[request.endpoint])
 
       return r
     return f
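To wire this up end to end, PROMETHEUS_AGGREGATOR_URL must be set in the app
config (read in app.py above), and a Prometheus server scrapes the aggregator's
/metrics endpoint on port 9092. A config sketch; the address is illustrative:

    # config.py sketch: point the Python client at the Go aggregator.
    PROMETHEUS_AGGREGATOR_URL = 'http://localhost:9092'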