import unittest
import etcd
import os.path
import time
import json

from trollius import coroutine, get_event_loop, From, Future, sleep, Return
from mock import Mock, ANY
from threading import Event
from urllib3.exceptions import ReadTimeoutError

from buildman.manager.executor import BuilderExecutor
from buildman.manager.ephemeral import EphemeralBuilderManager, EtcdAction
from buildman.server import BuildJobResult
from buildman.component.buildcomponent import BuildComponent


# Build UUID placed in the mock job's details; also the suffix of its 'building/' key.
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
# Realm assigned to the mock build component; also the suffix of its 'realm/' key.
REALM_ID = '1234-realm'


def async_test(f):
  """Decorator that runs a coroutine-style test to completion.

  The decorated callable is converted with `coroutine(f)`, invoked with the
  arguments the wrapper receives, and the resulting object is driven on the
  current event loop via `run_until_complete`.

  Args:
    f: the test function to run on the event loop.

  Returns:
    A synchronous function carrying the same name, docstring and attributes
    as `f`. Its return value is None; the coroutine's result is discarded.
  """
  # Imported at function scope so this helper needs no extra module-level import.
  from functools import wraps

  # Copy __name__/__doc__/__dict__ from the test so reports show the real test
  # name and markers set by inner decorators remain visible on the wrapper.
  @wraps(f)
  def wrapper(*args, **kwargs):
    coro = coroutine(f)
    future = coro(*args, **kwargs)
    loop = get_event_loop()
    loop.run_until_complete(future)
  return wrapper

class TestEphemeral(unittest.TestCase):
  """Tests for EphemeralBuilderManager against a mocked etcd client and executor.

  setUp swaps the manager's etcd client class and registers a 'test' executor
  factory on EphemeralBuilderManager; tearDown restores both.
  """

  def __init__(self, *args, **kwargs):
    # These are filled in by _create_mock_etcd_client / _create_mock_executor
    # when those factories are invoked.
    self.etcd_client_mock = None
    self.etcd_wait_event = Event()
    self.test_executor = None
    super(TestEphemeral, self).__init__(*args, **kwargs)

  def _create_mock_etcd_client(self, *args, **kwargs):
    """Build, remember and return a mock etcd client whose `watch` blocks.

    Accepts and ignores any arguments so it can stand in for the real client
    class. `watch` sleeps briefly and then blocks until `etcd_wait_event` is
    set (tearDown sets it).
    """
    def hang_until_event(*args, **kwargs):
      time.sleep(.01)  # 10ms to simulate network latency
      self.etcd_wait_event.wait()

    self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
    self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
    return self.etcd_client_mock

  def _create_completed_future(self, result=None):
    """Return a callable that yields a fresh, already-resolved Future per call.

    Args:
      result: value each returned Future is resolved with.

    Returns:
      A function accepting any arguments, suitable as a Mock `side_effect`.
    """
    def inner(*args, **kwargs):
      new_future = Future()
      new_future.set_result(result)
      return new_future
    return inner

  def _create_mock_executor(self, *args, **kwargs):
    """Build, remember and return a mock BuilderExecutor.

    `start_builder` resolves to the builder id '123'; `stop_builder` resolves
    to None.
    """
    self.test_executor = Mock(spec=BuilderExecutor)
    self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
    self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
    return self.test_executor

  def _create_build_job(self):
    """Return a mock build job whose queue item body is the JSON job details."""
    mock_job = Mock()
    mock_job.job_details = {
        'build_uuid': BUILD_UUID,
    }
    mock_job.job_item = {
        'body': json.dumps(mock_job.job_details),
        'id': 1,
    }
    return mock_job

  def setUp(self):
    # Register the mock executor factory and swap in the mock etcd client class.
    EphemeralBuilderManager._executors['test'] = self._create_mock_executor

    self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
    EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
    self.etcd_wait_event.clear()

    self.register_component_callback = Mock()
    self.unregister_component_callback = Mock()
    self.job_heartbeat_callback = Mock()
    self.job_complete_callback = Mock()

    self.manager = EphemeralBuilderManager(
        self.register_component_callback,
        self.unregister_component_callback,
        self.job_heartbeat_callback,
        self.job_complete_callback,
        '127.0.0.1',
        30,
    )

    self.manager.initialize({'EXECUTOR': 'test'})

    self.mock_job = self._create_build_job()
    self.mock_job_key = os.path.join('building/', BUILD_UUID)

  def tearDown(self):
    # Release any mocked `watch` calls still blocked on the event before shutting down.
    self.etcd_wait_event.set()

    self.manager.shutdown()

    # Undo the class-level patches made in setUp.
    del EphemeralBuilderManager._executors['test']
    EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass

  @coroutine
  def _setup_job_for_managers(self):
    """Schedule the mock job and simulate etcd announcing its realm.

    Returns (via `Return`) the mock BuildComponent that the register callback
    hands back.
    """
    # Test that we are watching the realm location before anything else happens
    self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=30, index=None)

    self.etcd_client_mock.read = Mock(side_effect=KeyError)
    test_component = Mock(spec=BuildComponent)
    test_component.builder_realm = REALM_ID
    test_component.start_build = Mock(side_effect=self._create_completed_future())
    self.register_component_callback.return_value = test_component

    # Ask for a builder to be scheduled
    is_scheduled = yield From(self.manager.schedule(self.mock_job))

    self.assertTrue(is_scheduled)

    self.etcd_client_mock.read.assert_called_once_with('building/', recursive=True)
    self.assertEqual(self.test_executor.start_builder.call_count, 1)
    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
    self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key)

    # Right now the job is not registered with any managers because etcd has not accepted the job
    self.assertEqual(self.register_component_callback.call_count, 0)

    realm_created = Mock(spec=etcd.EtcdResult)
    realm_created.action = EtcdAction.CREATE
    realm_created.key = os.path.join('realm/', REALM_ID)
    realm_created.value = json.dumps({
        'realm': REALM_ID,
        'token': 'beef',
        'builder_id': '123',
        'job_queue_item': self.mock_job.job_item,
    })

    self.manager._handle_realm_change(realm_created)

    self.assertEqual(self.register_component_callback.call_count, 1)

    raise Return(test_component)

  @async_test
  def test_schedule_and_complete(self):
    # Test that a job is properly registered with all of the managers
    test_component = yield From(self._setup_job_for_managers())

    # Take the job ourselves
    yield From(self.manager.build_component_ready(test_component))

    self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
    self.etcd_client_mock.delete.reset_mock()

    # Finish the job
    yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))

    self.assertEqual(self.test_executor.stop_builder.call_count, 1)
    self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)

  # The skip decorator is outermost so unittest sees the skip marker on the
  # test method itself instead of only hitting SkipTest inside the event loop.
  @unittest.skip('this test is flaky on Quay.io builders')
  @async_test
  def test_another_manager_takes_job(self):
    # Prepare a job to be taken by another manager
    test_component = yield From(self._setup_job_for_managers())

    realm_deleted = Mock(spec=etcd.EtcdResult)
    realm_deleted.action = EtcdAction.DELETE
    realm_deleted.key = os.path.join('realm/', REALM_ID)

    realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
    realm_deleted._prev_node.value = json.dumps({
        'realm': REALM_ID,
        'token': 'beef',
        'builder_id': '123',
        'job_queue_item': self.mock_job.job_item,
    })

    self.manager._handle_realm_change(realm_deleted)

    self.unregister_component_callback.assert_called_once_with(test_component)

  @async_test
  def test_expiring_worker(self):
    # Test that we are watching before anything else happens
    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)

    # Send a signal to the callback that a worker has expired
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.EXPIRE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({'builder_id': '1234'})

    yield From(self.manager._handle_builder_expiration(expired_result))

    self.test_executor.stop_builder.assert_called_once_with('1234')
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)

  @async_test
  def test_builder_never_starts(self):
    test_component = yield From(self._setup_job_for_managers())

    # Test that we are watching before anything else happens
    self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=30, index=None)

    # Send a signal to the callback that a worker has expired
    expired_result = Mock(spec=etcd.EtcdResult)
    expired_result.action = EtcdAction.EXPIRE
    expired_result.key = self.mock_job_key
    expired_result._prev_node = Mock(spec=etcd.EtcdResult)
    expired_result._prev_node.value = json.dumps({
      'builder_id': '1234',
      'had_heartbeat': False,
      'job_queue_item': self.mock_job.job_item,
    })

    yield From(self.manager._handle_builder_expiration(expired_result))

    self.test_executor.stop_builder.assert_called_once_with('1234')
    self.assertEqual(self.test_executor.stop_builder.call_count, 1)

    self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)

  @async_test
  def test_change_worker(self):
    # Send a signal to the callback that a worker key has been changed
    set_result = Mock(spec=etcd.EtcdResult)
    set_result.action = 'set'
    set_result.key = self.mock_job_key

    # Await the handler as the expiration tests above do; a bare call would
    # leave a coroutine un-run and make the assertion below vacuous.
    yield From(self.manager._handle_builder_expiration(set_result))

    # Give any work the handler may have scheduled a chance to run.
    yield From(sleep(.01))

    self.assertEqual(self.test_executor.stop_builder.call_count, 0)

  @async_test
  def test_heartbeat_response(self):
    expiration_timestamp = time.time() + 60
    builder_result = Mock(spec=etcd.EtcdResult)
    builder_result.value = json.dumps({
        'builder_id': '123',
        'expiration': expiration_timestamp,
        'max_expiration': expiration_timestamp,
    })
    self.etcd_client_mock.read = Mock(return_value=builder_result)

    yield From(self.manager.job_heartbeat(self.mock_job))

    # Wait for threads to complete
    yield From(sleep(.01))

    self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
    self.assertEqual(self.etcd_client_mock.write.call_count, 1)
    self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)

# Allow running this test module directly as a script.
if __name__ == '__main__':
  unittest.main()