This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_buildman.py

235 lines
7.9 KiB
Python

import unittest
import etcd
import os.path
import time
import json
from trollius import coroutine, get_event_loop, From, Future, sleep, Return
from mock import Mock
from threading import Event
from urllib3.exceptions import ReadTimeoutError
from buildman.manager.executor import BuilderExecutor
from buildman.manager.ephemeral import EphemeralBuilderManager, EtcdAction
from buildman.server import BuildJobResult
from buildman.component.buildcomponent import BuildComponent
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
REALM_ID = '1234-realm'
def async_test(f):
def wrapper(*args, **kwargs):
coro = coroutine(f)
future = coro(*args, **kwargs)
loop = get_event_loop()
loop.run_until_complete(future)
return wrapper
class TestEphemeral(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.etcd_client_mock = None
self.etcd_wait_event = Event()
self.test_executor = None
super(TestEphemeral, self).__init__(*args, **kwargs)
def _create_mock_etcd_client(self, *args, **kwargs):
def hang_until_event(*args, **kwargs):
time.sleep(.01) # 10ms to simulate network latency
self.etcd_wait_event.wait()
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
return self.etcd_client_mock
def _create_completed_future(self, result=None):
def inner(*args, **kwargs):
new_future = Future()
new_future.set_result(result)
return new_future
return inner
def _create_mock_executor(self, *args, **kwargs):
self.test_executor = Mock(spec=BuilderExecutor)
self.test_executor.start_builder = Mock(side_effect=self._create_completed_future('123'))
self.test_executor.stop_builder = Mock(side_effect=self._create_completed_future())
return self.test_executor
def _create_build_job(self):
mock_job = Mock()
mock_job.job_details = {
'build_uuid': BUILD_UUID,
}
mock_job.job_item = {
'body': json.dumps(mock_job.job_details),
'id': 1,
}
return mock_job
def setUp(self):
EphemeralBuilderManager._executors['test'] = self._create_mock_executor
self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
self.etcd_wait_event.clear()
self.register_component_callback = Mock()
self.unregister_component_callback = Mock()
self.job_heartbeat_callback = Mock()
self.job_complete_callback = Mock()
self.manager = EphemeralBuilderManager(
self.register_component_callback,
self.unregister_component_callback,
self.job_heartbeat_callback,
self.job_complete_callback,
'127.0.0.1',
30,
)
self.manager.initialize({'EXECUTOR': 'test'})
self.mock_job = self._create_build_job()
self.mock_job_key = os.path.join('building/', BUILD_UUID)
def tearDown(self):
self.etcd_wait_event.set()
self.manager.shutdown()
del EphemeralBuilderManager._executors['test']
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
@coroutine
def _setup_job_for_managers(self):
# Test that we are watching the realm location before anything else happens
self.etcd_client_mock.watch.assert_any_call('realm/', recursive=True, timeout=0)
self.etcd_client_mock.read = Mock(side_effect=KeyError)
test_component = Mock(spec=BuildComponent)
test_component.builder_realm = REALM_ID
test_component.start_build = Mock(side_effect=self._create_completed_future())
self.register_component_callback.return_value = test_component
# Ask for a builder to be scheduled
is_scheduled = yield From(self.manager.schedule(self.mock_job))
self.assertTrue(is_scheduled)
self.etcd_client_mock.read.assert_called_once_with('building/', recursive=True)
self.assertEqual(self.test_executor.start_builder.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key)
# Right now the job is not registered with any managers because etcd has not accepted the job
self.assertEqual(self.register_component_callback.call_count, 0)
realm_created = Mock(spec=etcd.EtcdResult)
realm_created.action = EtcdAction.CREATE
realm_created.key = os.path.join('realm/', REALM_ID)
realm_created.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_created)
self.assertEqual(self.register_component_callback.call_count, 1)
raise Return(test_component)
@async_test
def test_schedule_and_complete(self):
# Test that a job is properly registered with all of the managers
test_component = yield From(self._setup_job_for_managers())
# Take the job ourselves
yield From(self.manager.build_component_ready(test_component))
self.etcd_client_mock.delete.assert_called_once_with(os.path.join('realm/', REALM_ID))
self.etcd_client_mock.delete.reset_mock()
# Finish the job
yield From(self.manager.job_completed(self.mock_job, BuildJobResult.COMPLETE, test_component))
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test
def test_another_manager_takes_job(self):
# Prepare a job to be taken by another manager
test_component = yield From(self._setup_job_for_managers())
realm_deleted = Mock(spec=etcd.EtcdResult)
realm_deleted.action = EtcdAction.DELETE
realm_deleted.key = os.path.join('realm/', REALM_ID)
realm_deleted._prev_node = Mock(spec=etcd.EtcdResult)
realm_deleted._prev_node.value = json.dumps({
'realm': REALM_ID,
'token': 'beef',
'builder_id': '123',
'job_queue_item': self.mock_job.job_item,
})
self.manager._handle_realm_change(realm_deleted)
self.unregister_component_callback.assert_called_once_with(test_component)
@async_test
def test_expiring_worker(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
self.manager._handle_builder_expiration(expired_result)
yield From(sleep(.01))
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed
set_result = Mock(sepc=etcd.EtcdResult)
set_result.action = 'set'
set_result.key = self.mock_job_key
self.manager._handle_builder_expiration(set_result)
yield From(sleep(.01))
self.assertEquals(self.test_executor.stop_builder.call_count, 0)
@async_test
def test_heartbeat_response(self):
expiration_timestamp = time.time() + 60
builder_result = Mock(spec=etcd.EtcdResult)
builder_result.value = json.dumps({
'builder_id': '123',
'expiration': expiration_timestamp,
'max_expiration': expiration_timestamp,
})
self.etcd_client_mock.read = Mock(return_value=builder_result)
yield From(self.manager.job_heartbeat(self.mock_job))
# Wait for threads to complete
yield From(sleep(.01))
self.job_heartbeat_callback.assert_called_once_with(self.mock_job)
self.assertEqual(self.etcd_client_mock.write.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
if __name__ == '__main__':
unittest.main()