diff --git a/Dockerfile b/Dockerfile index cde82f190..bf501282b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -64,6 +64,10 @@ RUN curl -O https://storage.googleapis.com/golang/go1.6.linux-amd64.tar.gz && \ RUN curl -L -o /usr/local/bin/jwtproxy https://github.com/coreos/jwtproxy/releases/download/v0.0.1/jwtproxy-linux-x64 RUN chmod +x /usr/local/bin/jwtproxy +# Install prometheus-aggregator +RUN curl -L -o /usr/local/bin/prometheus-aggregator https://github.com/coreos/prometheus-aggregator/releases/download/v0.0.1-alpha/prometheus-aggregator +RUN chmod +x /usr/local/bin/prometheus-aggregator + # Install Grunt RUN ln -s /usr/bin/nodejs /usr/bin/node RUN npm install -g grunt-cli diff --git a/app.py b/app.py index 315bb08ca..f88570183 100644 --- a/app.py +++ b/app.py @@ -25,7 +25,6 @@ from data.archivedlogs import LogArchive from data.userevent import UserEventsBuilderModule from data.queue import WorkQueue, BuildMetricQueueReporter from util import get_app_url -from util.saas.prometheus import PrometheusPlugin from util.saas.analytics import Analytics from util.saas.exceptionlog import Sentry from util.names import urn_generator @@ -35,11 +34,12 @@ from util.config.oauth import (GoogleOAuthConfig, GithubOAuthConfig, GitLabOAuth from util.security.signing import Signer from util.security.instancekeys import InstanceKeys from util.saas.cloudwatch import start_cloudwatch_sender -from util.saas.metricqueue import MetricQueue from util.config.provider import get_config_provider from util.config.configutil import generate_secret_key from util.config.superusermanager import SuperUserManager from util.secscan.api import SecurityScannerAPI +from util.metrics.metricqueue import MetricQueue +from util.metrics.prometheus import PrometheusPlugin OVERRIDE_CONFIG_DIRECTORY = 'conf/stack/' OVERRIDE_CONFIG_YAML_FILENAME = 'conf/stack/config.yaml' diff --git a/conf/init/service/prometheus-aggregator/log/run b/conf/init/service/prometheus-aggregator/log/run new file mode 100755 index 000000000..005c506d0 --- /dev/null +++ b/conf/init/service/prometheus-aggregator/log/run @@ -0,0 +1,7 @@ +#!/bin/sh + +# Ensure dependencies start before the logger +sv check syslog-ng > /dev/null || exit 1 + +# Start the logger +exec logger -i -t prometheus-aggregator diff --git a/conf/init/service/prometheus-aggregator/run b/conf/init/service/prometheus-aggregator/run new file mode 100755 index 000000000..fc9b157c7 --- /dev/null +++ b/conf/init/service/prometheus-aggregator/run @@ -0,0 +1,7 @@ +#! /bin/bash + +echo 'Starting prometheus aggregator' + +/usr/local/bin/prometheus-aggregator + +echo 'Prometheus aggregator exited' \ No newline at end of file diff --git a/config.py b/config.py index c808841a4..f24300dd6 100644 --- a/config.py +++ b/config.py @@ -355,3 +355,5 @@ class DefaultConfig(object): # The whitelist of client IDs for OAuth applications that allow for direct login. DIRECT_OAUTH_CLIENTID_WHITELIST = [] + # URL that specifies the location of the prometheus stats aggregator. + PROMETHEUS_AGGREGATOR_URL = 'http://localhost:9092' diff --git a/endpoints/api/__init__.py b/endpoints/api/__init__.py index 84d4e537c..ccb83e554 100644 --- a/endpoints/api/__init__.py +++ b/endpoints/api/__init__.py @@ -23,7 +23,7 @@ from auth.auth import process_oauth from endpoints.csrf import csrf_protect from endpoints.exception import ApiException, Unauthorized, InvalidRequest, InvalidResponse, FreshLoginRequired from endpoints.decorators import check_anon_protection -from util.saas.metricqueue import time_decorator +from util.metrics.metricqueue import time_decorator from util.pagination import encrypt_page_token, decrypt_page_token logger = logging.getLogger(__name__) diff --git a/endpoints/v1/__init__.py b/endpoints/v1/__init__.py index 3eabbe338..1e9715787 100644 --- a/endpoints/v1/__init__.py +++ b/endpoints/v1/__init__.py @@ -2,7 +2,7 @@ from flask import Blueprint, make_response from app import metric_queue from endpoints.decorators import anon_protect, anon_allowed -from util.saas.metricqueue import time_blueprint +from util.metrics.metricqueue import time_blueprint v1_bp = Blueprint('v1', __name__) diff --git a/endpoints/v2/__init__.py b/endpoints/v2/__init__.py index e57dcb623..1ab42747f 100644 --- a/endpoints/v2/__init__.py +++ b/endpoints/v2/__init__.py @@ -18,7 +18,7 @@ from endpoints.decorators import anon_protect, anon_allowed from endpoints.v2.errors import V2RegistryException, Unauthorized from util.http import abort from util.registry.dockerver import docker_version -from util.saas.metricqueue import time_blueprint +from util.metrics.metricqueue import time_blueprint logger = logging.getLogger(__name__) v2_bp = Blueprint('v2', __name__) diff --git a/prom_aggregator/.godir b/prom_aggregator/.godir deleted file mode 100644 index ec82fbf32..000000000 --- a/prom_aggregator/.godir +++ /dev/null @@ -1 +0,0 @@ -github.com/coreos-inc/quay/prom_aggregator diff --git a/prom_aggregator/Dockerfile b/prom_aggregator/Dockerfile deleted file mode 100644 index 26e62905d..000000000 --- a/prom_aggregator/Dockerfile +++ /dev/null @@ -1,14 +0,0 @@ -FROM golang - -RUN mkdir -p /go/src/app -WORKDIR /go/src/app - -ENTRYPOINT ["go-wrapper", "run"] -ENV GO15VENDOREXPERIMENT=1 - -COPY . /go/src/app - -RUN go-wrapper download -RUN go-wrapper install - -EXPOSE 9092 diff --git a/prom_aggregator/main.go b/prom_aggregator/main.go deleted file mode 100644 index edebf3dce..000000000 --- a/prom_aggregator/main.go +++ /dev/null @@ -1,338 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "fmt" - "io" - "net/http" - "os" - "strings" - "sync" - "time" - - "github.com/coreos/pkg/flagutil" - "github.com/prometheus/client_golang/prometheus" - log "github.com/Sirupsen/logrus" -) - -var listen = flag.String("listen", ":9092", "") -var level = flag.String("loglevel", "info", "default log level: debug, info, warn, error, fatal, panic") - -func main() { - if err := flag.CommandLine.Parse(os.Args[1:]); err != nil { - fmt.Fprintln(os.Stderr, err.Error()) - os.Exit(1) - } - if err := flagutil.SetFlagsFromEnv(flag.CommandLine, "PROMETHEUS_AGGREGATOR"); err != nil { - fmt.Fprintln(os.Stderr, err.Error()) - os.Exit(1) - } - - lvl, err := log.ParseLevel(*level) - if err != nil { - fmt.Fprintln(os.Stderr, err.Error()) - os.Exit(1) - } - - log.SetLevel(lvl) - - http.Handle("/metrics", prometheus.Handler()) - http.HandleFunc("/call", wrap(call)) - log.Infoln("listening on", *listen) - log.Fatal(http.ListenAndServe(*listen, nil)) -} - -func wrap(f func(http.ResponseWriter, *http.Request) error) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - err := f(w, r) - if err != nil { - http.Error(w, err.Error(), 500) - } - } -} - -type options struct { - // common - identifier - Help string - LabelNames []string - - // summary - Objectives map[float64]float64 - MaxAge time.Duration - AgeBuckets uint32 - BufCap uint32 - - // histogram - Buckets []float64 -} - -func (o *options) register() error { - lock.RLock() - mv, ok := metrics[o.identifier] - lock.RUnlock() - - if ok { - return nil - } - switch o.Type { - case "Gauge": - mv = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: o.Namespace, - Subsystem: o.Subsystem, - Name: o.Name, - Help: o.Help, - }, o.LabelNames) - case "Counter": - mv = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: o.Namespace, - Subsystem: o.Subsystem, - Name: o.Name, - Help: o.Help, - }, o.LabelNames) - case "Untyped": - mv = prometheus.NewUntypedVec(prometheus.UntypedOpts{ - Namespace: o.Namespace, - Subsystem: o.Subsystem, - Name: o.Name, - Help: o.Help, - }, o.LabelNames) - case "Summary": - mv = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: o.Namespace, - Subsystem: o.Subsystem, - Name: o.Name, - Help: o.Help, - Objectives: o.Objectives, - MaxAge: o.MaxAge, - AgeBuckets: o.AgeBuckets, - BufCap: o.BufCap, - }, o.LabelNames) - case "Histogram": - mv = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: o.Namespace, - Subsystem: o.Subsystem, - Name: o.Name, - Help: o.Help, - Buckets: o.Buckets, - }, o.LabelNames) - default: - return fmt.Errorf("unknown type: %s", o.Type) - } - err := prometheus.Register(mv) - if err != nil { - // TODO(mjibson): cache the error instead of creating new collectors each time - return err - } - lock.Lock() - metrics[o.identifier] = mv - lock.Unlock() - return nil -} - -type message struct { - Call string - Data json.RawMessage -} - -func call(w http.ResponseWriter, r *http.Request) error { - dec := json.NewDecoder(r.Body) - var me MultiError - for { - var m message - var err error - if err = dec.Decode(&m); err == io.EOF { - break - } else if err != nil { - return err - } - log.Debugln(m.Call, string(m.Data)) - switch m.Call { - case "put": - err = put(m.Data) - case "register": - err = register(m.Data) - default: - err = fmt.Errorf("unknown call: %s", m.Call) - } - me = append(me, err) - } - return me.AsError() -} - -func register(msg json.RawMessage) error { - var o options - if err := json.Unmarshal(msg, &o); err != nil { - return err - } - return o.register() -} - -func put(msg json.RawMessage) error { - var m metric - if err := json.Unmarshal(msg, &m); err != nil { - return err - } - return m.put() -} - -var ( - lock sync.RWMutex - metrics = make(map[identifier]prometheus.Collector) -) - -type identifier struct { - Namespace string - Subsystem string - Name string - Type string -} - -type metric struct { - identifier - - LabelValues []string - // Method is the method name: Set, Inc, Dec, Add, Sub, Observe (dependant on the Type). - Method string - // Value is the parameter to Method. Unused for Inc and Dec. - Value float64 -} - -const ( - errUnexpectedType = "unknown type %T: expected %s" - errUnknownMethod = "unknown method %s on type %s" -) - -func (m *metric) put() error { - lock.RLock() - mv, ok := metrics[m.identifier] - lock.RUnlock() - if !ok { - return fmt.Errorf("unknown collector: %v", m.identifier) - } - switch m.Type { - case "Gauge": - v, ok := mv.(*prometheus.GaugeVec) - if !ok { - return fmt.Errorf(errUnexpectedType, m.Type, mv) - } - c, err := v.GetMetricWithLabelValues(m.LabelValues...) - if err != nil { - return err - } - switch m.Method { - case "Set": - c.Set(m.Value) - case "Inc": - c.Inc() - case "Dec": - c.Dec() - case "Add": - c.Add(m.Value) - case "Sub": - c.Sub(m.Value) - default: - return fmt.Errorf(errUnknownMethod, m.Method, m.Type) - } - case "Untyped": - v, ok := mv.(*prometheus.UntypedVec) - if !ok { - return fmt.Errorf(errUnexpectedType, m.Type, mv) - } - c, err := v.GetMetricWithLabelValues(m.LabelValues...) - if err != nil { - return err - } - switch m.Method { - case "Set": - c.Set(m.Value) - case "Inc": - c.Inc() - case "Dec": - c.Dec() - case "Add": - c.Add(m.Value) - case "Sub": - c.Sub(m.Value) - default: - return fmt.Errorf(errUnknownMethod, m.Method, m.Type) - } - case "Counter": - v, ok := mv.(*prometheus.CounterVec) - if !ok { - return fmt.Errorf(errUnexpectedType, m.Type, mv) - } - c, err := v.GetMetricWithLabelValues(m.LabelValues...) - if err != nil { - return err - } - switch m.Method { - case "Set": - c.Set(m.Value) - case "Inc": - c.Inc() - case "Add": - c.Add(m.Value) - default: - return fmt.Errorf(errUnknownMethod, m.Method, m.Type) - } - case "Summary": - v, ok := mv.(*prometheus.SummaryVec) - if !ok { - return fmt.Errorf(errUnexpectedType, m.Type, mv) - } - c, err := v.GetMetricWithLabelValues(m.LabelValues...) - if err != nil { - return err - } - switch m.Method { - case "Observe": - c.Observe(m.Value) - default: - return fmt.Errorf(errUnknownMethod, m.Method, m.Type) - } - case "Histogram": - v, ok := mv.(*prometheus.HistogramVec) - if !ok { - return fmt.Errorf(errUnexpectedType, m.Type, mv) - } - c, err := v.GetMetricWithLabelValues(m.LabelValues...) - if err != nil { - return err - } - switch m.Method { - case "Observe": - c.Observe(m.Value) - default: - return fmt.Errorf(errUnknownMethod, m.Method, m.Type) - } - default: - return fmt.Errorf("unknown type: %s", m.Type) - } - return nil -} - -// MultiError is returned by batch operations when there are errors with -// particular elements. Errors will be in a one-to-one correspondence with -// the input elements; successful elements will have a nil entry. -type MultiError []error - -func (m MultiError) Error() string { - var strs []string - for i, err := range m { - if err != nil { - strs = append(strs, fmt.Sprintf("[%d] %v", i, err)) - } - } - return strings.Join(strs, " ") -} - -func (m MultiError) AsError() error { - for _, e := range m { - if e != nil { - return m - } - } - return nil -} diff --git a/util/metrics/__init__.py b/util/metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/util/saas/metricqueue.py b/util/metrics/metricqueue.py similarity index 100% rename from util/saas/metricqueue.py rename to util/metrics/metricqueue.py diff --git a/util/saas/prometheus.py b/util/metrics/prometheus.py similarity index 86% rename from util/saas/prometheus.py rename to util/metrics/prometheus.py index 80f7f553a..331c73412 100644 --- a/util/saas/prometheus.py +++ b/util/metrics/prometheus.py @@ -40,12 +40,12 @@ class PrometheusPlugin(object): class Prometheus(object): """ Aggregator for collecting stats that are reported to Prometheus. """ def __init__(self, url=None): - self._registered = [] + self._metric_collectors = [] self._url = url if url is not None: self._queue = Queue(QUEUE_MAX) - self._sender = _QueueSender(self._queue, url, self._registered) + self._sender = _QueueSender(self._queue, url, self._metric_collectors) self._sender.start() logger.debug('Prometheus aggregator sending to %s', url) else: @@ -62,7 +62,7 @@ class Prometheus(object): }) if call == 'register': - self._registered.append(v) + self._metric_collectors.append(v) return try: @@ -71,7 +71,7 @@ class Prometheus(object): # If the queue is full, it is because 1) no aggregator was enabled or 2) # the aggregator is taking a long time to respond to requests. In the case # of 1, it's probably enterprise mode and we don't care. In the case of 2, - # the response timeout error is printed at another place. In either case, + # the response timeout error is printed inside the queue handler. In either case, # we don't need to print an error here. pass @@ -97,13 +97,13 @@ class Prometheus(object): class _QueueSender(Thread): """ Helper class which uses a thread to asynchronously send metrics to the local Prometheus aggregator. """ - def __init__(self, queue, url, registered): + def __init__(self, queue, url, metric_collectors): Thread.__init__(self) self.daemon = True self.next_register = datetime.datetime.now() self._queue = queue self._url = url - self._registered = registered + self._metric_collectors = metric_collectors def run(self): while True: @@ -120,10 +120,10 @@ class _QueueSender(Thread): try: resp = requests.post(self._url + '/call', '\n'.join(reqs)) if resp.status_code == 500 and self.next_register <= datetime.datetime.now(): - resp = requests.post(self._url + '/call', '\n'.join(self._registered)) + resp = requests.post(self._url + '/call', '\n'.join(self._metric_collectors)) self.next_register = datetime.datetime.now() + REGISTER_WAIT logger.debug('Register returned %s for %s metrics; setting next to %s', resp.status_code, - len(self._registered), self.next_register) + len(self._metric_collectors), self.next_register) elif resp.status_code != 200: logger.debug('Failed sending to prometheus: %s: %s: %s', resp.status_code, resp.text, ', '.join(reqs)) @@ -135,19 +135,20 @@ class _QueueSender(Thread): class _Collector(object): """ Collector for a Prometheus metric. """ - def __init__(self, enqueue_method, c_type, name, c_help, namespace='', subsystem='', **kwargs): + def __init__(self, enqueue_method, collector_type, collector_name, collector_help, + namespace='', subsystem='', **kwargs): self._enqueue_method = enqueue_method self._base_args = { - 'Name': name, + 'Name': collector_name, 'Namespace': namespace, 'Subsystem': subsystem, - 'Type': c_type, + 'Type': collector_type, } registration_params = dict(kwargs) registration_params.update(self._base_args) - registration_params['Help'] = c_help + registration_params['Help'] = collector_help self._enqueue_method('register', registration_params)