Track whether builders ever came online in etcd. Mark builds which never successfully heartbeated as incomplete.

This commit is contained in:
Jake Moshenko 2015-05-20 11:32:37 -04:00
parent 79f1181a63
commit f767fc4d03
2 changed files with 58 additions and 6 deletions

View file

@ -15,6 +15,7 @@ from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper from buildman.asyncutil import AsyncWrapper
from buildman.server import BuildJobResult
from util.morecollections import AttrDict from util.morecollections import AttrDict
@ -91,6 +92,7 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '') logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '')
self._watch_tasks[watch_task_key] = async(watch_future) self._watch_tasks[watch_task_key] = async(watch_future)
@coroutine
def _handle_builder_expiration(self, etcd_result): def _handle_builder_expiration(self, etcd_result):
if etcd_result is None: if etcd_result is None:
return return
@ -101,8 +103,23 @@ class EphemeralBuilderManager(BaseManager):
job_metadata = json.loads(etcd_result._prev_node.value) job_metadata = json.loads(etcd_result._prev_node.value)
if 'builder_id' in job_metadata: if 'builder_id' in job_metadata:
logger.info('Terminating expired build node.') builder_id = job_metadata['builder_id']
async(self._executor.stop_builder(job_metadata['builder_id']))
try:
lock_key = self._etcd_lock_key(builder_id)
yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time()))
except (KeyError, etcd.EtcdKeyError):
logger.debug('Somebody else is cleaning up the build node: %s', builder_id)
return
if not job_metadata.get('had_heartbeat', True):
logger.warning('Build node failed to successfully boot: %s', builder_id)
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
logger.info('Terminating expired build node: %s', builder_id)
yield From(self._executor.stop_builder(builder_id))
def _handle_realm_change(self, etcd_result): def _handle_realm_change(self, etcd_result):
if etcd_result is None: if etcd_result is None:
@ -182,6 +199,8 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change) self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
# Load components for all realms currently known to the cluster # Load components for all realms currently known to the cluster
async(self._register_existing_realms()) async(self._register_existing_realms())
@ -241,6 +260,8 @@ class EphemeralBuilderManager(BaseManager):
'expiration': calendar.timegm(expiration.timetuple()), 'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()), 'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce, 'nonce': nonce,
'had_heartbeat': False,
'job_queue_item': build_job.job_item,
} }
lock_payload = json.dumps(payload) lock_payload = json.dumps(payload)
@ -342,7 +363,9 @@ class EphemeralBuilderManager(BaseManager):
payload = { payload = {
'expiration': calendar.timegm(new_expiration.timetuple()), 'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'], 'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'], 'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,
} }
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
@ -354,6 +377,11 @@ class EphemeralBuilderManager(BaseManager):
""" """
return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid']) return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid'])
def _etcd_lock_key(self, unique_lock_id):
""" Create a key which is used to create a temporary lock in etcd.
"""
return os.path.join(self._etcd_lock_prefix, unique_lock_id)
def _etcd_realm_key(self, realm): def _etcd_realm_key(self, realm):
""" Create a key which is used to track an incoming connection on a realm. """ Create a key which is used to track an incoming connection on a realm.
""" """

View file

@ -5,7 +5,7 @@ import time
import json import json
from trollius import coroutine, get_event_loop, From, Future, sleep, Return from trollius import coroutine, get_event_loop, From, Future, sleep, Return
from mock import Mock from mock import Mock, ANY
from threading import Event from threading import Event
from urllib3.exceptions import ReadTimeoutError from urllib3.exceptions import ReadTimeoutError
@ -191,13 +191,36 @@ class TestEphemeral(unittest.TestCase):
expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
self.manager._handle_builder_expiration(expired_result) yield From(self.manager._handle_builder_expiration(expired_result))
yield From(sleep(.01))
self.test_executor.stop_builder.assert_called_once_with('1234') self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
def test_builder_never_starts(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'builder_id': '1234',
'had_heartbeat': False,
'job_queue_item': self.mock_job.job_item,
})
yield From(self.manager._handle_builder_expiration(expired_result))
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
@async_test @async_test
def test_change_worker(self): def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed # Send a signal to the callback that a worker key has been changed
@ -233,3 +256,4 @@ class TestEphemeral(unittest.TestCase):
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()