Merge pull request #46 from jakedt/thegooddieyoung

Fix the problem of the queue and etcd getting out of sync. Fix the buildman to track whether a builder ever successfully registers.
This commit is contained in:
josephschorr 2015-06-10 17:48:40 -04:00
commit 7ba000a256
6 changed files with 106 additions and 38 deletions

View file

@ -32,8 +32,9 @@ class BaseManager(object):
@coroutine @coroutine
def schedule(self, build_job): def schedule(self, build_job):
""" Schedules a queue item to be built. Returns True if the item was properly scheduled """ Schedules a queue item to be built. Returns a 2-tuple with (True, None) if the item was
and False if all workers are busy. properly scheduled and (False, a retry timeout in seconds) if all workers are busy or an
error occurs.
""" """
raise NotImplementedError raise NotImplementedError

View file

@ -8,6 +8,7 @@ from buildman.manager.basemanager import BaseManager
from trollius import From, Return, coroutine from trollius import From, Return, coroutine
REGISTRATION_REALM = 'registration' REGISTRATION_REALM = 'registration'
RETRY_TIMEOUT = 5
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DynamicRegistrationComponent(BaseComponent): class DynamicRegistrationComponent(BaseComponent):
@ -61,13 +62,13 @@ class EnterpriseManager(BaseManager):
def schedule(self, build_job): def schedule(self, build_job):
""" Schedules a build for an Enterprise Registry. """ """ Schedules a build for an Enterprise Registry. """
if self.shutting_down or not self.ready_components: if self.shutting_down or not self.ready_components:
raise Return(False) raise Return(False, RETRY_TIMEOUT)
component = self.ready_components.pop() component = self.ready_components.pop()
yield From(component.start_build(build_job)) yield From(component.start_build(build_job))
raise Return(True) raise Return(True, None)
@coroutine @coroutine
def build_component_ready(self, build_component): def build_component_ready(self, build_component):

View file

@ -15,6 +15,7 @@ from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent from buildman.component.buildcomponent import BuildComponent
from buildman.jobutil.buildjob import BuildJob from buildman.jobutil.buildjob import BuildJob
from buildman.asyncutil import AsyncWrapper from buildman.asyncutil import AsyncWrapper
from buildman.server import BuildJobResult
from util.morecollections import AttrDict from util.morecollections import AttrDict
@ -22,6 +23,8 @@ logger = logging.getLogger(__name__)
ETCD_DISABLE_TIMEOUT = 0 ETCD_DISABLE_TIMEOUT = 0
EC2_API_TIMEOUT = 20
RETRY_IMMEDIATELY_TIMEOUT = 0
class EtcdAction(object): class EtcdAction(object):
@ -89,6 +92,7 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '') logger.debug('Scheduling watch of key: %s%s', etcd_key, '/*' if recursive else '')
self._watch_tasks[watch_task_key] = async(watch_future) self._watch_tasks[watch_task_key] = async(watch_future)
@coroutine
def _handle_builder_expiration(self, etcd_result): def _handle_builder_expiration(self, etcd_result):
if etcd_result is None: if etcd_result is None:
return return
@ -99,8 +103,25 @@ class EphemeralBuilderManager(BaseManager):
job_metadata = json.loads(etcd_result._prev_node.value) job_metadata = json.loads(etcd_result._prev_node.value)
if 'builder_id' in job_metadata: if 'builder_id' in job_metadata:
logger.info('Terminating expired build node.') builder_id = job_metadata['builder_id']
async(self._executor.stop_builder(job_metadata['builder_id']))
# Before we delete the build node, we take a lock to make sure that only one manager
# can terminate the node.
try:
lock_key = self._etcd_lock_key(builder_id)
yield From(self._etcd_client.write(lock_key, '', prevExist=False, ttl=self.setup_time()))
except (KeyError, etcd.EtcdKeyError):
logger.debug('Somebody else is cleaning up the build node: %s', builder_id)
return
if not job_metadata.get('had_heartbeat', True):
logger.warning('Build node failed to successfully boot: %s', builder_id)
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
logger.info('Terminating expired build node: %s', builder_id)
yield From(self._executor.stop_builder(builder_id))
def _handle_realm_change(self, etcd_result): def _handle_realm_change(self, etcd_result):
if etcd_result is None: if etcd_result is None:
@ -180,6 +201,8 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/') self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change) self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'locks/')
# Load components for all realms currently known to the cluster # Load components for all realms currently known to the cluster
async(self._register_existing_realms()) async(self._register_existing_realms())
@ -213,23 +236,24 @@ class EphemeralBuilderManager(BaseManager):
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
workers_alive = 0 workers_alive = 0
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when reading job count from etcd') logger.exception('Exception when reading job count from etcd for job: %s', build_uuid)
raise Return(False) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
logger.debug('Total jobs: %s', workers_alive) logger.debug('Total jobs: %s', workers_alive)
if workers_alive >= allowed_worker_count: if workers_alive >= allowed_worker_count:
logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive, logger.info('Too many workers alive, unable to start new worker for build job: %s. %s >= %s',
allowed_worker_count) build_uuid, workers_alive, allowed_worker_count)
raise Return(False) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
job_key = self._etcd_job_key(build_job) job_key = self._etcd_job_key(build_job)
# First try to take a lock for this job, meaning we will be responsible for its lifeline # First try to take a lock for this job, meaning we will be responsible for its lifeline
realm = str(uuid.uuid4()) realm = str(uuid.uuid4())
token = str(uuid.uuid4()) token = str(uuid.uuid4())
ttl = self.setup_time() nonce = str(uuid.uuid4())
expiration = datetime.utcnow() + timedelta(seconds=ttl) setup_time = self.setup_time()
expiration = datetime.utcnow() + timedelta(seconds=setup_time)
machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200) machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration) max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
@ -237,33 +261,40 @@ class EphemeralBuilderManager(BaseManager):
payload = { payload = {
'expiration': calendar.timegm(expiration.timetuple()), 'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()), 'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce,
'had_heartbeat': False,
'job_queue_item': build_job.job_item,
} }
lock_payload = json.dumps(payload)
try: try:
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl)) yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
ttl=EC2_API_TIMEOUT))
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
# The job was already taken by someone else, we are probably a retry # The job was already taken by someone else, we are probably a retry
logger.exception('Job already exists in etcd, are timeouts misconfigured or is the queue broken?') logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
raise Return(False) raise Return(False, EC2_API_TIMEOUT)
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid) logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
logger.debug('Starting builder with executor: %s', self._executor) executor_type = self._executor.__class__.__name__
logger.debug('Starting builder for job: %s with executor: %s', build_uuid, executor_type)
try: try:
builder_id = yield From(self._executor.start_builder(realm, token, build_uuid)) builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
except: except:
logger.exception('Exception when starting builder for job: %s', build_uuid) logger.exception('Exception when starting builder for job: %s', build_uuid)
raise Return(False) raise Return(False, EC2_API_TIMEOUT)
# Store the builder in etcd associated with the job id # Store the builder in etcd associated with the job id
try: try:
payload['builder_id'] = builder_id payload['builder_id'] = builder_id
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload,
ttl=setup_time))
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid) logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False) raise Return(False, EC2_API_TIMEOUT)
# Store the realm spec which will allow any manager to accept this builder when it connects # Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({ realm_spec = json.dumps({
@ -275,15 +306,16 @@ class EphemeralBuilderManager(BaseManager):
try: try:
yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False, yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
ttl=ttl)) ttl=setup_time))
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.') logger.error('Realm %s already exists in etcd for job %s ' +
raise Return(False) 'UUID collision or something is very very wrong.', realm, build_uuid)
raise Return(False, setup_time)
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when writing realm %s to etcd', realm) logger.exception('Exception when writing realm %s to etcd for job %s', realm, build_uuid)
raise Return(False) raise Return(False, setup_time)
raise Return(True) raise Return(True, None)
@coroutine @coroutine
def build_component_ready(self, build_component): def build_component_ready(self, build_component):
@ -333,7 +365,9 @@ class EphemeralBuilderManager(BaseManager):
payload = { payload = {
'expiration': calendar.timegm(new_expiration.timetuple()), 'expiration': calendar.timegm(new_expiration.timetuple()),
'builder_id': build_job_metadata['builder_id'], 'builder_id': build_job_metadata['builder_id'],
'job_queue_item': build_job.job_item,
'max_expiration': build_job_metadata['max_expiration'], 'max_expiration': build_job_metadata['max_expiration'],
'had_heartbeat': True,
} }
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=ttl))
@ -345,6 +379,11 @@ class EphemeralBuilderManager(BaseManager):
""" """
return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid']) return os.path.join(self._etcd_builder_prefix, build_job.job_details['build_uuid'])
def _etcd_lock_key(self, unique_lock_id):
""" Create a key which is used to create a temporary lock in etcd.
"""
return os.path.join(self._etcd_lock_prefix, unique_lock_id)
def _etcd_realm_key(self, realm): def _etcd_realm_key(self, realm):
""" Create a key which is used to track an incoming connection on a realm. """ Create a key which is used to track an incoming connection on a realm.
""" """

View file

@ -169,26 +169,28 @@ class BuilderServer(object):
self._queue.incomplete(job_item, restore_retry=False) self._queue.incomplete(job_item, restore_retry=False)
continue continue
logger.debug('Build job found. Checking for an avaliable worker.') logger.debug('Checking for an avaliable worker for build job %s',
build_job.repo_build.uuid)
try: try:
scheduled = yield From(self._lifecycle_manager.schedule(build_job)) schedule_success, retry_timeout = yield From(self._lifecycle_manager.schedule(build_job))
except: except:
logger.exception('Exception when scheduling job') logger.exception('Exception when scheduling job: %s', build_job.repo_build.uuid)
self._current_status = BuildServerStatus.EXCEPTION self._current_status = BuildServerStatus.EXCEPTION
return return
if scheduled: if schedule_success:
logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid) logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid)
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
status_handler.set_phase('build-scheduled') status_handler.set_phase(database.BUILD_PHASE.BUILD_SCHEDULED)
self._job_count = self._job_count + 1 self._job_count = self._job_count + 1
logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid, logger.debug('Build job %s scheduled. Running: %s', build_job.repo_build.uuid,
self._job_count) self._job_count)
else: else:
logger.debug('All workers are busy. Requeuing.') logger.debug('All workers are busy for job %s Requeuing after %s seconds.',
self._queue.incomplete(job_item, restore_retry=True, retry_after=WORK_CHECK_TIMEOUT + 5) build_job.repo_build.uuid, retry_timeout)
self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout)
@trollius.coroutine @trollius.coroutine
def _queue_metrics_updater(self): def _queue_metrics_updater(self):

View file

@ -515,6 +515,7 @@ class BUILD_PHASE(object):
""" Build phases enum """ """ Build phases enum """
ERROR = 'error' ERROR = 'error'
INTERNAL_ERROR = 'internalerror' INTERNAL_ERROR = 'internalerror'
BUILD_SCHEDULED = 'build-scheduled'
UNPACKING = 'unpacking' UNPACKING = 'unpacking'
PULLING = 'pulling' PULLING = 'pulling'
BUILDING = 'building' BUILDING = 'building'

View file

@ -5,7 +5,7 @@ import time
import json import json
from trollius import coroutine, get_event_loop, From, Future, sleep, Return from trollius import coroutine, get_event_loop, From, Future, sleep, Return
from mock import Mock from mock import Mock, ANY
from threading import Event from threading import Event
from urllib3.exceptions import ReadTimeoutError from urllib3.exceptions import ReadTimeoutError
@ -191,13 +191,36 @@ class TestEphemeral(unittest.TestCase):
expired_result._prev_node = Mock(spec=etcd.EtcdResult) expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({'builder_id': '1234'}) expired_result._prev_node.value = json.dumps({'builder_id': '1234'})
self.manager._handle_builder_expiration(expired_result) yield From(self.manager._handle_builder_expiration(expired_result))
yield From(sleep(.01))
self.test_executor.stop_builder.assert_called_once_with('1234') self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1) self.assertEqual(self.test_executor.stop_builder.call_count, 1)
@async_test
def test_builder_never_starts(self):
test_component = yield From(self._setup_job_for_managers())
# Test that we are watching before anything else happens
self.etcd_client_mock.watch.assert_any_call('building/', recursive=True, timeout=0)
# Send a signal to the callback that a worker has expired
expired_result = Mock(spec=etcd.EtcdResult)
expired_result.action = EtcdAction.EXPIRE
expired_result.key = self.mock_job_key
expired_result._prev_node = Mock(spec=etcd.EtcdResult)
expired_result._prev_node.value = json.dumps({
'builder_id': '1234',
'had_heartbeat': False,
'job_queue_item': self.mock_job.job_item,
})
yield From(self.manager._handle_builder_expiration(expired_result))
self.test_executor.stop_builder.assert_called_once_with('1234')
self.assertEqual(self.test_executor.stop_builder.call_count, 1)
self.job_complete_callback.assert_called_once_with(ANY, BuildJobResult.INCOMPLETE)
@async_test @async_test
def test_change_worker(self): def test_change_worker(self):
# Send a signal to the callback that a worker key has been changed # Send a signal to the callback that a worker key has been changed
@ -233,3 +256,4 @@ class TestEphemeral(unittest.TestCase):
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()