diff --git a/metrics/prometheus.go b/metrics/prometheus.go index b5a53214..91b32b23 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -10,4 +10,7 @@ const ( var ( // StorageNamespace is the prometheus namespace of blob/cache related operations StorageNamespace = metrics.NewNamespace(NamespacePrefix, "storage", nil) + + // NotificationsNamespace is the prometheus namespace of notification related metrics + NotificationsNamespace = metrics.NewNamespace(NamespacePrefix, "notifications", nil) ) diff --git a/notifications/endpoint.go b/notifications/endpoint.go index a8a52d0c..854f1dd6 100644 --- a/notifications/endpoint.go +++ b/notifications/endpoint.go @@ -58,7 +58,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { endpoint.url = url endpoint.EndpointConfig = config endpoint.defaults() - endpoint.metrics = newSafeMetrics() + endpoint.metrics = newSafeMetrics(name) // Configures the inmemory queue, retry, http pipeline. endpoint.Sink = newHTTPSink( diff --git a/notifications/http_test.go b/notifications/http_test.go index de47f789..b7845cf9 100644 --- a/notifications/http_test.go +++ b/notifications/http_test.go @@ -63,7 +63,7 @@ func TestHTTPSink(t *testing.T) { }) server := httptest.NewTLSServer(serverHandler) - metrics := newSafeMetrics() + metrics := newSafeMetrics("") sink := newHTTPSink(server.URL, 0, nil, nil, &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) diff --git a/notifications/metrics.go b/notifications/metrics.go index a20af168..657b2aa0 100644 --- a/notifications/metrics.go +++ b/notifications/metrics.go @@ -5,6 +5,18 @@ import ( "fmt" "net/http" "sync" + + prometheus "github.com/docker/distribution/metrics" + "github.com/docker/go-metrics" +) + +var ( + // eventsCounter counts total events of incoming, success, failure, and errors + eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "endpoint") + // pendingGauge measures the pending queue size + pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "endpoint") + // statusCounter counts the total notification call per each status code + statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "endpoint") ) // EndpointMetrics track various actions taken by the endpoint, typically by @@ -22,14 +34,16 @@ type EndpointMetrics struct { // safeMetrics guards the metrics implementation with a lock and provides a // safe update function. type safeMetrics struct { + EndpointName string EndpointMetrics sync.Mutex // protects statuses map } // newSafeMetrics returns safeMetrics with map allocated. -func newSafeMetrics() *safeMetrics { +func newSafeMetrics(name string) *safeMetrics { var sm safeMetrics sm.Statuses = make(map[string]int) + sm.EndpointName = name return &sm } @@ -61,6 +75,9 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve defer emsl.safeMetrics.Unlock() emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Successes += len(events) + + statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(float64(len(events))) } func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { @@ -68,12 +85,17 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve defer emsl.safeMetrics.Unlock() emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Failures += len(events) + + statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(float64(len(events))) } func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { emsl.safeMetrics.Lock() defer emsl.safeMetrics.Unlock() emsl.Errors += len(events) + + eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(float64(len(events))) } // endpointMetricsEventQueueListener maintains the incoming events counter and @@ -87,12 +109,17 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { defer eqc.Unlock() eqc.Events += len(events) eqc.Pending += len(events) + + eventsCounter.WithValues("Events", eqc.EndpointName).Inc() + pendingGauge.WithValues(eqc.EndpointName).Inc(float64(len(events))) } func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { eqc.Lock() defer eqc.Unlock() eqc.Pending -= len(events) + + pendingGauge.WithValues(eqc.EndpointName).Dec(1) } // endpoints is global registry of endpoints used to report metrics to expvar @@ -149,4 +176,7 @@ func init() { })) registry.(*expvar.Map).Set("notifications", ¬ifications) + + // register prometheus metrics + metrics.Register(prometheus.NotificationsNamespace) } diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go index 06f88b2c..4a69486b 100644 --- a/notifications/sinks_test.go +++ b/notifications/sinks_test.go @@ -66,7 +66,7 @@ func TestBroadcaster(t *testing.T) { func TestEventQueue(t *testing.T) { const nevents = 1000 var ts testSink - metrics := newSafeMetrics() + metrics := newSafeMetrics("") eq := newEventQueue( // delayed sync simulates destination slower than channel comms &delayedSink{