Merge pull request #1737 from coreos-inc/fix-job-watch
Fix watch of the jobs key in the build manager
This commit is contained in:
commit
ad9ec4fee2
2 changed files with 64 additions and 36 deletions
|
@ -150,25 +150,29 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
self._watch_tasks[watch_task_key] = async(watch_future)
|
self._watch_tasks[watch_task_key] = async(watch_future)
|
||||||
|
|
||||||
@coroutine
|
@coroutine
|
||||||
def _handle_job_expiration_or_delete(self, etcd_result):
|
def _handle_job_change(self, etcd_result):
|
||||||
""" Handler invoked whenever a job expires or is deleted in etcd. """
|
""" Handler invoked whenever a job expires or is deleted in etcd. """
|
||||||
if etcd_result is None:
|
if etcd_result is None:
|
||||||
raise Return()
|
raise Return()
|
||||||
|
|
||||||
|
if etcd_result.action == EtcdAction.CREATE:
|
||||||
|
raise Return()
|
||||||
|
|
||||||
|
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
|
||||||
# Handle the expiration/deletion
|
# Handle the expiration/deletion
|
||||||
job_metadata = json.loads(etcd_result._prev_node.value)
|
job_metadata = json.loads(etcd_result._prev_node.value)
|
||||||
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
|
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
|
||||||
logger.debug('Job %s %s', etcd_result.action, build_job.build_uuid)
|
logger.debug('Got "%s" of job %s', etcd_result.action, build_job.build_uuid)
|
||||||
|
|
||||||
# Pop the build info.
|
# Pop the build info.
|
||||||
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
|
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
|
||||||
if build_info is None:
|
if build_info is None:
|
||||||
logger.debug('No build info for %s job %s (%s); was probably already deleted by this manager',
|
logger.debug('No build info for "%s" job %s (%s); probably already deleted by this manager',
|
||||||
etcd_result.action, build_job.build_uuid, job_metadata)
|
etcd_result.action, build_job.build_uuid, job_metadata)
|
||||||
raise Return()
|
raise Return()
|
||||||
|
|
||||||
# If the etcd action was not an expiration, then it was already deleted and the execution
|
# If the etcd action was not an expiration, then it was already deleted by some manager and
|
||||||
# shutdown.
|
# the execution was therefore already shutdown.
|
||||||
if etcd_result.action != EtcdAction.EXPIRE:
|
if etcd_result.action != EtcdAction.EXPIRE:
|
||||||
# Build information will no longer be needed; pop it off.
|
# Build information will no longer be needed; pop it off.
|
||||||
self._build_uuid_to_info.pop(build_job.build_uuid, None)
|
self._build_uuid_to_info.pop(build_job.build_uuid, None)
|
||||||
|
@ -188,6 +192,10 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
build_job.build_uuid, execution_id)
|
build_job.build_uuid, execution_id)
|
||||||
yield From(self.kill_builder_executor(build_job.build_uuid))
|
yield From(self.kill_builder_executor(build_job.build_uuid))
|
||||||
|
|
||||||
|
else:
|
||||||
|
logger.warning('Unexpected action (%s) on job key: %s', etcd_result.action, etcd_result.key)
|
||||||
|
|
||||||
|
|
||||||
@coroutine
|
@coroutine
|
||||||
def _handle_realm_change(self, etcd_result):
|
def _handle_realm_change(self, etcd_result):
|
||||||
if etcd_result is None:
|
if etcd_result is None:
|
||||||
|
@ -198,7 +206,7 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
realm_spec = json.loads(etcd_result.value)
|
realm_spec = json.loads(etcd_result.value)
|
||||||
self._register_realm(realm_spec)
|
self._register_realm(realm_spec)
|
||||||
|
|
||||||
elif etcd_result.action == EtcdAction.DELETE or etcd_result.action == EtcdAction.EXPIRE:
|
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
|
||||||
# We must stop listening for new connections on the specified realm, if we did not get the
|
# We must stop listening for new connections on the specified realm, if we did not get the
|
||||||
# connection
|
# connection
|
||||||
realm_spec = json.loads(etcd_result._prev_node.value)
|
realm_spec = json.loads(etcd_result._prev_node.value)
|
||||||
|
@ -326,7 +334,7 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
)
|
)
|
||||||
|
|
||||||
self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
|
self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
|
||||||
self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration_or_delete)
|
self._watch_etcd(self._etcd_job_prefix, self._handle_job_change)
|
||||||
|
|
||||||
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
|
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
|
||||||
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
|
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
|
||||||
|
|
|
@ -264,12 +264,32 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
||||||
'job_queue_item': self.mock_job.job_item,
|
'job_queue_item': self.mock_job.job_item,
|
||||||
})
|
})
|
||||||
|
|
||||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
yield From(self.manager._handle_job_change(expired_result))
|
||||||
|
|
||||||
# Ensure the job was removed from the info, but stop was not called.
|
# Ensure the job was removed from the info, but stop was not called.
|
||||||
self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
|
self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
|
||||||
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
||||||
|
|
||||||
|
@async_test
|
||||||
|
def test_job_started_by_other_manager(self):
|
||||||
|
# Test that we are watching before anything else happens
|
||||||
|
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
|
||||||
|
timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)
|
||||||
|
|
||||||
|
# Send a signal to the callback that the job has been created.
|
||||||
|
expired_result = Mock(spec=etcd.EtcdResult)
|
||||||
|
expired_result.action = EtcdAction.CREATE
|
||||||
|
expired_result.key = self.mock_job_key
|
||||||
|
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
|
||||||
|
expired_result._prev_node.value = json.dumps({
|
||||||
|
'had_heartbeat': False,
|
||||||
|
'job_queue_item': self.mock_job.job_item,
|
||||||
|
})
|
||||||
|
|
||||||
|
# Ensure the create does nothing.
|
||||||
|
yield From(self.manager._handle_job_change(expired_result))
|
||||||
|
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
||||||
|
|
||||||
@async_test
|
@async_test
|
||||||
def test_expiring_worker_not_started(self):
|
def test_expiring_worker_not_started(self):
|
||||||
# Test that we are watching before anything else happens
|
# Test that we are watching before anything else happens
|
||||||
|
@ -287,7 +307,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
||||||
})
|
})
|
||||||
|
|
||||||
# Since the realm was never registered, expiration should do nothing.
|
# Since the realm was never registered, expiration should do nothing.
|
||||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
yield From(self.manager._handle_job_change(expired_result))
|
||||||
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
||||||
|
|
||||||
@async_test
|
@async_test
|
||||||
|
@ -308,7 +328,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
||||||
'job_queue_item': self.mock_job.job_item,
|
'job_queue_item': self.mock_job.job_item,
|
||||||
})
|
})
|
||||||
|
|
||||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
yield From(self.manager._handle_job_change(expired_result))
|
||||||
|
|
||||||
self.test_executor.stop_builder.assert_called_once_with('123')
|
self.test_executor.stop_builder.assert_called_once_with('123')
|
||||||
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
|
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
|
||||||
|
@ -331,7 +351,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
||||||
'job_queue_item': self.mock_job.job_item,
|
'job_queue_item': self.mock_job.job_item,
|
||||||
})
|
})
|
||||||
|
|
||||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
yield From(self.manager._handle_job_change(expired_result))
|
||||||
|
|
||||||
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
self.assertEqual(self.test_executor.stop_builder.call_count, 0)
|
||||||
self.assertEqual(self.job_complete_callback.call_count, 0)
|
self.assertEqual(self.job_complete_callback.call_count, 0)
|
||||||
|
@ -355,7 +375,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
||||||
'job_queue_item': self.mock_job.job_item,
|
'job_queue_item': self.mock_job.job_item,
|
||||||
})
|
})
|
||||||
|
|
||||||
yield From(self.manager._handle_job_expiration_or_delete(expired_result))
|
yield From(self.manager._handle_job_change(expired_result))
|
||||||
|
|
||||||
self.test_executor.stop_builder.assert_called_once_with('123')
|
self.test_executor.stop_builder.assert_called_once_with('123')
|
||||||
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
|
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
|
||||||
|
@ -369,7 +389,7 @@ class TestEphemeralLifecycle(EphemeralBuilderTestCase):
|
||||||
set_result.action = 'set'
|
set_result.action = 'set'
|
||||||
set_result.key = self.mock_job_key
|
set_result.key = self.mock_job_key
|
||||||
|
|
||||||
self.manager._handle_job_expiration_or_delete(set_result)
|
self.manager._handle_job_change(set_result)
|
||||||
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
|
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
|
||||||
|
|
||||||
@async_test
|
@async_test
|
||||||
|
|
Reference in a new issue