import unittest
import etcd
import time
import json
import uuid
import os

from trollius import coroutine, get_event_loop, From, Future, Return
from mock import Mock, ANY, call

from buildman.manager.executor import BuilderExecutor, ExecutorException
from buildman.manager.ephemeral import (EphemeralBuilderManager, EtcdAction,
                                        ETCD_MAX_WATCH_TIMEOUT)
from buildman.component.buildcomponent import BuildComponent
from buildman.server import BuildJobResult
from buildman.asyncutil import AsyncWrapper
from util.metrics.metricqueue import duration_collector_async
from app import metric_queue

BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
REALM_ID = '1234-realm'


def async_test(f):
  def wrapper(*args, **kwargs):
    coro = coroutine(f)
    future = coro(*args, **kwargs)
    loop = get_event_loop()
    loop.run_until_complete(future)
  return wrapper


class TestExecutor(BuilderExecutor):
  job_started = None
  job_stopped = None

  @coroutine
  @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"])
  def start_builder(self, realm, token, build_uuid):
    self.job_started = str(uuid.uuid4())
    raise Return(self.job_started)

  @coroutine
  def stop_builder(self, execution_id):
    self.job_stopped = execution_id



class BadExecutor(BuilderExecutor):
  @coroutine
  @duration_collector_async(metric_queue.builder_time_to_start, labelvalues=["testlabel"])
  def start_builder(self, realm, token, build_uuid):
    raise ExecutorException('raised on purpose!')


class EphemeralBuilderTestCase(unittest.TestCase):
  def __init__(self, *args, **kwargs):
    self.etcd_client_mock = None
    super(EphemeralBuilderTestCase, self).__init__(*args, **kwargs)

  def _create_mock_etcd_client(self, *args, **kwargs):
    def create_future(*args, **kwargs):
      return Future()

    self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
    self.etcd_client_mock.read = Mock(side_effect=KeyError)
    self.etcd_client_mock.delete = Mock(side_effect=self._create_completed_future())
    self.etcd_client_mock.watch = Mock(side_effect=create_future)
    self.etcd_client_mock.write = Mock(side_effect=self._create_completed_future('some_exec_id'))

    return (self.etcd_client_mock, None)

  def _create_completed_future(self, result=None):
    def inner(*args, **kwargs):
      new_future = Future()
      new_future.set_result(result)
      return new_future
    return inner

  def setUp(self):
    self._existing_executors = dict(EphemeralBuilderManager.EXECUTORS)

  def tearDown(self):
    EphemeralBuilderManager.EXECUTORS = self._existing_executors

  @coroutine
  def _register_component(self, realm_spec, build_component, token):
    raise Return('hello')

  def _create_build_job(self, namespace='namespace', retries=3):
    mock_job = Mock()
    mock_job.job_details = {
        'build_uuid': BUILD_UUID,
    }
    mock_job.job_item = {
        'body': json.dumps(mock_job.job_details),
        'id': 1,
    }

    mock_job.namespace = namespace
    mock_job.retries_remaining = retries
    mock_job.build_uuid = BUILD_UUID
    return mock_job



class TestEphemeralLifecycle(EphemeralBuilderTestCase):
  """ Tests the various lifecycles of the ephemeral builder and its interaction with etcd. """

  def __init__(self, *args, **kwargs):
    super(TestEphemeralLifecycle, self).__init__(*args, **kwargs)
    self.etcd_client_mock = None
    self.test_executor = None

  def _create_completed_future(self, result=None):
    def inner(*args, **kwargs):
      new_future = Future()
      new_future.set_result(result)
      return new_future
    return inner

  def _create_mock_executor(self, *args, **kwargs):
    self.test_executor = Mock(spec=BuilderExecutor)
    self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
    self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
    self.test_executor.name = 'MockExecutor'
    self.test_executor.minimum_retry_threshold = 0
    return self.test_executor

  def setUp(self):
    super(TestEphemeralLifecycle, self).setUp()

    EphemeralBuilderManager.EXECUTORS['test'] = self._create_mock_executor

    self.register_component_callback = Mock()
    self.unregister_component_callback = Mock()
    self.job_heartbeat_callback = Mock()
    self.job_complete_callback = AsyncWrapper(Mock())

    self.manager = EphemeralBuilderManager(
      self.register_component_callback,
      self.unregister_component_callback,
      self.job_heartbeat_callback,
      self.job_complete_callback,
      '127.0.0.1',
      30,
      etcd_creator=self._create_mock_etcd_client,
    )

    self.manager.initialize({'EXECUTOR': 'test'})

    # Test that we are watching the realm and jobs key once initialized.
    self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, index=None,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT)

    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, index=None,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT)


    self.mock_job = self._create_build_job()
    self.mock_job_key = os.path.join('building/', BUILD_UUID)

  def tearDown(self):
    super(TestEphemeralLifecycle, self).tearDown()
    self.manager.shutdown()


  @coroutine
  def _setup_job_for_managers(self):
    self.etcd_client_mock.read = Mock(side_effect=KeyError)
    test_component = Mock(spec=BuildComponent)
    test_component.builder_realm = REALM_ID
    test_component.start_build = Mock(side_effect=self._create_completed_future())
    self.register_component_callback.return_value = test_component

    # Ask for a builder to be scheduled
    self.etcd_client_mock.write.reset()

    is_scheduled = yield From(self.manager.schedule(self.mock_job))
    self.assertTrue(is_scheduled)
    self.assertEqual(self.test_executor.start_builder.call_count, 1)

    # Ensure the job and realm and metric were added to etcd.
    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
    self.assertTrue(self.etcd_client_mock.write.call_args_list[1][0][0].find('metric/') == 0)
    self.assertTrue(self.etcd_client_mock.write.call_args_list[2][0][0].find('realm/') == 0)
    realm_data = json.loads(self.etcd_client_mock.write.call_args_list[2][0][1])
    realm_data['realm'] = REALM_ID

    # Right now the job is not registered with any managers because etcd has not accepted the job
    self.assertEqual(self.register_component_callback.call_count, 0)

    # Fire off a realm changed with the same data.
    realm_created = Mock(spec=etcd.EtcdResult)
    realm_created.action = EtcdAction.CREATE
    realm_created.key = os.path.join('realm/', REALM_ID)
    realm_created.value = json.dumps(realm_data)

    yield From(self.manager._handle_realm_change(realm_created))
    self.assertEqual(self.register_component_callback.call_count, 1)

    # Ensure that we have at least one component node.
    self.assertEquals(1, self.manager.num_workers())

    # Ensure that the build info exists.
    self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

    raise Return(test_component)

  @async_test
  def test_schedule_and_complete(self):
    # Test that a job is properly registered with all of the managers
    test_component = yield From(self._setup_job_for_managers())

    # Take the job ourselves
    yield From(self.manager.build_component_ready(test_component))
    read_calls = [call('building/', recursive=True), call(os.path.join('metric/', REALM_ID))]
    self.etcd_client_mock.read.assert_has_calls(read_calls)

    delete_calls = [call('building/', recursive=True), call(os.path.join('metric/', REALM_ID))]
    self.etcd_client_mock.read.assert_has_calls(delete_calls)
    self.etcd_client_mock.delete.reset_mock()

    self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

    # Finish the job
    yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))

    # Ensure that the executor kills the job.
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
    self.etcd_client_mock.delete.assert_has_calls([call(self.mock_job_key)])

    # Ensure the build information is cleaned up.
    self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
    self.assertEquals(0, self.manager.num_workers())

  @async_test
  def test_another_manager_takes_job(self):
    # Prepare a job to be taken by another manager
    test_component = yield From(self._setup_job_for_managers())

    realm_deleted = Mock(spec=etcd.EtcdResult)
    realm_deleted.action = EtcdAction.DELETE
    realm_deleted.key = os.path.join('realm/', REALM_ID)

    realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
    realm_deleted._prev_node.value = json.dumps({
        'realm': REALM_ID,
        'token': 'beef',
        'execution_id': '123',
        'job_queue_item': self.mock_job.job_item,
    })

    yield From(self.manager._handle_realm_change(realm_deleted))

    self.unregister_component_callback.assert_called_once_with(test_component)

    # Ensure that the executor does not kill the job.
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

    # Ensure that we still have the build info, but not the component.
    self.assertEquals(0, self.manager.num_workers())
    self.assertIsNotNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

    # Delete the job once it has "completed".
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.DELETE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'had_heartbeat': False,
      'job_queue_item': self.mock_job.job_item,
    })

    yield From(self.manager._handle_job_change(expired_result))

    # Ensure the job was removed from the info, but stop was not called.
    self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_job_started_by_other_manager(self):
    # Test that we are watching before anything else happens
    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)

    # Send a signal to the callback that the job has been created.
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.CREATE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'had_heartbeat': False,
      'job_queue_item': self.mock_job.job_item,
    })

    # Ensure the create does nothing.
    yield From(self.manager._handle_job_change(expired_result))
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_expiring_worker_not_started(self):
    # Test that we are watching before anything else happens
    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)

    # Send a signal to the callback that a worker has expired
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.EXPIRE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'had_heartbeat': True,
      'job_queue_item': self.mock_job.job_item,
    })

    # Since the realm was never registered, expiration should do nothing.
    yield From(self.manager._handle_job_change(expired_result))
    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_expiring_worker_started(self):
    test_component = yield From(self._setup_job_for_managers())

    # Test that we are watching before anything else happens
    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)

    # Send a signal to the callback that a worker has expired
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.EXPIRE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'had_heartbeat': True,
      'job_queue_item': self.mock_job.job_item,
    })

    yield From(self.manager._handle_job_change(expired_result))

    self.test_executor.stop_builder.assert_called_once_with('123')
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)

  @async_test
  def test_buildjob_deleted(self):
    test_component = yield From(self._setup_job_for_managers())

    # Test that we are watching before anything else happens
    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)

    # Send a signal to the callback that a worker has expired
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.DELETE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'had_heartbeat': False,
      'job_queue_item': self.mock_job.job_item,
    })

    yield From(self.manager._handle_job_change(expired_result))

    self.assertEqual(self.test_executor.stop_builder.call_count, 0)
    self.assertEqual(self.job_complete_callback.call_count, 0)
    self.assertIsNone(self.manager._build_uuid_to_info.get(BUILD_UUID))

  @async_test
  def test_builder_never_starts(self):
    test_component = yield From(self._setup_job_for_managers())

    # Test that we are watching before anything else happens
    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True,
                                                timeout=ETCD_MAX_WATCH_TIMEOUT, index=None)

    # Send a signal to the callback that a worker has expired
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.EXPIRE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'had_heartbeat': False,
      'job_queue_item': self.mock_job.job_item,
    })

    yield From(self.manager._handle_job_change(expired_result))

    self.test_executor.stop_builder.assert_called_once_with('123')
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)

    # Ensure the job was marked as incomplete, with an update_phase to True (so the DB record and
    # logs are updated as well)
    yield From(self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE,
                                                                  'MockExecutor',
                                                                  update_phase=True))

  @async_test
  def test_change_worker(self):
    # Send a signal to the callback that a worker key has been changed
    set_result = Mock(sepc=etcd.EtcdResult)
    set_result.action = 'set'
    set_result.key = self.mock_job_key

    self.manager._handle_job_change(set_result)
    self.assertEquals(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_realm_expired(self):
    test_component = yield From(self._setup_job_for_managers())

    # Send a signal to the callback that a realm has expired
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.EXPIRE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'realm': REALM_ID,
      'execution_id': 'foobar',
      'executor_name': 'MockExecutor',
      'job_queue_item': {'body': '{"build_uuid": "fakeid"}'},
    })

    yield From(self.manager._handle_realm_change(expired_result))

    # Ensure that the cleanup code for the executor was called.
    self.test_executor.stop_builder.assert_called_once_with('foobar')
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)


  @async_test
  def test_heartbeat_response(self):
    yield From(self.assertHeartbeatWithExpiration(100, self.manager.heartbeat_period_sec * 2))

  @async_test
  def test_heartbeat_future_expiration(self):
    yield From(self.assertHeartbeatWithExpiration(10, 10, ranged=True))

  @async_test
  def test_heartbeat_expired(self):
    yield From(self.assertHeartbeatWithExpiration(-60, 0))

  @coroutine
  def assertHeartbeatWithExpiration(self, expires_in_sec, expected_ttl, ranged=False):
    expiration_timestamp = time.time() + expires_in_sec
    builder_result = Mock(spec=etcd.EtcdResult)
    builder_result.value = json.dumps({
        'expiration': expiration_timestamp,
        'max_expiration': expiration_timestamp,
    })
    self.etcd_client_mock.read = Mock(side_effect=self._create_completed_future(builder_result))

    yield From(self.manager.job_heartbeat(self.mock_job))

    self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
    self.assertEqual(self.etcd_client_mock.write.call_count, 1)
    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)

    job_key_data = json.loads(self.etcd_client_mock.write.call_args_list[0][0][1])
    self.assertTrue(job_key_data['had_heartbeat'])
    self.assertEquals(self.mock_job.job_item, job_key_data['job_queue_item'])

    if not ranged:
      self.assertEquals(expected_ttl, self.etcd_client_mock.write.call_args_list[0][1]['ttl'])
    else:
      self.assertTrue(self.etcd_client_mock.write.call_args_list[0][1]['ttl'] <= expected_ttl)


class TestEphemeral(EphemeralBuilderTestCase):
  """ Simple unit tests for the ephemeral builder around config management, starting and stopping
      jobs.
  """

  def setUp(self):
    super(TestEphemeral, self).setUp()

    unregister_component_callback = Mock()
    job_heartbeat_callback = Mock()

    @coroutine
    def job_complete_callback(*args, **kwargs):
      raise Return()

    self.manager = EphemeralBuilderManager(
      self._register_component,
      unregister_component_callback,
      job_heartbeat_callback,
      job_complete_callback,
      '127.0.0.1',
      30,
      etcd_creator=self._create_mock_etcd_client,
    )

  def tearDown(self):
    super(TestEphemeral, self).tearDown()
    self.manager.shutdown()

  def test_verify_executor_oldconfig(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    self.manager.initialize({
      'EXECUTOR': 'test',
      'EXECUTOR_CONFIG': dict(MINIMUM_RETRY_THRESHOLD=42)
    })

    # Ensure that we have a single test executor.
    self.assertEquals(1, len(self.manager.registered_executors))
    self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
    self.assertEquals('TestExecutor', self.manager.registered_executors[0].name)

  def test_verify_executor_newconfig(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    self.manager.initialize({
      'EXECUTORS': [{
        'EXECUTOR': 'test',
        'MINIMUM_RETRY_THRESHOLD': 42
      }]
    })

    # Ensure that we have a single test executor.
    self.assertEquals(1, len(self.manager.registered_executors))
    self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)


  def test_multiple_executors_samename(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor

    with self.assertRaises(Exception):
      self.manager.initialize({
        'EXECUTORS': [
          {
            'NAME': 'primary',
            'EXECUTOR': 'test',
            'MINIMUM_RETRY_THRESHOLD': 42
          },
          {
            'NAME': 'primary',
            'EXECUTOR': 'anotherexecutor',
            'MINIMUM_RETRY_THRESHOLD': 24
          },
        ]
      })


  def test_verify_multiple_executors(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    EphemeralBuilderManager.EXECUTORS['anotherexecutor'] = TestExecutor

    self.manager.initialize({
      'EXECUTORS': [
        {
          'NAME': 'primary',
          'EXECUTOR': 'test',
          'MINIMUM_RETRY_THRESHOLD': 42
        },
        {
          'NAME': 'secondary',
          'EXECUTOR': 'anotherexecutor',
          'MINIMUM_RETRY_THRESHOLD': 24
        },
      ]
    })

    # Ensure that we have a two test executors.
    self.assertEquals(2, len(self.manager.registered_executors))
    self.assertEquals(42, self.manager.registered_executors[0].minimum_retry_threshold)
    self.assertEquals(24, self.manager.registered_executors[1].minimum_retry_threshold)

  def test_skip_invalid_executor(self):
    self.manager.initialize({
      'EXECUTORS': [
        {
          'EXECUTOR': 'unknown',
          'MINIMUM_RETRY_THRESHOLD': 42
        },
      ]
    })

    self.assertEquals(0, len(self.manager.registered_executors))

  @async_test
  def test_schedule_job_namespace_filter(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    self.manager.initialize({
      'EXECUTORS': [{
        'EXECUTOR': 'test',
        'NAMESPACE_WHITELIST': ['something'],
      }]
    })

    # Try with a build job in an invalid namespace.
    build_job = self._create_build_job(namespace='somethingelse')
    result = yield From(self.manager.schedule(build_job))
    self.assertFalse(result[0])

    # Try with a valid namespace.
    build_job = self._create_build_job(namespace='something')
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

  @async_test
  def test_schedule_job_retries_filter(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor
    self.manager.initialize({
      'EXECUTORS': [{
        'EXECUTOR': 'test',
        'MINIMUM_RETRY_THRESHOLD': 2,
      }]
    })

    # Try with a build job that has too few retries.
    build_job = self._create_build_job(retries=1)
    result = yield From(self.manager.schedule(build_job))
    self.assertFalse(result[0])

    # Try with a valid job.
    build_job = self._create_build_job(retries=2)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])


  @async_test
  def test_schedule_job_executor_fallback(self):
    EphemeralBuilderManager.EXECUTORS['primary'] = TestExecutor
    EphemeralBuilderManager.EXECUTORS['secondary'] = TestExecutor

    self.manager.initialize({
      'EXECUTORS': [
        {
          'NAME': 'primary',
          'EXECUTOR': 'primary',
          'NAMESPACE_WHITELIST': ['something'],
          'MINIMUM_RETRY_THRESHOLD': 3,
        },
        {
          'NAME': 'secondary',
          'EXECUTOR': 'secondary',
          'MINIMUM_RETRY_THRESHOLD': 2,
        },
      ]
    })

    # Try a job not matching the primary's namespace filter. Should schedule on secondary.
    build_job = self._create_build_job(namespace='somethingelse')
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNone(self.manager.registered_executors[0].job_started)
    self.assertIsNotNone(self.manager.registered_executors[1].job_started)

    self.manager.registered_executors[0].job_started = None
    self.manager.registered_executors[1].job_started = None

    # Try a job not matching the primary's retry minimum. Should schedule on secondary.
    build_job = self._create_build_job(namespace='something', retries=2)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNone(self.manager.registered_executors[0].job_started)
    self.assertIsNotNone(self.manager.registered_executors[1].job_started)

    self.manager.registered_executors[0].job_started = None
    self.manager.registered_executors[1].job_started = None

    # Try a job matching the primary. Should schedule on the primary.
    build_job = self._create_build_job(namespace='something', retries=3)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNotNone(self.manager.registered_executors[0].job_started)
    self.assertIsNone(self.manager.registered_executors[1].job_started)

    self.manager.registered_executors[0].job_started = None
    self.manager.registered_executors[1].job_started = None

    # Try a job not matching either's restrictions.
    build_job = self._create_build_job(namespace='somethingelse', retries=1)
    result = yield From(self.manager.schedule(build_job))
    self.assertFalse(result[0])

    self.assertIsNone(self.manager.registered_executors[0].job_started)
    self.assertIsNone(self.manager.registered_executors[1].job_started)

    self.manager.registered_executors[0].job_started = None
    self.manager.registered_executors[1].job_started = None


  @async_test
  def test_schedule_job_single_executor(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor

    self.manager.initialize({
      'EXECUTOR': 'test',
      'EXECUTOR_CONFIG': {},
    })

    build_job = self._create_build_job(namespace='something', retries=3)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNotNone(self.manager.registered_executors[0].job_started)
    self.manager.registered_executors[0].job_started = None


    build_job = self._create_build_job(namespace='something', retries=0)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    self.assertIsNotNone(self.manager.registered_executors[0].job_started)
    self.manager.registered_executors[0].job_started = None


  @async_test
  def test_executor_exception(self):
    EphemeralBuilderManager.EXECUTORS['bad'] = BadExecutor

    self.manager.initialize({
      'EXECUTOR': 'bad',
      'EXECUTOR_CONFIG': {},
    })

    build_job = self._create_build_job(namespace='something', retries=3)
    result = yield From(self.manager.schedule(build_job))
    self.assertFalse(result[0])


  @async_test
  def test_schedule_and_stop(self):
    EphemeralBuilderManager.EXECUTORS['test'] = TestExecutor

    self.manager.initialize({
      'EXECUTOR': 'test',
      'EXECUTOR_CONFIG': {},
    })

    # Start the build job.
    build_job = self._create_build_job(namespace='something', retries=3)
    result = yield From(self.manager.schedule(build_job))
    self.assertTrue(result[0])

    executor = self.manager.registered_executors[0]
    self.assertIsNotNone(executor.job_started)

    # Register the realm so the build information is added.
    yield From(self.manager._register_realm({
      'realm': str(uuid.uuid4()),
      'token': str(uuid.uuid4()),
      'execution_id': executor.job_started,
      'executor_name': 'TestExecutor',
      'build_uuid': build_job.build_uuid,
      'job_queue_item': build_job.job_item,
    }))

    # Stop the build job.
    yield From(self.manager.kill_builder_executor(build_job.build_uuid))
    self.assertEquals(executor.job_stopped, executor.job_started)


if __name__ == '__main__':
  unittest.main()