Improve tests for the ephemeral build manager.

This commit is contained in:
Jake Moshenko 2014-12-22 16:22:07 -05:00
parent e53b6b0e21
commit 2b6c2a2a50
2 changed files with 115 additions and 35 deletions

View file

@ -2,9 +2,11 @@ import logging
import etcd
import uuid
import calendar
import os.path
from datetime import datetime, timedelta
from trollius import From, coroutine, Return
from trollius import From, coroutine, Return, async
from concurrent.futures import ThreadPoolExecutor
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
@ -16,25 +18,11 @@ logger = logging.getLogger(__name__)
ETCD_BUILDER_PREFIX = 'building/'
def clear_etcd(client):
""" Debugging method used to clear out the section of etcd we are using to track jobs in flight.
"""
try:
building = client.read(ETCD_BUILDER_PREFIX, recursive=True)
for child in building.leaves:
if not child.dir:
logger.warning('Deleting key: %s', child.key)
client.delete(child.key)
except KeyError:
pass
ETCD_EXPIRE_RESULT = 'expire'
class EphemeralBuilderManager(BaseManager):
""" Build manager implementation for the Enterprise Registry. """
shutting_down = False
_executors = {
'popen': PopenExecutor,
'ec2': EC2Executor,
@ -43,7 +31,10 @@ class EphemeralBuilderManager(BaseManager):
_etcd_client_klass = etcd.Client
def __init__(self, *args, **kwargs):
self._shutting_down = False
self._manager_config = None
self._async_thread_executor = None
self._etcd_client = None
self._component_to_job = {}
@ -51,8 +42,35 @@ class EphemeralBuilderManager(BaseManager):
self._executor = None
self._worker_watch_task = None
super(EphemeralBuilderManager, self).__init__(*args, **kwargs)
def _watch_builders(self):
""" Watch the builders key for expirations.
"""
if not self._shutting_down:
workers_future = self._etcd_client.watch(ETCD_BUILDER_PREFIX, recursive=True)
workers_future.add_done_callback(self._handle_key_expiration)
logger.debug('Scheduling watch task.')
self._worker_watch_task = async(workers_future)
def _handle_key_expiration(self, changed_key_future):
""" Handle when a builder expires
"""
if self._worker_watch_task is None or self._worker_watch_task.done():
self._watch_builders()
if changed_key_future.cancelled():
# Due to lack of interest, tomorrow has been cancelled
return
etcd_result = changed_key_future.result()
if etcd_result.action == ETCD_EXPIRE_RESULT:
# Handle the expiration
logger.debug('Builder expired, clean up the old build node')
async(self._clean_up_old_builder(etcd_result.key, etcd_result._prev_node.value))
def initialize(self, manager_config):
logger.debug('Calling initialize')
self._manager_config = manager_config
@ -65,7 +83,11 @@ class EphemeralBuilderManager(BaseManager):
etcd_port = self._manager_config.get('ETCD_PORT', 2379)
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port))
self._async_thread_executor = ThreadPoolExecutor(self._manager_config.get('ETCD_WORKERS', 5))
self._etcd_client = AsyncWrapper(self._etcd_client_klass(host=etcd_host, port=etcd_port),
executor=self._async_thread_executor)
self._watch_builders()
def setup_time(self):
setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
@ -73,8 +95,17 @@ class EphemeralBuilderManager(BaseManager):
return setup_time
def shutdown(self):
logger.debug('Calling shutdown.')
raise NotImplementedError
logger.debug('Shutting down worker.')
self._shutting_down = True
if self._worker_watch_task is not None:
logger.debug('Canceling watch task.')
self._worker_watch_task.cancel()
self._worker_watch_task = None
if self._async_thread_executor is not None:
logger.debug('Shutting down thread pool executor.')
self._async_thread_executor.shutdown()
@coroutine
def schedule(self, build_job, loop):
@ -161,9 +192,8 @@ class EphemeralBuilderManager(BaseManager):
yield From(self._etcd_client.delete(job_key))
@staticmethod
def _etcd_job_key(build_job):
""" Create a key which is used to track a job in etcd.
"""
return '{0}{1}'.format(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])
return os.path.join(ETCD_BUILDER_PREFIX, build_job.job_details['build_uuid'])

View file

@ -1,12 +1,15 @@
import unittest
import etcd
import os.path
import time
from trollius import coroutine, get_event_loop, From, Future
from trollius import coroutine, get_event_loop, From, Future, sleep
from mock import Mock
from functools import partial
from threading import Event
from buildman.manager.executor import BuilderExecutor
from buildman.manager.ephemeral import EphemeralBuilderManager, ETCD_BUILDER_PREFIX
from buildman.manager.ephemeral import (EphemeralBuilderManager, ETCD_BUILDER_PREFIX,
ETCD_EXPIRE_RESULT)
from buildman.server import BuildJobResult
from buildman.component.buildcomponent import BuildComponent
@ -14,10 +17,6 @@ from buildman.component.buildcomponent import BuildComponent
BUILD_UUID = 'deadbeef-dead-beef-dead-deadbeefdead'
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def async_test(f):
def wrapper(*args, **kwargs):
coro = coroutine(f)
@ -29,11 +28,17 @@ def async_test(f):
class TestEphemeral(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.etcd_client_mock = None
self.etcd_wait_event = Event()
self.test_executor = None
super(TestEphemeral, self).__init__(*args, **kwargs)
def _create_mock_etcd_client(self, *args, **kwargs):
def hang_until_event(*args, **kwargs):
time.sleep(.01) # 10ms to simulate network latency
self.etcd_wait_event.wait()
self.etcd_client_mock = Mock(spec=etcd.Client, name='etcd.Client')
self.etcd_client_mock.watch = Mock(side_effect=hang_until_event)
return self.etcd_client_mock
def _create_mock_executor(self, *args, **kwargs):
@ -61,6 +66,7 @@ class TestEphemeral(unittest.TestCase):
self.old_etcd_client_klass = EphemeralBuilderManager._etcd_client_klass
EphemeralBuilderManager._etcd_client_klass = self._create_mock_etcd_client
self.etcd_wait_event.clear()
self.register_component_callback = Mock()
self.uniregister_component_callback = Mock()
@ -77,7 +83,13 @@ class TestEphemeral(unittest.TestCase):
self.manager.initialize({'EXECUTOR': 'test'})
self.mock_job_key = os.path.join(ETCD_BUILDER_PREFIX, BUILD_UUID)
def tearDown(self):
self.etcd_wait_event.set()
self.manager.shutdown()
del EphemeralBuilderManager._executors['test']
EphemeralBuilderManager._etcd_client_klass = self.old_etcd_client_klass
@ -95,15 +107,53 @@ class TestEphemeral(unittest.TestCase):
self.assertTrue(is_scheduled)
job_key = ETCD_BUILDER_PREFIX + mock_job.job_details['build_uuid']
self.etcd_client_mock.read.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True)
self.assertEqual(len(self.test_executor.start_builder.call_args_list), 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], job_key)
self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], job_key)
self.assertEqual(self.test_executor.start_builder.call_count, 1)
self.assertEqual(self.etcd_client_mock.write.call_args_list[0][0][0], self.mock_job_key)
self.assertEqual(self.etcd_client_mock.write.call_args_list[1][0][0], self.mock_job_key)
self.assertEqual(len(self.register_component_callback.call_args_list), 1)
self.assertEqual(self.register_component_callback.call_count, 1)
yield From(self.manager.job_completed(mock_job, BuildJobResult.COMPLETE, test_component))
self.assertEqual(len(self.test_executor.stop_builder.call_args_list), 1)
self.etcd_client_mock.delete.assert_called_once_with(job_key)
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test
def test_expiring_worker(self):
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_called_once_with(ETCD_BUILDER_PREFIX, recursive=True)
# Send a signal to the callback that a worker has expired
expired_result = Mock(sepc=etcd.EtcdResult)
expired_result.action = ETCD_EXPIRE_RESULT
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = {'builder_id': '1234'}
expired_future = Future()
expired_future.set_result(expired_result)
self.manager._handle_key_expiration(expired_future)
yield From(sleep(.01))
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.etcd_client_mock.delete.assert_called_once_with(self.mock_job_key)
@async_test
def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed
set_result = Mock(sepc=etcd.EtcdResult)
set_result.action = 'set'
set_result.key = self.mock_job_key
set_future = Future()
set_future.set_result(set_result)
self.manager._handle_key_expiration(set_future)
yield From(sleep(.01))
self.assertEquals(self.test_executor.stop_builder.call_count, 0)