Fix some issues around state in the build managers

- Make sure to clean up the job if the executor could not be started
- Increase the setup leeway to further ensure there is no crossover between the queue item timing out and the cleanup of the jobs
- Make the lock used for marking jobs as internal error extremely long-lived, and key it on the execution ID. This should ensure we don't get duplicate reports while still allowing different executions to be handled properly (a rough sketch follows this list)
- Make sure to invoke the queue heartbeat callback before we head off to etcd; this should reduce certain timeouts
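
For clarity, here is a rough, self-contained sketch of the execution-scoped lock described above, assuming the python-etcd client and a reachable etcd endpoint; LOCK_PREFIX, LOCK_TTL_SECONDS, take_atomic_lock, and the usage names (client, build_uuid, execution_id, mark_job_incomplete) are illustrative placeholders, not the manager's actual identifiers:

    # Sketch only: the lock key includes the execution ID, so retries for the *same*
    # execution collide on one key while other executions take their own lock.
    import os
    import etcd

    LOCK_PREFIX = '/buildlock/'   # placeholder for the manager's configured lock prefix
    LOCK_TTL_SECONDS = 10000      # deliberately long-lived, mirroring the new ETCD_ATOMIC_OP_TIMEOUT

    def take_atomic_lock(client, path, *args):
      """ Returns True if we won the lock, False if another manager already holds it. """
      pieces = [LOCK_PREFIX, path]
      pieces.extend(args)         # note: extend(args), not extend(*args)
      lock_key = os.path.join(*pieces)
      try:
        # prevExist=False makes the write atomic: it fails if the key already exists.
        client.write(lock_key, 'locked', prevExist=False, ttl=LOCK_TTL_SECONDS)
        return True
      except (KeyError, etcd.EtcdKeyError):
        return False

    # Usage: only the manager that wins the lock reports the execution's job as incomplete.
    # client = etcd.Client(host='127.0.0.1', port=2379)
    # if take_atomic_lock(client, 'job-expired', build_uuid, execution_id):
    #   mark_job_incomplete(build_uuid)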

Hopefully Fixes #1836
Joseph Schorr 2016-09-15 14:37:45 -04:00
parent 949ceae4eb
commit f9f60b9faf
2 changed files with 24 additions and 13 deletions


@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 ETCD_MAX_WATCH_TIMEOUT = 30
-ETCD_ATOMIC_OP_TIMEOUT = 120
+ETCD_ATOMIC_OP_TIMEOUT = 10000
 RETRY_IMMEDIATELY_TIMEOUT = 0
 NO_WORKER_AVAILABLE_TIMEOUT = 10
 DEFAULT_EPHEMERAL_API_TIMEOUT = 20
@@ -186,12 +186,14 @@ class EphemeralBuilderManager(BaseManager):
     # If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
     # the job as incomplete here.
-    if not job_metadata.get('had_heartbeat', True):
+    if not job_metadata.get('had_heartbeat', False):
       logger.warning('Build executor failed to successfully boot with execution id %s',
                      execution_id)
-      # Take a lock to ensure that only one manager reports the build as incomplete.
-      got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid))
+      # Take a lock to ensure that only one manager reports the build as incomplete for this
+      # execution.
+      got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid,
+                                                        execution_id))
       if got_lock:
         logger.warning('Marking job %s as incomplete', build_job.build_uuid)
         self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
@@ -419,7 +421,6 @@ class EphemeralBuilderManager(BaseManager):
     }
     lock_payload = json.dumps(payload)
     logger.debug('Writing key for job %s with expiration in %s seconds', build_uuid,
                  self._ephemeral_setup_timeout)
     try:
@@ -427,17 +428,17 @@ class EphemeralBuilderManager(BaseManager):
                                          ttl=self._ephemeral_setup_timeout))
     except (KeyError, etcd.EtcdKeyError):
       # The job was already taken by someone else, we are probably a retry
-      logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
+      logger.warning('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
       raise Return(False, self._ephemeral_api_timeout)
     except etcd.EtcdException:
       logger.exception('Exception when writing job %s to etcd', build_uuid)
       raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)

+    # Got a lock, now lets boot the job via one of the registered executors.
     started_with_executor = None
     execution_id = None

     logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors])

     for executor in self._ordered_executors:
       # Check if we can use this executor based on its whitelist, by namespace.
       namespace = build_job.namespace
@@ -474,10 +475,19 @@ class EphemeralBuilderManager(BaseManager):
       # Break out of the loop now that we've started a builder successfully.
       break

+    # If we didn't start the job, cleanup and return it to the queue.
     if started_with_executor is None:
       logger.error('Could not start ephemeral worker for build %s', build_uuid)
+
+      # Delete the associated build job record.
+      try:
+        yield From(self._etcd_client.delete(job_key))
+      except (KeyError, etcd.EtcdKeyError):
+        logger.warning('Could not delete job key %s', job_key)
+
       raise Return(False, self._ephemeral_api_timeout)

+    # Job was started!
     logger.debug('Started execution with ID %s for job: %s with executor: %s',
                  execution_id, build_uuid, started_with_executor.name)
@@ -581,7 +591,10 @@ class EphemeralBuilderManager(BaseManager):
   @coroutine
   def job_heartbeat(self, build_job):
-    # Extend the deadline in etcd
+    # Extend the queue item.
+    self.job_heartbeat_callback(build_job)
+
+    # Extend the deadline in etcd.
     job_key = self._etcd_job_key(build_job)
     try:
@@ -607,7 +620,6 @@ class EphemeralBuilderManager(BaseManager):
     # to ensure that if the TTL is < 0, the key will expire immediately.
     etcd_ttl = max(ttl, 0)
     yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl))
-    self.job_heartbeat_callback(build_job)

   @coroutine
@@ -616,7 +628,7 @@ class EphemeralBuilderManager(BaseManager):
        was granted and false otherwise.
     """
     pieces = [self._etcd_lock_prefix, path]
-    pieces.extend(*args)
+    pieces.extend(args)
     lock_key = os.path.join(*pieces)
     try:


@@ -17,15 +17,14 @@ from buildman.jobutil.buildstatus import StatusHandler
 from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
 from data import database
 from app import app, metric_queue
-from app import app

 logger = logging.getLogger(__name__)

 WORK_CHECK_TIMEOUT = 10
 TIMEOUT_PERIOD_MINUTES = 20
 JOB_TIMEOUT_SECONDS = 300
-SETUP_LEEWAY_SECONDS = 10
-MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
+SETUP_LEEWAY_SECONDS = 30
+MINIMUM_JOB_EXTENSION = timedelta(minutes=1)
 HEARTBEAT_PERIOD_SEC = 30