Another fix for the record keeping in buildman

Adds some more mocked tests as well
This commit is contained in:
Joseph Schorr 2016-07-21 17:22:37 -04:00
parent 040977a35f
commit 392242d20b
2 changed files with 89 additions and 22 deletions

View file

@ -131,7 +131,6 @@ class EphemeralBuilderManager(BaseManager):
except ProtocolError:
logger.exception('Exception on etcd watch: %s', etcd_key)
if watch_task_key not in self._watch_tasks or self._watch_tasks[watch_task_key].done():
self._watch_etcd(etcd_key, change_callback, start_index=new_index, restarter=restarter)
@ -148,22 +147,28 @@ class EphemeralBuilderManager(BaseManager):
self._watch_tasks[watch_task_key] = async(watch_future)
@coroutine
def _handle_job_expiration(self, etcd_result):
""" Handler invoked whenever a job expires in etcd. """
def _handle_job_expiration_or_delete(self, etcd_result):
""" Handler invoked whenever a job expires or is deleted in etcd. """
if etcd_result is None:
return
if etcd_result.action != EtcdAction.EXPIRE:
return
# Handle the expiration
logger.debug('Builder expired, clean up the old build node')
# Handle the expiration/deletion
job_metadata = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
build_info = self._build_uuid_to_info.get(build_job.build_uuid)
logger.debug('Job %s %s', etcd_result.action, build_job.build_uuid)
# Pop the build info.
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
if build_info is None:
logger.error('Could not find build info for job %s under etcd expire with metadata: %s',
build_job.build_uuid, job_metadata)
logger.debug('No build info for %s job %s (%s); was probably already deleted by this manager',
etcd_result.action, build_job.build_uuid, job_metadata)
return
# If the etcd action was not an expiration, then it was already deleted and the execution
# shutdown.
if etcd_result.action != EtcdAction.EXPIRE:
# Build information will no longer be needed; pop it off.
self._build_uuid_to_info.pop(build_job.build_uuid, None)
return
execution_id = build_info.execution_id
@ -196,8 +201,8 @@ class EphemeralBuilderManager(BaseManager):
build_job = BuildJob(AttrDict(realm_spec['job_queue_item']))
build_uuid = build_job.build_uuid
logger.debug('Realm key expired for build %s', build_uuid)
build_info = self._build_uuid_to_info.pop(build_uuid, None)
logger.debug('Realm key %s for build %s', etcd_result.action, build_uuid)
build_info = self._build_uuid_to_info.get(build_uuid, None)
if build_info is not None:
# Pop the component off. If we find one, then the build has not connected to this manager,
# so we can safely unregister its component.
@ -297,12 +302,18 @@ class EphemeralBuilderManager(BaseManager):
logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
worker_threads = self._manager_config.get('ETCD_WORKER_THREADS', 5)
(self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(worker_threads,
host=etcd_host, port=etcd_port, cert=etcd_auth, ca_cert=etcd_ca_cert,
protocol=etcd_protocol, read_timeout=5)
(self._etcd_client, self._async_thread_executor) = self._etcd_client_creator(
worker_threads,
host=etcd_host,
port=etcd_port,
cert=etcd_auth,
ca_cert=etcd_ca_cert,
protocol=etcd_protocol,
read_timeout=5,
)
self._etcd_job_prefix = self._manager_config.get('ETCD_BUILDER_PREFIX', 'building/')
self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration)
self._watch_etcd(self._etcd_job_prefix, self._handle_job_expiration_or_delete)
self._etcd_realm_prefix = self._manager_config.get('ETCD_REALM_PREFIX', 'realm/')
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
@ -511,7 +522,7 @@ class EphemeralBuilderManager(BaseManager):
logger.info('Starting termination of executor for job %s', build_uuid)
build_info = self._build_uuid_to_info.pop(build_uuid, None)
if build_info is None:
logger.error('Could not find build information for build %s', build_uuid)
logger.debug('Build information not found for build %s; skipping termination', build_uuid)
return
# Remove the build's component.