From 8b706168467e9352e0c567633c3c264a340de79d Mon Sep 17 00:00:00 2001 From: tifayuki Date: Tue, 13 Feb 2018 13:30:56 -0800 Subject: [PATCH 1/6] Add notification metrics It adds notification related prometheus metrics, including: - total count for events/success/failure/error - total count for notification per each status code - gauge of the pending notification queue Signed-off-by: tifayuki --- metrics/prometheus.go | 3 +++ notifications/metrics.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/metrics/prometheus.go b/metrics/prometheus.go index b5a53214..91b32b23 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -10,4 +10,7 @@ const ( var ( // StorageNamespace is the prometheus namespace of blob/cache related operations StorageNamespace = metrics.NewNamespace(NamespacePrefix, "storage", nil) + + // NotificationsNamespace is the prometheus namespace of notification related metrics + NotificationsNamespace = metrics.NewNamespace(NamespacePrefix, "notifications", nil) ) diff --git a/notifications/metrics.go b/notifications/metrics.go index a20af168..69960e9c 100644 --- a/notifications/metrics.go +++ b/notifications/metrics.go @@ -5,6 +5,18 @@ import ( "fmt" "net/http" "sync" + + prometheus "github.com/docker/distribution/metrics" + "github.com/docker/go-metrics" +) + +var ( + // eventsCounter counts total events of incoming, success, failure, and errors + eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type") + // pendingGauge measures the pending queue size + pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total) + // statusCounter counts the total notification call per each status code + statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code") ) // EndpointMetrics track various actions taken by the endpoint, typically by @@ -61,6 +73,9 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve defer emsl.safeMetrics.Unlock() emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Successes += len(events) + + statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1) + eventsCounter.WithValues("Successes").Inc(1) } func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { @@ -68,12 +83,17 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve defer emsl.safeMetrics.Unlock() emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Failures += len(events) + + statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1) + eventsCounter.WithValues("Failures").Inc(1) } func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { emsl.safeMetrics.Lock() defer emsl.safeMetrics.Unlock() emsl.Errors += len(events) + + eventsCounter.WithValues("Errors").Inc(1) } // endpointMetricsEventQueueListener maintains the incoming events counter and @@ -87,12 +107,17 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { defer eqc.Unlock() eqc.Events += len(events) eqc.Pending += len(events) + + eventsCounter.WithValues("Events").Inc() + pendingGauge.Inc(1) } func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { eqc.Lock() defer eqc.Unlock() eqc.Pending -= len(events) + + pendingGauge.Dec(1) } // endpoints is global registry of endpoints used to report metrics to expvar @@ -149,4 +174,7 @@ func init() { })) registry.(*expvar.Map).Set("notifications", ¬ifications) + + // register prometheus metrics + metrics.Register(prometheus.NotificationsNamespace) } From 76da6290b0b99d6d6ce635424426e7803edb4303 Mon Sep 17 00:00:00 2001 From: Honglin Feng Date: Thu, 11 Oct 2018 21:39:02 +0800 Subject: [PATCH 2/6] add label to the metrics Signed-off-by: Honglin Feng --- notifications/endpoint.go | 2 +- notifications/http_test.go | 2 +- notifications/metrics.go | 26 ++++++++++++++------------ notifications/sinks_test.go | 2 +- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/notifications/endpoint.go b/notifications/endpoint.go index a8a52d0c..854f1dd6 100644 --- a/notifications/endpoint.go +++ b/notifications/endpoint.go @@ -58,7 +58,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { endpoint.url = url endpoint.EndpointConfig = config endpoint.defaults() - endpoint.metrics = newSafeMetrics() + endpoint.metrics = newSafeMetrics(name) // Configures the inmemory queue, retry, http pipeline. endpoint.Sink = newHTTPSink( diff --git a/notifications/http_test.go b/notifications/http_test.go index de47f789..b7845cf9 100644 --- a/notifications/http_test.go +++ b/notifications/http_test.go @@ -63,7 +63,7 @@ func TestHTTPSink(t *testing.T) { }) server := httptest.NewTLSServer(serverHandler) - metrics := newSafeMetrics() + metrics := newSafeMetrics("") sink := newHTTPSink(server.URL, 0, nil, nil, &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) diff --git a/notifications/metrics.go b/notifications/metrics.go index 69960e9c..4464edd8 100644 --- a/notifications/metrics.go +++ b/notifications/metrics.go @@ -12,11 +12,11 @@ import ( var ( // eventsCounter counts total events of incoming, success, failure, and errors - eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type") + eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "to") // pendingGauge measures the pending queue size - pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total) + pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "to") // statusCounter counts the total notification call per each status code - statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code") + statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "to") ) // EndpointMetrics track various actions taken by the endpoint, typically by @@ -34,14 +34,16 @@ type EndpointMetrics struct { // safeMetrics guards the metrics implementation with a lock and provides a // safe update function. type safeMetrics struct { + EndpointName string EndpointMetrics sync.Mutex // protects statuses map } // newSafeMetrics returns safeMetrics with map allocated. -func newSafeMetrics() *safeMetrics { +func newSafeMetrics(name string) *safeMetrics { var sm safeMetrics sm.Statuses = make(map[string]int) + sm.EndpointName = name return &sm } @@ -74,8 +76,8 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Successes += len(events) - statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1) - eventsCounter.WithValues("Successes").Inc(1) + statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1) } func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { @@ -84,8 +86,8 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Failures += len(events) - statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1) - eventsCounter.WithValues("Failures").Inc(1) + statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1) } func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { @@ -93,7 +95,7 @@ func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { defer emsl.safeMetrics.Unlock() emsl.Errors += len(events) - eventsCounter.WithValues("Errors").Inc(1) + eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1) } // endpointMetricsEventQueueListener maintains the incoming events counter and @@ -108,8 +110,8 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { eqc.Events += len(events) eqc.Pending += len(events) - eventsCounter.WithValues("Events").Inc() - pendingGauge.Inc(1) + eventsCounter.WithValues("Events", eqc.EndpointName).Inc() + pendingGauge.WithValues(eqc.EndpointName).Inc(1) } func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { @@ -117,7 +119,7 @@ func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { defer eqc.Unlock() eqc.Pending -= len(events) - pendingGauge.Dec(1) + pendingGauge.WithValues(eqc.EndpointName).Dec(1) } // endpoints is global registry of endpoints used to report metrics to expvar diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go index 06f88b2c..4a69486b 100644 --- a/notifications/sinks_test.go +++ b/notifications/sinks_test.go @@ -66,7 +66,7 @@ func TestBroadcaster(t *testing.T) { func TestEventQueue(t *testing.T) { const nevents = 1000 var ts testSink - metrics := newSafeMetrics() + metrics := newSafeMetrics("") eq := newEventQueue( // delayed sync simulates destination slower than channel comms &delayedSink{ From 228bafca0b419d16e50704d25d13b7d6a07c3925 Mon Sep 17 00:00:00 2001 From: Honglin Feng Date: Mon, 15 Oct 2018 19:50:38 +0800 Subject: [PATCH 3/6] run go fmt Signed-off-by: Honglin Feng --- registry/storage/driver/s3-aws/s3.go | 10 +++++----- registry/storage/linkedblobstore.go | 16 ++++++++-------- registry/storage/linkedblobstore_test.go | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index 126a07f6..124619ce 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -476,11 +476,11 @@ func New(params DriverParameters) (*Driver, error) { // } d := &driver{ - S3: s3obj, - Bucket: params.Bucket, - ChunkSize: params.ChunkSize, - Encrypt: params.Encrypt, - KeyID: params.KeyID, + S3: s3obj, + Bucket: params.Bucket, + ChunkSize: params.ChunkSize, + Encrypt: params.Encrypt, + KeyID: params.KeyID, MultipartCopyChunkSize: params.MultipartCopyChunkSize, MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency, MultipartCopyThresholdSize: params.MultipartCopyThresholdSize, diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go index de591c8a..3fb1da26 100644 --- a/registry/storage/linkedblobstore.go +++ b/registry/storage/linkedblobstore.go @@ -312,14 +312,14 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string } bw := &blobWriter{ - ctx: ctx, - blobStore: lbs, - id: uuid, - startedAt: startedAt, - digester: digest.Canonical.Digester(), - fileWriter: fw, - driver: lbs.driver, - path: path, + ctx: ctx, + blobStore: lbs, + id: uuid, + startedAt: startedAt, + digester: digest.Canonical.Digester(), + fileWriter: fw, + driver: lbs.driver, + path: path, resumableDigestEnabled: lbs.resumableDigestEnabled, } diff --git a/registry/storage/linkedblobstore_test.go b/registry/storage/linkedblobstore_test.go index 7682b45c..43774d85 100644 --- a/registry/storage/linkedblobstore_test.go +++ b/registry/storage/linkedblobstore_test.go @@ -161,8 +161,8 @@ type mockBlobDescriptorServiceFactory struct { func (f *mockBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService { return &mockBlobDescriptorService{ BlobDescriptorService: svc, - t: f.t, - stats: f.stats, + t: f.t, + stats: f.stats, } } From 09a63caa37ca9ad7b16b99d4806b82482ec796b5 Mon Sep 17 00:00:00 2001 From: Honglin Feng Date: Mon, 15 Oct 2018 20:18:36 +0800 Subject: [PATCH 4/6] run go fmt and goimports Signed-off-by: Honglin Feng --- registry/storage/driver/s3-aws/s3.go | 10 +++++----- registry/storage/linkedblobstore.go | 16 ++++++++-------- registry/storage/linkedblobstore_test.go | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index 124619ce..126a07f6 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -476,11 +476,11 @@ func New(params DriverParameters) (*Driver, error) { // } d := &driver{ - S3: s3obj, - Bucket: params.Bucket, - ChunkSize: params.ChunkSize, - Encrypt: params.Encrypt, - KeyID: params.KeyID, + S3: s3obj, + Bucket: params.Bucket, + ChunkSize: params.ChunkSize, + Encrypt: params.Encrypt, + KeyID: params.KeyID, MultipartCopyChunkSize: params.MultipartCopyChunkSize, MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency, MultipartCopyThresholdSize: params.MultipartCopyThresholdSize, diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go index 3fb1da26..de591c8a 100644 --- a/registry/storage/linkedblobstore.go +++ b/registry/storage/linkedblobstore.go @@ -312,14 +312,14 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string } bw := &blobWriter{ - ctx: ctx, - blobStore: lbs, - id: uuid, - startedAt: startedAt, - digester: digest.Canonical.Digester(), - fileWriter: fw, - driver: lbs.driver, - path: path, + ctx: ctx, + blobStore: lbs, + id: uuid, + startedAt: startedAt, + digester: digest.Canonical.Digester(), + fileWriter: fw, + driver: lbs.driver, + path: path, resumableDigestEnabled: lbs.resumableDigestEnabled, } diff --git a/registry/storage/linkedblobstore_test.go b/registry/storage/linkedblobstore_test.go index 43774d85..7682b45c 100644 --- a/registry/storage/linkedblobstore_test.go +++ b/registry/storage/linkedblobstore_test.go @@ -161,8 +161,8 @@ type mockBlobDescriptorServiceFactory struct { func (f *mockBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService { return &mockBlobDescriptorService{ BlobDescriptorService: svc, - t: f.t, - stats: f.stats, + t: f.t, + stats: f.stats, } } From d5a615b8c9a469c88bebec7294d0ae0c9931773a Mon Sep 17 00:00:00 2001 From: Honglin Feng Date: Fri, 15 Feb 2019 21:55:08 +0800 Subject: [PATCH 5/6] update the event number Signed-off-by: Honglin Feng --- notifications/metrics.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/notifications/metrics.go b/notifications/metrics.go index 4464edd8..3b43c51f 100644 --- a/notifications/metrics.go +++ b/notifications/metrics.go @@ -77,7 +77,7 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve emsl.Successes += len(events) statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) - eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(float64(len(events))) } func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { @@ -87,7 +87,7 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve emsl.Failures += len(events) statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) - eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(float64(len(events))) } func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { @@ -95,7 +95,7 @@ func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { defer emsl.safeMetrics.Unlock() emsl.Errors += len(events) - eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(float64(len(events))) } // endpointMetricsEventQueueListener maintains the incoming events counter and @@ -111,7 +111,7 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { eqc.Pending += len(events) eventsCounter.WithValues("Events", eqc.EndpointName).Inc() - pendingGauge.WithValues(eqc.EndpointName).Inc(1) + pendingGauge.WithValues(eqc.EndpointName).Inc(float64(len(events))) } func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { From 92a6436714be6438f1cc8e4490a0c638868881e9 Mon Sep 17 00:00:00 2001 From: Honglin Feng Date: Sun, 17 Feb 2019 12:46:01 +0800 Subject: [PATCH 6/6] rename the metrics label Signed-off-by: Honglin Feng --- notifications/metrics.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/notifications/metrics.go b/notifications/metrics.go index 3b43c51f..657b2aa0 100644 --- a/notifications/metrics.go +++ b/notifications/metrics.go @@ -12,11 +12,11 @@ import ( var ( // eventsCounter counts total events of incoming, success, failure, and errors - eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "to") + eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "endpoint") // pendingGauge measures the pending queue size - pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "to") + pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "endpoint") // statusCounter counts the total notification call per each status code - statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "to") + statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "endpoint") ) // EndpointMetrics track various actions taken by the endpoint, typically by