Fix multiple reporting of incomplete

This commit is contained in:
Joseph Schorr 2016-08-17 16:01:28 -04:00
parent 0b50928900
commit 3112388004

View file

@ -25,12 +25,15 @@ logger = logging.getLogger(__name__)
ETCD_MAX_WATCH_TIMEOUT = 30
ETCD_ATOMIC_OP_TIMEOUT = 30
RETRY_IMMEDIATELY_TIMEOUT = 0
NO_WORKER_AVAILABLE_TIMEOUT = 10
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
DEFAULT_EPHEMERAL_SETUP_TIMEOUT = 300
class EtcdAction(object):
""" Enumeration of the various kinds of etcd actions we can observe via a watch. """
GET = 'get'
SET = 'set'
EXPIRE = 'expire'
@ -70,6 +73,7 @@ class EphemeralBuilderManager(BaseManager):
self._etcd_realm_prefix = None
self._etcd_job_prefix = None
self._etcd_lock_prefix = None
self._ephemeral_api_timeout = DEFAULT_EPHEMERAL_API_TIMEOUT
self._ephemeral_setup_timeout = DEFAULT_EPHEMERAL_SETUP_TIMEOUT
@ -155,16 +159,16 @@ class EphemeralBuilderManager(BaseManager):
if etcd_result is None:
raise Return()
if etcd_result.action == EtcdAction.CREATE:
if etcd_result.action in (EtcdAction.CREATE, EtcdAction.SET):
raise Return()
elif etcd_result.action in (EtcdAction.DELETE, EtcdAction.EXPIRE):
# Handle the expiration/deletion
# Handle the expiration/deletion.
job_metadata = json.loads(etcd_result._prev_node.value)
build_job = BuildJob(AttrDict(job_metadata['job_queue_item']))
logger.debug('Got "%s" of job %s', etcd_result.action, build_job.build_uuid)
# Pop the build info.
# Get the build info.
build_info = self._build_uuid_to_info.get(build_job.build_uuid, None)
if build_info is None:
logger.debug('No build info for "%s" job %s (%s); probably already deleted by this manager',
@ -185,9 +189,15 @@ class EphemeralBuilderManager(BaseManager):
if not job_metadata.get('had_heartbeat', True):
logger.warning('Build executor failed to successfully boot with execution id %s',
execution_id)
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
# Finally, we terminate the build execution for the job.
# Take a lock to ensure that only one manager reports the build as incomplete.
got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid))
if got_lock:
logger.warning('Marking job %s as incomplete', build_job.build_uuid)
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
# Finally, we terminate the build execution for the job. We don't do this under a lock as
# terminating a node is an atomic operation; better to make sure it is terminated than not.
logger.info('Terminating expired build executor for job %s with execution id %s',
build_job.build_uuid, execution_id)
yield From(self.kill_builder_executor(build_job.build_uuid))
@ -340,6 +350,8 @@ class EphemeralBuilderManager(BaseManager):
self._watch_etcd(self._etcd_realm_prefix, self._handle_realm_change,
restarter=self._register_existing_realms)
self._etcd_lock_prefix = self._manager_config.get('ETCD_LOCK_PREFIX', 'lock/')
self._ephemeral_api_timeout = self._manager_config.get('API_TIMEOUT',
DEFAULT_EPHEMERAL_API_TIMEOUT)
@ -591,6 +603,23 @@ class EphemeralBuilderManager(BaseManager):
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl))
self.job_heartbeat_callback(build_job)
@coroutine
def _take_etcd_atomic_lock(self, path, *args):
""" Takes a lock for atomic operations via etcd over the given path. Returns true if the lock
was granted and false otherwise.
"""
pieces = [self._etcd_lock_prefix, path]
pieces.extend(*args)
lock_key = os.path.join(*pieces)
try:
yield From(self._etcd_client.write(lock_key, {}, prevExist=False, ttl=ETCD_ATOMIC_OP_TIMEOUT))
raise Return(True)
except (KeyError, etcd.EtcdKeyError):
raise Return(False)
def _etcd_job_key(self, build_job):
""" Create a key which is used to track a job in etcd.
"""