Move aggregator into its own repo and add it to the image

Joseph Schorr 2016-07-01 14:16:15 -04:00
parent 713ba3abaf
commit a1009af61c
14 changed files with 38 additions and 370 deletions


@@ -64,6 +64,10 @@ RUN curl -O https://storage.googleapis.com/golang/go1.6.linux-amd64.tar.gz && \
 RUN curl -L -o /usr/local/bin/jwtproxy https://github.com/coreos/jwtproxy/releases/download/v0.0.1/jwtproxy-linux-x64
 RUN chmod +x /usr/local/bin/jwtproxy
 
+# Install prometheus-aggregator
+RUN curl -L -o /usr/local/bin/prometheus-aggregator https://github.com/coreos/prometheus-aggregator/releases/download/v0.0.1-alpha/prometheus-aggregator
+RUN chmod +x /usr/local/bin/prometheus-aggregator
+
 # Install Grunt
 RUN ln -s /usr/bin/nodejs /usr/bin/node
 RUN npm install -g grunt-cli

app.py

@@ -25,7 +25,6 @@ from data.archivedlogs import LogArchive
 from data.userevent import UserEventsBuilderModule
 from data.queue import WorkQueue, BuildMetricQueueReporter
 from util import get_app_url
-from util.saas.prometheus import PrometheusPlugin
 from util.saas.analytics import Analytics
 from util.saas.exceptionlog import Sentry
 from util.names import urn_generator
@@ -35,11 +34,12 @@ from util.config.oauth import (GoogleOAuthConfig, GithubOAuthConfig, GitLabOAuth
 from util.security.signing import Signer
 from util.security.instancekeys import InstanceKeys
 from util.saas.cloudwatch import start_cloudwatch_sender
-from util.saas.metricqueue import MetricQueue
 from util.config.provider import get_config_provider
 from util.config.configutil import generate_secret_key
 from util.config.superusermanager import SuperUserManager
 from util.secscan.api import SecurityScannerAPI
+from util.metrics.metricqueue import MetricQueue
+from util.metrics.prometheus import PrometheusPlugin
 
 OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/'
 OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml'


@@ -0,0 +1,7 @@
#!/bin/sh

# Ensure dependencies start before the logger
sv check syslog-ng > /dev/null || exit 1

# Start the logger
exec logger -i -t prometheus-aggregator


@@ -0,0 +1,7 @@
#! /bin/bash

echo 'Starting prometheus aggregator'

/usr/local/bin/prometheus-aggregator

echo 'Prometheus aggregator exited'


@@ -355,3 +355,5 @@ class DefaultConfig(object):
   # The whitelist of client IDs for OAuth applications that allow for direct login.
   DIRECT_OAUTH_CLIENTID_WHITELIST = []
 
+  # URL that specifies the location of the prometheus stats aggregator.
+  PROMETHEUS_AGGREGATOR_URL = 'http://localhost:9092'
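
For context: this URL is consumed by the new metrics client added below in util/metrics/prometheus.py, whose Prometheus helper takes the aggregator URL directly and skips starting its background sender when the URL is None. A minimal sketch of that wiring, assuming the value is read straight off DefaultConfig (the import path and call site are assumptions; the actual hookup in app.py goes through PrometheusPlugin and is not shown in full in this diff):

# Sketch only -- not the verbatim app.py wiring.
from config import DefaultConfig                  # assumed import path for the config class above
from util.metrics.prometheus import Prometheus    # class shown further down in this diff

# Passing url=None (e.g. when no aggregator is configured) skips the sender thread entirely.
prom = Prometheus(url=DefaultConfig.PROMETHEUS_AGGREGATOR_URL)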


@@ -23,7 +23,7 @@ from auth.auth import process_oauth
 from endpoints.csrf import csrf_protect
 from endpoints.exception import ApiException, Unauthorized, InvalidRequest, InvalidResponse, FreshLoginRequired
 from endpoints.decorators import check_anon_protection
-from util.saas.metricqueue import time_decorator
+from util.metrics.metricqueue import time_decorator
 from util.pagination import encrypt_page_token, decrypt_page_token
 
 logger = logging.getLogger(__name__)


@@ -2,7 +2,7 @@ from flask import Blueprint, make_response
 
 from app import metric_queue
 from endpoints.decorators import anon_protect, anon_allowed
-from util.saas.metricqueue import time_blueprint
+from util.metrics.metricqueue import time_blueprint
 
 v1_bp = Blueprint('v1', __name__)


@@ -18,7 +18,7 @@ from endpoints.decorators import anon_protect, anon_allowed
 from endpoints.v2.errors import V2RegistryException, Unauthorized
 from util.http import abort
 from util.registry.dockerver import docker_version
-from util.saas.metricqueue import time_blueprint
+from util.metrics.metricqueue import time_blueprint
 
 logger = logging.getLogger(__name__)
 v2_bp = Blueprint('v2', __name__)


@@ -1 +0,0 @@
github.com/coreos-inc/quay/prom_aggregator


@@ -1,14 +0,0 @@
FROM golang

RUN mkdir -p /go/src/app
WORKDIR /go/src/app

ENTRYPOINT ["go-wrapper", "run"]

ENV GO15VENDOREXPERIMENT=1

COPY . /go/src/app
RUN go-wrapper download
RUN go-wrapper install

EXPOSE 9092


@@ -1,338 +0,0 @@
package main

import (
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "net/http"
    "os"
    "strings"
    "sync"
    "time"

    "github.com/coreos/pkg/flagutil"
    "github.com/prometheus/client_golang/prometheus"

    log "github.com/Sirupsen/logrus"
)

var listen = flag.String("listen", ":9092", "")
var level = flag.String("loglevel", "info", "default log level: debug, info, warn, error, fatal, panic")

func main() {
    if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
        fmt.Fprintln(os.Stderr, err.Error())
        os.Exit(1)
    }
    if err := flagutil.SetFlagsFromEnv(flag.CommandLine, "PROMETHEUS_AGGREGATOR"); err != nil {
        fmt.Fprintln(os.Stderr, err.Error())
        os.Exit(1)
    }
    lvl, err := log.ParseLevel(*level)
    if err != nil {
        fmt.Fprintln(os.Stderr, err.Error())
        os.Exit(1)
    }
    log.SetLevel(lvl)

    http.Handle("/metrics", prometheus.Handler())
    http.HandleFunc("/call", wrap(call))
    log.Infoln("listening on", *listen)
    log.Fatal(http.ListenAndServe(*listen, nil))
}

func wrap(f func(http.ResponseWriter, *http.Request) error) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        err := f(w, r)
        if err != nil {
            http.Error(w, err.Error(), 500)
        }
    }
}

type options struct {
    // common
    identifier
    Help       string
    LabelNames []string

    // summary
    Objectives map[float64]float64
    MaxAge     time.Duration
    AgeBuckets uint32
    BufCap     uint32

    // histogram
    Buckets []float64
}

func (o *options) register() error {
    lock.RLock()
    mv, ok := metrics[o.identifier]
    lock.RUnlock()
    if ok {
        return nil
    }
    switch o.Type {
    case "Gauge":
        mv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
            Namespace: o.Namespace,
            Subsystem: o.Subsystem,
            Name:      o.Name,
            Help:      o.Help,
        }, o.LabelNames)
    case "Counter":
        mv = prometheus.NewCounterVec(prometheus.CounterOpts{
            Namespace: o.Namespace,
            Subsystem: o.Subsystem,
            Name:      o.Name,
            Help:      o.Help,
        }, o.LabelNames)
    case "Untyped":
        mv = prometheus.NewUntypedVec(prometheus.UntypedOpts{
            Namespace: o.Namespace,
            Subsystem: o.Subsystem,
            Name:      o.Name,
            Help:      o.Help,
        }, o.LabelNames)
    case "Summary":
        mv = prometheus.NewSummaryVec(prometheus.SummaryOpts{
            Namespace:  o.Namespace,
            Subsystem:  o.Subsystem,
            Name:       o.Name,
            Help:       o.Help,
            Objectives: o.Objectives,
            MaxAge:     o.MaxAge,
            AgeBuckets: o.AgeBuckets,
            BufCap:     o.BufCap,
        }, o.LabelNames)
    case "Histogram":
        mv = prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Namespace: o.Namespace,
            Subsystem: o.Subsystem,
            Name:      o.Name,
            Help:      o.Help,
            Buckets:   o.Buckets,
        }, o.LabelNames)
    default:
        return fmt.Errorf("unknown type: %s", o.Type)
    }
    err := prometheus.Register(mv)
    if err != nil {
        // TODO(mjibson): cache the error instead of creating new collectors each time
        return err
    }
    lock.Lock()
    metrics[o.identifier] = mv
    lock.Unlock()
    return nil
}

type message struct {
    Call string
    Data json.RawMessage
}

func call(w http.ResponseWriter, r *http.Request) error {
    dec := json.NewDecoder(r.Body)
    var me MultiError
    for {
        var m message
        var err error
        if err = dec.Decode(&m); err == io.EOF {
            break
        } else if err != nil {
            return err
        }
        log.Debugln(m.Call, string(m.Data))
        switch m.Call {
        case "put":
            err = put(m.Data)
        case "register":
            err = register(m.Data)
        default:
            err = fmt.Errorf("unknown call: %s", m.Call)
        }
        me = append(me, err)
    }
    return me.AsError()
}

func register(msg json.RawMessage) error {
    var o options
    if err := json.Unmarshal(msg, &o); err != nil {
        return err
    }
    return o.register()
}

func put(msg json.RawMessage) error {
    var m metric
    if err := json.Unmarshal(msg, &m); err != nil {
        return err
    }
    return m.put()
}

var (
    lock    sync.RWMutex
    metrics = make(map[identifier]prometheus.Collector)
)

type identifier struct {
    Namespace string
    Subsystem string
    Name      string
    Type      string
}

type metric struct {
    identifier
    LabelValues []string

    // Method is the method name: Set, Inc, Dec, Add, Sub, Observe (dependent on the Type).
    Method string

    // Value is the parameter to Method. Unused for Inc and Dec.
    Value float64
}

const (
    errUnexpectedType = "unknown type %T: expected %s"
    errUnknownMethod  = "unknown method %s on type %s"
)

func (m *metric) put() error {
    lock.RLock()
    mv, ok := metrics[m.identifier]
    lock.RUnlock()
    if !ok {
        return fmt.Errorf("unknown collector: %v", m.identifier)
    }
    switch m.Type {
    case "Gauge":
        v, ok := mv.(*prometheus.GaugeVec)
        if !ok {
            return fmt.Errorf(errUnexpectedType, m.Type, mv)
        }
        c, err := v.GetMetricWithLabelValues(m.LabelValues...)
        if err != nil {
            return err
        }
        switch m.Method {
        case "Set":
            c.Set(m.Value)
        case "Inc":
            c.Inc()
        case "Dec":
            c.Dec()
        case "Add":
            c.Add(m.Value)
        case "Sub":
            c.Sub(m.Value)
        default:
            return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
        }
    case "Untyped":
        v, ok := mv.(*prometheus.UntypedVec)
        if !ok {
            return fmt.Errorf(errUnexpectedType, m.Type, mv)
        }
        c, err := v.GetMetricWithLabelValues(m.LabelValues...)
        if err != nil {
            return err
        }
        switch m.Method {
        case "Set":
            c.Set(m.Value)
        case "Inc":
            c.Inc()
        case "Dec":
            c.Dec()
        case "Add":
            c.Add(m.Value)
        case "Sub":
            c.Sub(m.Value)
        default:
            return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
        }
    case "Counter":
        v, ok := mv.(*prometheus.CounterVec)
        if !ok {
            return fmt.Errorf(errUnexpectedType, m.Type, mv)
        }
        c, err := v.GetMetricWithLabelValues(m.LabelValues...)
        if err != nil {
            return err
        }
        switch m.Method {
        case "Set":
            c.Set(m.Value)
        case "Inc":
            c.Inc()
        case "Add":
            c.Add(m.Value)
        default:
            return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
        }
    case "Summary":
        v, ok := mv.(*prometheus.SummaryVec)
        if !ok {
            return fmt.Errorf(errUnexpectedType, m.Type, mv)
        }
        c, err := v.GetMetricWithLabelValues(m.LabelValues...)
        if err != nil {
            return err
        }
        switch m.Method {
        case "Observe":
            c.Observe(m.Value)
        default:
            return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
        }
    case "Histogram":
        v, ok := mv.(*prometheus.HistogramVec)
        if !ok {
            return fmt.Errorf(errUnexpectedType, m.Type, mv)
        }
        c, err := v.GetMetricWithLabelValues(m.LabelValues...)
        if err != nil {
            return err
        }
        switch m.Method {
        case "Observe":
            c.Observe(m.Value)
        default:
            return fmt.Errorf(errUnknownMethod, m.Method, m.Type)
        }
    default:
        return fmt.Errorf("unknown type: %s", m.Type)
    }
    return nil
}

// MultiError is returned by batch operations when there are errors with
// particular elements. Errors will be in a one-to-one correspondence with
// the input elements; successful elements will have a nil entry.
type MultiError []error

func (m MultiError) Error() string {
    var strs []string
    for i, err := range m {
        if err != nil {
            strs = append(strs, fmt.Sprintf("[%d] %v", i, err))
        }
    }
    return strings.Join(strs, " ")
}

func (m MultiError) AsError() error {
    for _, e := range m {
        if e != nil {
            return m
        }
    }
    return nil
}
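
The /call handler above decodes JSON messages from the request body until EOF; each message carries a Call of 'register' or 'put' and a Data payload shaped like the options and metric structs, and per-message failures are collected into a MultiError (surfaced as HTTP 500 by wrap). A minimal sketch of a client request against a locally running aggregator; the metric name, namespace, and label values are invented for illustration:

import json

import requests

# Hypothetical example metric; field names mirror the identifier/options/metric
# structs handled by the aggregator's /call endpoint above.
AGGREGATOR_URL = 'http://localhost:9092'  # the default from config.py

register = {'Call': 'register', 'Data': {
    'Namespace': 'quay', 'Subsystem': '', 'Name': 'example_requests_total', 'Type': 'Counter',
    'Help': 'Example request counter.', 'LabelNames': ['status'],
}}

put = {'Call': 'put', 'Data': {
    'Namespace': 'quay', 'Subsystem': '', 'Name': 'example_requests_total', 'Type': 'Counter',
    'Method': 'Inc', 'LabelValues': ['200'],
}}

# Multiple messages can be batched into one body; the handler keeps decoding until EOF.
resp = requests.post(AGGREGATOR_URL + '/call', '\n'.join(json.dumps(m) for m in (register, put)))
print(resp.status_code)  # 200 on success; the counter then shows up on /metrics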

util/metrics/__init__.py (new, empty file)


@@ -40,12 +40,12 @@ class PrometheusPlugin(object):
 class Prometheus(object):
   """ Aggregator for collecting stats that are reported to Prometheus. """
   def __init__(self, url=None):
-    self._registered = []
+    self._metric_collectors = []
     self._url = url
 
     if url is not None:
       self._queue = Queue(QUEUE_MAX)
-      self._sender = _QueueSender(self._queue, url, self._registered)
+      self._sender = _QueueSender(self._queue, url, self._metric_collectors)
       self._sender.start()
       logger.debug('Prometheus aggregator sending to %s', url)
     else:
@@ -62,7 +62,7 @@
     })
 
     if call == 'register':
-      self._registered.append(v)
+      self._metric_collectors.append(v)
       return
 
     try:
@@ -71,7 +71,7 @@
       # If the queue is full, it is because 1) no aggregator was enabled or 2)
      # the aggregator is taking a long time to respond to requests. In the case
      # of 1, it's probably enterprise mode and we don't care. In the case of 2,
-      # the response timeout error is printed at another place. In either case,
+      # the response timeout error is printed inside the queue handler. In either case,
       # we don't need to print an error here.
       pass
 
@@ -97,13 +97,13 @@
 class _QueueSender(Thread):
   """ Helper class which uses a thread to asynchronously send metrics to the local Prometheus
       aggregator. """
-  def __init__(self, queue, url, registered):
+  def __init__(self, queue, url, metric_collectors):
     Thread.__init__(self)
     self.daemon = True
     self.next_register = datetime.datetime.now()
     self._queue = queue
     self._url = url
-    self._registered = registered
+    self._metric_collectors = metric_collectors
 
   def run(self):
     while True:
@@ -120,10 +120,10 @@
       try:
         resp = requests.post(self._url + '/call', '\n'.join(reqs))
         if resp.status_code == 500 and self.next_register <= datetime.datetime.now():
-          resp = requests.post(self._url + '/call', '\n'.join(self._registered))
+          resp = requests.post(self._url + '/call', '\n'.join(self._metric_collectors))
           self.next_register = datetime.datetime.now() + REGISTER_WAIT
           logger.debug('Register returned %s for %s metrics; setting next to %s', resp.status_code,
-                       len(self._registered), self.next_register)
+                       len(self._metric_collectors), self.next_register)
         elif resp.status_code != 200:
           logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text,
                        ', '.join(reqs))
@@ -135,19 +135,20 @@
 class _Collector(object):
   """ Collector for a Prometheus metric. """
-  def __init__(self, enqueue_method, c_type, name, c_help, namespace='', subsystem='', **kwargs):
+  def __init__(self, enqueue_method, collector_type, collector_name, collector_help,
+               namespace='', subsystem='', **kwargs):
     self._enqueue_method = enqueue_method
     self._base_args = {
-      'Name': name,
+      'Name': collector_name,
       'Namespace': namespace,
       'Subsystem': subsystem,
-      'Type': c_type,
+      'Type': collector_type,
     }
 
     registration_params = dict(kwargs)
     registration_params.update(self._base_args)
-    registration_params['Help'] = c_help
+    registration_params['Help'] = collector_help
 
     self._enqueue_method('register', registration_params)
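
For orientation, _Collector above only knows how to register itself; concrete metric wrappers are expected to forward updates as 'put' messages carrying Method, Value, and LabelValues, matching the Go metric struct from the removed aggregator. A hypothetical counter wrapper in that style (the class name and Inc method are illustrative, not the module's actual API):

class _Counter(_Collector):
  """ Hypothetical counter wrapper: forwards Inc calls to the aggregator as 'put'
      messages shaped like the aggregator's metric struct. """
  def __init__(self, enqueue_method, collector_name, collector_help, **kwargs):
    super(_Counter, self).__init__(enqueue_method, 'Counter', collector_name, collector_help,
                                   **kwargs)

  def Inc(self, labelvalues=None):
    data = dict(self._base_args)
    data.update({
      'Method': 'Inc',
      'LabelValues': [str(i) for i in (labelvalues or [])],
    })
    self._enqueue_method('put', data)

A caller would presumably obtain such a wrapper from the Prometheus object, which supplies the enqueue method and batches the resulting JSON lines through _QueueSender.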