Add duration metric collector decorator (#1885)
- Track time-to-start for builders
- Track time-to-build for builders
- Track ec2 builder fallbacks
- Track build time
parent 85d611e2fb
commit 832ee89923
5 changed files with 110 additions and 5 deletions
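
In short, each executor's start_builder coroutine is decorated with the new duration_collector_async so that builder start times land in a per-executor-type histogram. A minimal sketch of the pattern, modeled on the new test/test_metricqueue.py; it assumes trollius and mock are installed, that util.metrics.metricqueue is on the import path, and it uses a Mock in place of a real prometheus histogram:

import time

from mock import Mock
from trollius import coroutine, Return, get_event_loop

from util.metrics.metricqueue import duration_collector_async

histogram = Mock()  # stand-in for e.g. metric_queue.builder_time_to_start

@coroutine
@duration_collector_async(histogram, labelvalues=['ec2'])
def start_builder():
  # Simulate the work of booting a builder instance.
  time.sleep(0.1)
  raise Return('builder-id')

get_event_loop().run_until_complete(start_builder())
# The elapsed time was recorded on the histogram with the given label values.
print(histogram.Observe.call_args)  # roughly: call(0.1..., labelvalues=['ec2'])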
@@ -4,6 +4,7 @@ import uuid
 import calendar
 import os.path
 import json
+import time
 
 from collections import namedtuple
 from datetime import datetime, timedelta
@@ -385,7 +386,7 @@ class EphemeralBuilderManager(BaseManager):
     build_uuid = build_job.job_details['build_uuid']
     logger.debug('Calling schedule with job: %s', build_uuid)
 
-    # Check if there are worker slots avialable by checking the number of jobs in etcd
+    # Check if there are worker slots available by checking the number of jobs in etcd
     allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 1)
     try:
       active_jobs = yield From(self._etcd_client.read(self._etcd_job_prefix, recursive=True))
@@ -450,6 +451,7 @@ class EphemeralBuilderManager(BaseManager):
 
       # Check if we can use this executor based on the retries remaining.
       if executor.minimum_retry_threshold > build_job.retries_remaining:
+        metric_queue.builder_fallback.Inc()
         logger.debug('Job %s cannot use executor %s as it is below retry threshold %s (retry #%s)',
                      build_uuid, executor.name, executor.minimum_retry_threshold,
                      build_job.retries_remaining)
@@ -499,6 +501,7 @@ class EphemeralBuilderManager(BaseManager):
       'execution_id': execution_id,
       'executor_name': started_with_executor.name,
       'job_queue_item': build_job.job_item,
+      'start_time': time.time(),
     })
 
     try:
@@ -535,6 +538,14 @@ class EphemeralBuilderManager(BaseManager):
                  job.build_uuid, build_component.builder_realm)
     yield From(build_component.start_build(job))
 
+    try:
+      # log start time to prometheus
+      realm_data = yield From(self._etcd_client.read(self._etcd_realm_key(build_component.builder_realm)))
+      start_time = json.loads(realm_data.value)['start_time']
+      metric_queue.builder_time_to_build(time.time() - start_time, labelvalues=[realm_data.executor_name])
+    except (KeyError, etcd.EtcdKeyError):
+      logger.warning('Could not read realm key %s', build_component.builder_realm)
+
     try:
       # Clean up the bookkeeping for allowing any manager to take the job.
       yield From(self._etcd_client.delete(self._etcd_realm_key(build_component.builder_realm)))
@@ -556,9 +567,17 @@ class EphemeralBuilderManager(BaseManager):
 
     self.job_complete_callback(build_job, job_status, executor_name)
 
-    # Kill the ephmeral builder.
+    # Kill the ephemeral builder.
     yield From(self.kill_builder_executor(build_job.build_uuid))
 
+    try:
+      # log build time to prometheus
+      realm_data = yield From(self._etcd_client.read(self._etcd_realm_key(build_component.builder_realm)))
+      start_time = json.loads(realm_data.value)['start_time']
+      metric_queue.build_time(time.time() - start_time, labelvalues=[realm_data.executor_name])
+    except (KeyError, etcd.EtcdKeyError):
+      logger.warning('Could not read realm key %s', build_component.builder_realm)
+
     # Delete the build job from etcd.
     job_key = self._etcd_job_key(build_job)
     try:
@@ -17,6 +17,7 @@ from functools import partial
 from buildman.asyncutil import AsyncWrapper
 from container_cloud_config import CloudConfigContext
 from app import metric_queue, app
+from util.metrics.metricqueue import duration_collector_async
 
 
 logger = logging.getLogger(__name__)
@@ -131,6 +132,7 @@ class EC2Executor(BuilderExecutor):
     return stack_amis[ec2_region]
 
   @coroutine
+  @duration_collector_async(metric_queue.builder_time_to_start, ['ec2'])
   def start_builder(self, realm, token, build_uuid):
     region = self.executor_config['EC2_REGION']
     channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
@@ -240,6 +242,7 @@ class PopenExecutor(BuilderExecutor):
   """ Executor which uses Popen to fork a quay-builder process.
   """
   @coroutine
+  @duration_collector_async(metric_queue.builder_time_to_start, ['fork'])
   def start_builder(self, realm, token, build_uuid):
     # Now start a machine for this job, adding the machine id to the etcd information
     logger.debug('Forking process for build')
@@ -391,6 +394,7 @@ class KubernetesExecutor(BuilderExecutor):
     }
 
   @coroutine
+  @duration_collector_async(metric_queue.builder_time_to_start, ['k8s'])
   def start_builder(self, realm, token, build_uuid):
     # generate resource
     channel = self.executor_config.get('COREOS_CHANNEL', 'stable')
@@ -13,7 +13,8 @@ from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
                                         ETCD_MAX_WATCH_TIMEOUT)
 from buildman.component.buildcomponent import BuildComponent
 from buildman.server import BuildJobResult
+from util.metrics.metricqueue import duration_collector_async
 from app import metric_queue
 
 BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
 REALM_ID = '1234-realm'
@@ -33,6 +34,7 @@ class TestExecutor(BuilderExecutor):
   job_stopped = None
 
   @coroutine
+  @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"])
   def start_builder(self, realm, token, build_uuid):
     self.job_started = str(uuid.uuid4())
     raise Return(self.job_started)
@@ -45,6 +47,7 @@ class TestExecutor(BuilderExecutor):
 
 class BadExecutor(BuilderExecutor):
   @coroutine
+  @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"])
   def start_builder(self, realm, token, build_uuid):
     raise ExecutorException('raised on purpose!')
 
@@ -210,6 +213,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
     # Take the job ourselves
     yield From(self.manager.build_component_ready(test_component))
 
+    self.etcd_client_mock.read.assert_called_with(os.path.join('realm/', REALM_ID))
     self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
     self.etcd_client_mock.delete.reset_mock()
 
@@ -743,4 +747,3 @@ class TestEphemeral(EphemeralBuilderTestCase):
 
 if __name__ == '__main__':
   unittest.main()
-
test/test_metricqueue.py (new file, 50 lines)
@@ -0,0 +1,50 @@
+import time
+import unittest
+
+from mock import Mock
+from trollius import coroutine, Return, get_event_loop
+
+from util.metrics.metricqueue import duration_collector_async
+
+
+mock_histogram = Mock()
+
+class NonReturn(Exception):
+  pass
+
+
+@coroutine
+@duration_collector_async(mock_histogram, labelvalues=["testlabel"])
+def duration_decorated():
+  time.sleep(1)
+  raise Return("fin")
+
+
+@coroutine
+@duration_collector_async(mock_histogram, labelvalues=["testlabel"])
+def duration_decorated_error():
+  raise NonReturn("not a Return error")
+
+
+class DurationDecoratorTestCase(unittest.TestCase):
+  def __init__(self, *args, **kwargs):
+    self.etcd_client_mock = None
+    self.loop = get_event_loop()
+    super(DurationDecoratorTestCase, self).__init__(*args, **kwargs)
+
+  def test_duration_decorator(self):
+    self.loop.run_until_complete(duration_decorated())
+    assert mock_histogram.Observe.called
+    assert 1 - mock_histogram.Observe.call_args[0][0] < 1  # duration should be close to 1s
+    assert mock_histogram.Observe.call_args[1]["labelvalues"] == ["testlabel"]
+
+  def test_duration_decorator_error(self):
+    mock_histogram.reset_mock()
+
+    with self.assertRaises(NonReturn):
+      self.loop.run_until_complete(duration_decorated_error())
+    assert not mock_histogram.Observe.called
+
+
+if __name__ == '__main__':
+  unittest.main()
@@ -6,6 +6,7 @@ from functools import wraps
 from Queue import Queue, Full
 
 from flask import g, request
+from trollius import Return
 
 
 logger = logging.getLogger(__name__)
@@ -13,6 +14,9 @@ logger = logging.getLogger(__name__)
 # Buckets for the API response times.
 API_RESPONSE_TIME_BUCKETS = [.01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
 
+# Buckets for the builder start times.
+BUILDER_START_TIME_BUCKETS = [.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 180.0, 240.0, 300.0, 600.0]
+
 
 class MetricQueue(object):
   """ Object to which various metrics are written, for distribution to metrics collection
@@ -33,6 +37,16 @@ class MetricQueue(object):
                                                      'Multipart upload ends.', labelnames=['type'])
     self.build_capacity_shortage = prom.create_gauge('build_capacity_shortage',
                                                      'Build capacity shortage.')
+    self.builder_time_to_start = prom.create_histogram('builder_tts',
+                                                       'Time from triggering to starting a builder.',
+                                                       labelnames=['builder_type'],
+                                                       buckets=BUILDER_START_TIME_BUCKETS)
+    self.builder_time_to_build = prom.create_histogram('builder_ttb',
+                                                       'Time from triggering to actually starting a build',
+                                                       labelnames=['builder_type'],
+                                                       buckets=BUILDER_START_TIME_BUCKETS)
+    self.build_time = prom.create_histogram('build_time', 'Time spent building', labelnames=['builder_type'])
+    self.builder_fallback = prom.create_counter('builder_fallback', 'Builder fell back to ec2 executor')
     self.percent_building = prom.create_gauge('build_percent_building', 'Percent building.')
     self.build_counter = prom.create_counter('builds', 'Number of builds', labelnames=['name'])
     self.ephemeral_build_workers = prom.create_counter('ephemeral_build_workers',
@@ -88,6 +102,22 @@ class MetricQueue(object):
     return self._queue.get_nowait()
 
 
+def duration_collector_async(metric, labelvalues):
+  """ Decorates a method to have its duration time logged to the metric. """
+  def decorator(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+      trigger_time = time.time()
+      try:
+        rv = func(*args, **kwargs)
+      except Return as e:
+        metric.Observe(time.time() - trigger_time, labelvalues=labelvalues)
+        raise e
+      return rv
+    return wrapper
+  return decorator
+
+
 def time_decorator(name, metric_queue):
   """ Decorates an endpoint method to have its request time logged to the metrics queue. """
   after = _time_after_request(name, metric_queue)
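
Note the design choice in the decorator above: Observe fires only when the wrapped trollius coroutine finishes by raising Return, so durations are recorded for successful completions only. A hedged illustration of the failure path, mirroring test_duration_decorator_error in the new test file and making the same assumptions as the sketch near the top of this page:

from mock import Mock
from trollius import coroutine, get_event_loop

from util.metrics.metricqueue import duration_collector_async

histogram = Mock()

@coroutine
@duration_collector_async(histogram, labelvalues=['ec2'])
def failing_start():
  # Anything other than trollius' Return propagates without touching the metric.
  raise RuntimeError('boot failed')

try:
  get_event_loop().run_until_complete(failing_start())
except RuntimeError:
  pass

assert not histogram.Observe.called  # no duration recorded for the failed start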
@@ -135,4 +165,3 @@ def _time_after_request(name, metric_queue):
 
     return r
   return f
-