Use prometheus as a metric backend

This entails writing a metric aggregation program, since each gunicorn worker
process has its own memory and therefore its own metrics. The Python client is
a simple wrapper that makes web requests to the aggregator.
This commit is contained in:
Matt Jibson 2015-11-20 15:32:17 -05:00 committed by Joseph Schorr
parent 781f2eec72
commit 3d9acf2fff
10 changed files with 502 additions and 0 deletions

3
app.py
View file

@ -25,6 +25,7 @@ from data.archivedlogs import LogArchive
from data.userevent import UserEventsBuilderModule from data.userevent import UserEventsBuilderModule
from data.queue import WorkQueue, MetricQueueReporter from data.queue import WorkQueue, MetricQueueReporter
from util import get_app_url from util import get_app_url
from util import prometheus
from util.saas.analytics import Analytics from util.saas.analytics import Analytics
from util.saas.exceptionlog import Sentry from util.saas.exceptionlog import Sentry
from util.names import urn_generator from util.names import urn_generator
@ -207,6 +208,8 @@ if os.path.exists(_v2_key_path):
else: else:
docker_v2_signing_key = RSAKey(key=RSA.generate(2048)) docker_v2_signing_key = RSAKey(key=RSA.generate(2048))
prometheus.init(app.config.get('PROMETHEUS_AGGREGATOR_URL'))
database.configure(app.config) database.configure(app.config)
model.config.app_config = app.config model.config.app_config = app.config
model.config.store = storage model.config.store = storage

View file

@ -17,8 +17,11 @@ from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from data import database from data import database
from app import app, metric_queue from app import app, metric_queue
from app import app
from util.prometheus import Counter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
build_counter = Counter('builds', 'Number of builds', labelnames=['name'])
WORK_CHECK_TIMEOUT = 10 WORK_CHECK_TIMEOUT = 10
TIMEOUT_PERIOD_MINUTES = 20 TIMEOUT_PERIOD_MINUTES = 20
@ -238,3 +241,4 @@ def report_completion_status(status):
return return
metric_queue.put(status_name, 1, unit='Count') metric_queue.put(status_name, 1, unit='Count')
build_counter.Inc(labelvalues=[status_name])

View file

@ -2,10 +2,14 @@ from datetime import datetime, timedelta
from data.database import QueueItem, db, db_for_update, db_random_func from data.database import QueueItem, db, db_for_update, db_random_func
from util.morecollections import AttrDict from util.morecollections import AttrDict
from util.prometheus import Gauge
MINIMUM_EXTENSION = timedelta(seconds=20) MINIMUM_EXTENSION = timedelta(seconds=20)
build_capacity_shortage = Gauge('build_capacity_shortage', 'Build capacity shortage.')
percent_building = Gauge('build_percent_building', 'Percent building.')
class NoopWith: class NoopWith:
def __enter__(self): def __enter__(self):
pass pass
@ -20,9 +24,11 @@ class MetricQueueReporter(object):
def __call__(self, currently_processing, running_count, total_count): def __call__(self, currently_processing, running_count, total_count):
need_capacity_count = total_count - running_count need_capacity_count = total_count - running_count
self._metric_queue.put('BuildCapacityShortage', need_capacity_count, unit='Count') self._metric_queue.put('BuildCapacityShortage', need_capacity_count, unit='Count')
build_capacity_shortage.Set(need_capacity_count)
building_percent = 100 if currently_processing else 0 building_percent = 100 if currently_processing else 0
self._metric_queue.put('PercentBuilding', building_percent, unit='Percent') self._metric_queue.put('PercentBuilding', building_percent, unit='Percent')
percent_building.Set(building_percent)
class WorkQueue(object): class WorkQueue(object):
def __init__(self, queue_name, transaction_factory, def __init__(self, queue_name, transaction_factory,

View file

@ -30,6 +30,11 @@ initdb)
fulldbtest) fulldbtest)
d bash /src/quay/test/fulldbtest.sh d bash /src/quay/test/fulldbtest.sh
;; ;;
prom)
R=quay.io/quay/prom-monitor
docker build -t $R prom_aggregator
docker run --rm -it --net=host $R
;;
*) *)
echo "unknown option" echo "unknown option"
exit 1 exit 1

1
prom_aggregator/.godir Normal file
View file

@ -0,0 +1 @@
github.com/coreos-inc/quay/prom_aggregator

View file

@ -0,0 +1,14 @@
FROM golang
RUN mkdir -p /go/src/app
WORKDIR /go/src/app
CMD ["go-wrapper", "run"]
ENV GO15VENDOREXPERIMENT=1
COPY . /go/src/app
RUN go-wrapper download
RUN go-wrapper install
EXPOSE 9092

330
prom_aggregator/main.go Normal file
View file

@ -0,0 +1,330 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/coreos/pkg/flagutil"
"github.com/prometheus/client_golang/prometheus"
)
func main() {
fs := flag.NewFlagSet("prometheus_aggregator", flag.ExitOnError)
listen := fs.String("listen", ":9092", "")
if err := fs.Parse(os.Args[1:]); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
if err := flagutil.SetFlagsFromEnv(fs, "PROMETHEUS_AGGREGATOR"); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
http.Handle("/metrics", prometheus.Handler())
http.HandleFunc("/call", wrap(call))
log.Println("listening on", *listen)
log.Fatal(http.ListenAndServe(*listen, nil))
}
func wrap(f func(http.ResponseWriter, *http.Request) error) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
err := f(w, r)
if err != nil {
http.Error(w, err.Error(), 500)
}
}
}
type options struct {
// common
identifier
Help string
LabelNames []string
// summary
Objectives map[float64]float64
MaxAge time.Duration
AgeBuckets uint32
BufCap uint32
// histogram
Buckets []float64
}
func (o *options) register() error {
lock.RLock()
mv, ok := metrics[o.identifier]
lock.RUnlock()
if ok {
return nil
}
switch o.Type {
case "Gauge":
mv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: o.Namespace,
Subsystem: o.Subsystem,
Name: o.Name,
Help: o.Help,
}, o.LabelNames)
case "Counter":
mv = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: o.Namespace,
Subsystem: o.Subsystem,
Name: o.Name,
Help: o.Help,
}, o.LabelNames)
case "Untyped":
mv = prometheus.NewUntypedVec(prometheus.UntypedOpts{
Namespace: o.Namespace,
Subsystem: o.Subsystem,
Name: o.Name,
Help: o.Help,
}, o.LabelNames)
case "Summary":
mv = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: o.Namespace,
Subsystem: o.Subsystem,
Name: o.Name,
Help: o.Help,
Objectives: o.Objectives,
MaxAge: o.MaxAge,
AgeBuckets: o.AgeBuckets,
BufCap: o.BufCap,
}, o.LabelNames)
case "Histogram":
mv = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: o.Namespace,
Subsystem: o.Subsystem,
Name: o.Name,
Help: o.Help,
Buckets: o.Buckets,
}, o.LabelNames)
default:
return fmt.Errorf("unknown type: %s", o.Type)
}
err := prometheus.Register(mv)
if err != nil {
// TODO(mjibson): cache the error instead of creating new collectors each time
return err
}
lock.Lock()
metrics[o.identifier] = mv
lock.Unlock()
return nil
}
type message struct {
Call string
Data json.RawMessage
}
func call(w http.ResponseWriter, r *http.Request) error {
dec := json.NewDecoder(r.Body)
var me MultiError
for {
var m message
var err error
if err = dec.Decode(&m); err == io.EOF {
break
} else if err != nil {
return err
}
log.Println(m.Call, string(m.Data))
switch m.Call {
case "put":
err = put(m.Data)
case "register":
err = register(m.Data)
default:
err = fmt.Errorf("unknown call: %s", m.Call)
}
me = append(me, err)
}
return me.AsError()
}
func register(msg json.RawMessage) error {
var o options
if err := json.Unmarshal(msg, &o); err != nil {
return err
}
return o.register()
}
func put(msg json.RawMessage) error {
var m metric
if err := json.Unmarshal(msg, &m); err != nil {
return err
}
return m.put()
}
var (
lock sync.RWMutex
metrics = make(map[identifier]prometheus.Collector)
)
type identifier struct {
Namespace string
Subsystem string
Name string
Type string
}
type metric struct {
identifier
LabelValues []string
// Method is the method name: Set, Inc, Dec, Add, Sub, Observe (dependant on the Type).
Method string
// Value is the parameter to Method. Unused for Inc and Dec.
Value float64
}
const (
errUnexpectedType = "unknown type %T: expected %s"
errUnknownMethod = "unknown method %s on type %s"
)
func (m *metric) put() error {
lock.RLock()
mv, ok := metrics[m.identifier]
lock.RUnlock()
if !ok {
return fmt.Errorf("unknown collector: %v", m.identifier)
}
switch m.Type {
case "Gauge":
v, ok := mv.(*prometheus.GaugeVec)
if !ok {
return fmt.Errorf(errUnexpectedType, m.Type, mv)
}
c, err := v.GetMetricWithLabelValues(m.LabelValues...)
if err != nil {
return err
}
switch m.Method {
case "Set":
c.Set(m.Value)
case "Inc":
c.Inc()
case "Dec":
c.Dec()
case "Add":
c.Add(m.Value)
case "Sub":
c.Sub(m.Value)
default:
return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
}
case "Untyped":
v, ok := mv.(*prometheus.UntypedVec)
if !ok {
return fmt.Errorf(errUnexpectedType, m.Type, mv)
}
c, err := v.GetMetricWithLabelValues(m.LabelValues...)
if err != nil {
return err
}
switch m.Method {
case "Set":
c.Set(m.Value)
case "Inc":
c.Inc()
case "Dec":
c.Dec()
case "Add":
c.Add(m.Value)
case "Sub":
c.Sub(m.Value)
default:
return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
}
case "Counter":
v, ok := mv.(*prometheus.CounterVec)
if !ok {
return fmt.Errorf(errUnexpectedType, m.Type, mv)
}
c, err := v.GetMetricWithLabelValues(m.LabelValues...)
if err != nil {
return err
}
switch m.Method {
case "Set":
c.Set(m.Value)
case "Inc":
c.Inc()
case "Add":
c.Add(m.Value)
default:
return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
}
case "Summary":
v, ok := mv.(*prometheus.SummaryVec)
if !ok {
return fmt.Errorf(errUnexpectedType, m.Type, mv)
}
c, err := v.GetMetricWithLabelValues(m.LabelValues...)
if err != nil {
return err
}
switch m.Method {
case "Observe":
c.Observe(m.Value)
default:
return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
}
case "Histogram":
v, ok := mv.(*prometheus.HistogramVec)
if !ok {
return fmt.Errorf(errUnexpectedType, m.Type, mv)
}
c, err := v.GetMetricWithLabelValues(m.LabelValues...)
if err != nil {
return err
}
switch m.Method {
case "Observe":
c.Observe(m.Value)
default:
return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
}
default:
return fmt.Errorf("unknown type: %s", m.Type)
}
return nil
}
// MultiError is returned by batch operations when there are errors with
// particular elements. Errors will be in a one-to-one correspondence with
// the input elements; successful elements will have a nil entry.
type MultiError []error
func (m MultiError) Error() string {
var strs []string
for i, err := range m {
if err != nil {
strs = append(strs, fmt.Sprintf("[%d] %v", i, err))
}
}
return strings.Join(strs, " ")
}
func (m MultiError) AsError() error {
for _, e := range m {
if e != nil {
return m
}
}
return nil
}

View file

@ -15,10 +15,13 @@ from collections import namedtuple
from util.registry import filelike from util.registry import filelike
from storage.basestorage import BaseStorageV2, InvalidChunkException from storage.basestorage import BaseStorageV2, InvalidChunkException
from util.prometheus import Counter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
multipart_upload_start = Counter('multipart_upload_start', 'Multipart upload startse')
multipart_upload_end = Counter('multipart_upload_end', 'Multipart upload ends.', labelnames=['type'])
_PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length']) _PartUploadMetadata = namedtuple('_PartUploadMetadata', ['path', 'offset', 'length'])
_CHUNKS_KEY = 'chunks' _CHUNKS_KEY = 'chunks'
@ -166,6 +169,7 @@ class _CloudStorage(BaseStorageV2):
if self._metric_queue is not None: if self._metric_queue is not None:
self._metric_queue.put('MultipartUploadStart', 1) self._metric_queue.put('MultipartUploadStart', 1)
multipart_upload_start.Inc()
return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata, return self._cloud_bucket.initiate_multipart_upload(path, metadata=metadata,
**self._upload_params) **self._upload_params)
@ -206,6 +210,8 @@ class _CloudStorage(BaseStorageV2):
if self._metric_queue is not None: if self._metric_queue is not None:
self._metric_queue.put('MultipartUploadFailure', 1) self._metric_queue.put('MultipartUploadFailure', 1)
multipart_upload_end.Inc(labelvalues=['failure'])
if cancel_on_error: if cancel_on_error:
mp.cancel_upload() mp.cancel_upload()
return 0, error return 0, error
@ -216,7 +222,9 @@ class _CloudStorage(BaseStorageV2):
if self._metric_queue is not None: if self._metric_queue is not None:
self._metric_queue.put('MultipartUploadSuccess', 1) self._metric_queue.put('MultipartUploadSuccess', 1)
multipart_upload_end.Inc(labelvalues=['success'])
mp.complete_upload() mp.complete_upload()
return total_bytes_written, error return total_bytes_written, error
def exists(self, path): def exists(self, path):

122
util/prometheus/__init__.py Normal file
View file

@ -0,0 +1,122 @@
import datetime
import json
import logging
from Queue import Queue, Full, Empty
from threading import Thread
import requests
logger = logging.getLogger(__name__)
URL = None
QUEUE_MAX = 1000
MAX_BATCH_SIZE = 100
REGISTER_WAIT = datetime.timedelta(hours=1)
queue = Queue(QUEUE_MAX)
registered = []
def init(url):
global URL, queue
if not url:
logger.debug('Prometheus aggregator not started: empty URL')
queue = None
return
URL = url
sender = _QueueSender()
sender.start()
logger.debug('Prometheus aggregator sending to %s', URL)
def enqueue(call, data):
if not queue:
return
v = json.dumps({
'Call': call,
'Data': data,
})
if call == 'register':
registered.append(v)
return
try:
queue.put_nowait(v)
except Full:
# If the queue is full, it is because 1) no aggregator was enabled or 2)
# the aggregator is taking a long time to respond to requests. In the case
# of 1, it's probably enterprise mode and we don't care. In the case of 2,
# the response timeout error is printed at another place. In either case,
# we don't need to print an error here.
pass
class _QueueSender(Thread):
def __init__(self):
Thread.__init__(self)
self.daemon = True
self.next_register = datetime.datetime.now()
def run(self):
while True:
reqs = []
reqs.append(queue.get())
while len(reqs) < MAX_BATCH_SIZE:
try:
req = queue.get_nowait()
reqs.append(req)
except Empty:
break
try:
resp = requests.post(URL + '/call', '\n'.join(reqs))
if resp.status_code == 500 and self.next_register <= datetime.datetime.now():
resp = requests.post(URL + '/call', '\n'.join(registered))
self.next_register = datetime.datetime.now() + REGISTER_WAIT
logger.debug('Register returned %s for %s metrics; setting next to %s', resp.status_code, len(registered), self.next_register)
elif resp.status_code != 200:
logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text, ', '.join(reqs))
else:
logger.debug('Sent %d prometheus metrics', len(reqs))
except:
logger.exception('Failed to write to prometheus aggregator: %s', reqs)
class _Collector(object):
def __init__(self, name, help, namespace='', subsystem='', **kwargs):
self._name = name
self._namespace = namespace
self._subsystem = subsystem
kwargs['Name'] = name
kwargs['Namespace'] = namespace
kwargs['Subsystem'] = subsystem
kwargs['Type'] = self.__class__.__name__
kwargs['Help'] = help
enqueue('register', kwargs)
def __getattr__(self, method):
def f(value=0, labelvalues=()):
data = {
'Name': self._name,
'Subsystem': self._subsystem,
'Namespace': self._namespace,
'Type': self.__class__.__name__,
'Value': value,
'LabelValues': [str(i) for i in labelvalues],
'Method': method,
}
enqueue('put', data)
return f
class Gauge(_Collector):
pass
class Counter(_Collector):
pass
class Summary(_Collector):
pass
class Histogram(_Collector):
pass
class Untyped(_Collector):
pass

View file

@ -4,12 +4,17 @@ import time
from functools import wraps from functools import wraps
from Queue import Queue, Full from Queue import Queue, Full
from util.prometheus import Histogram, Counter
from flask import g, request from flask import g, request
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
resp_time = Histogram('response_time', 'HTTP response time in seconds', labelnames=['endpoint'])
resp_code = Counter('response_code', 'HTTP response code', labelnames=['endpoint', 'code'])
non_200 = Counter('response_non200', 'Non-200 HTTP response codes', labelnames=['endpoint'])
class MetricQueue(object): class MetricQueue(object):
def __init__(self): def __init__(self):
self._queue = None self._queue = None
@ -54,10 +59,14 @@ def time_after_request(name, metric_queue):
metric_queue.put('ResponseTime', dur, dimensions=dims, unit='Seconds') metric_queue.put('ResponseTime', dur, dimensions=dims, unit='Seconds')
metric_queue.put('ResponseCode', r.status_code, dimensions=dims) metric_queue.put('ResponseCode', r.status_code, dimensions=dims)
resp_time.Observe(dur, labelvalues=[request.endpoint])
resp_code.Inc(labelvalues=[request.endpoint, r.status_code])
if r.status_code >= 500: if r.status_code >= 500:
metric_queue.put('5XXResponse', 1, dimensions={'name': name}) metric_queue.put('5XXResponse', 1, dimensions={'name': name})
elif r.status_code < 200 or r.status_code >= 300: elif r.status_code < 200 or r.status_code >= 300:
metric_queue.put('Non200Response', 1, dimensions={'name': name}) metric_queue.put('Non200Response', 1, dimensions={'name': name})
non_200.Inc(labelvalues=[request.endpoint])
return r return r
return f return f