Merge pull request #1840 from coreos-inc/build-internal-error
Fix some issues around state in the build managers
This commit is contained in:
commit
1a49f87972
2 changed files with 24 additions and 13 deletions
|
@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
ETCD_MAX_WATCH_TIMEOUT = 30
|
ETCD_MAX_WATCH_TIMEOUT = 30
|
||||||
ETCD_ATOMIC_OP_TIMEOUT = 120
|
ETCD_ATOMIC_OP_TIMEOUT = 10000
|
||||||
RETRY_IMMEDIATELY_TIMEOUT = 0
|
RETRY_IMMEDIATELY_TIMEOUT = 0
|
||||||
NO_WORKER_AVAILABLE_TIMEOUT = 10
|
NO_WORKER_AVAILABLE_TIMEOUT = 10
|
||||||
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
|
DEFAULT_EPHEMERAL_API_TIMEOUT = 20
|
||||||
|
@ -186,12 +186,14 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
|
|
||||||
# If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
|
# If we have not yet received a heartbeat, then the node failed to boot in some way. We mark
|
||||||
# the job as incomplete here.
|
# the job as incomplete here.
|
||||||
if not job_metadata.get('had_heartbeat', True):
|
if not job_metadata.get('had_heartbeat', False):
|
||||||
logger.warning('Build executor failed to successfully boot with execution id %s',
|
logger.warning('Build executor failed to successfully boot with execution id %s',
|
||||||
execution_id)
|
execution_id)
|
||||||
|
|
||||||
# Take a lock to ensure that only one manager reports the build as incomplete.
|
# Take a lock to ensure that only one manager reports the build as incomplete for this
|
||||||
got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid))
|
# execution.
|
||||||
|
got_lock = yield From(self._take_etcd_atomic_lock('job-expired', build_job.build_uuid,
|
||||||
|
execution_id))
|
||||||
if got_lock:
|
if got_lock:
|
||||||
logger.warning('Marking job %s as incomplete', build_job.build_uuid)
|
logger.warning('Marking job %s as incomplete', build_job.build_uuid)
|
||||||
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
|
self.job_complete_callback(build_job, BuildJobResult.INCOMPLETE)
|
||||||
|
@ -419,7 +421,6 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
}
|
}
|
||||||
|
|
||||||
lock_payload = json.dumps(payload)
|
lock_payload = json.dumps(payload)
|
||||||
|
|
||||||
logger.debug('Writing key for job %s with expiration in %s seconds', build_uuid,
|
logger.debug('Writing key for job %s with expiration in %s seconds', build_uuid,
|
||||||
self._ephemeral_setup_timeout)
|
self._ephemeral_setup_timeout)
|
||||||
try:
|
try:
|
||||||
|
@ -427,17 +428,17 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
ttl=self._ephemeral_setup_timeout))
|
ttl=self._ephemeral_setup_timeout))
|
||||||
except (KeyError, etcd.EtcdKeyError):
|
except (KeyError, etcd.EtcdKeyError):
|
||||||
# The job was already taken by someone else, we are probably a retry
|
# The job was already taken by someone else, we are probably a retry
|
||||||
logger.error('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
|
logger.warning('Job: %s already exists in etcd, timeout may be misconfigured', build_uuid)
|
||||||
raise Return(False, self._ephemeral_api_timeout)
|
raise Return(False, self._ephemeral_api_timeout)
|
||||||
except etcd.EtcdException:
|
except etcd.EtcdException:
|
||||||
logger.exception('Exception when writing job %s to etcd', build_uuid)
|
logger.exception('Exception when writing job %s to etcd', build_uuid)
|
||||||
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
|
raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
|
||||||
|
|
||||||
|
# Got a lock, now let's boot the job via one of the registered executors.
|
||||||
started_with_executor = None
|
started_with_executor = None
|
||||||
execution_id = None
|
execution_id = None
|
||||||
|
|
||||||
logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors])
|
logger.debug("Registered executors are: %s", [ex.name for ex in self._ordered_executors])
|
||||||
|
|
||||||
for executor in self._ordered_executors:
|
for executor in self._ordered_executors:
|
||||||
# Check if we can use this executor based on its whitelist, by namespace.
|
# Check if we can use this executor based on its whitelist, by namespace.
|
||||||
namespace = build_job.namespace
|
namespace = build_job.namespace
|
||||||
|
@ -474,10 +475,19 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
# Break out of the loop now that we've started a builder successfully.
|
# Break out of the loop now that we've started a builder successfully.
|
||||||
break
|
break
|
||||||
|
|
||||||
|
# If we didn't start the job, clean up and return it to the queue.
|
||||||
if started_with_executor is None:
|
if started_with_executor is None:
|
||||||
logger.error('Could not start ephemeral worker for build %s', build_uuid)
|
logger.error('Could not start ephemeral worker for build %s', build_uuid)
|
||||||
|
|
||||||
|
# Delete the associated build job record.
|
||||||
|
try:
|
||||||
|
yield From(self._etcd_client.delete(job_key))
|
||||||
|
except (KeyError, etcd.EtcdKeyError):
|
||||||
|
logger.warning('Could not delete job key %s', job_key)
|
||||||
|
|
||||||
raise Return(False, self._ephemeral_api_timeout)
|
raise Return(False, self._ephemeral_api_timeout)
|
||||||
|
|
||||||
|
# Job was started!
|
||||||
logger.debug('Started execution with ID %s for job: %s with executor: %s',
|
logger.debug('Started execution with ID %s for job: %s with executor: %s',
|
||||||
execution_id, build_uuid, started_with_executor.name)
|
execution_id, build_uuid, started_with_executor.name)
|
||||||
|
|
||||||
|
@ -581,7 +591,10 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
|
|
||||||
@coroutine
|
@coroutine
|
||||||
def job_heartbeat(self, build_job):
|
def job_heartbeat(self, build_job):
|
||||||
# Extend the deadline in etcd
|
# Extend the queue item.
|
||||||
|
self.job_heartbeat_callback(build_job)
|
||||||
|
|
||||||
|
# Extend the deadline in etcd.
|
||||||
job_key = self._etcd_job_key(build_job)
|
job_key = self._etcd_job_key(build_job)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -607,7 +620,6 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
# to ensure that if the TTL is < 0, the key will expire immediately.
|
# to ensure that if the TTL is < 0, the key will expire immediately.
|
||||||
etcd_ttl = max(ttl, 0)
|
etcd_ttl = max(ttl, 0)
|
||||||
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl))
|
yield From(self._etcd_client.write(job_key, json.dumps(payload), ttl=etcd_ttl))
|
||||||
self.job_heartbeat_callback(build_job)
|
|
||||||
|
|
||||||
|
|
||||||
@coroutine
|
@coroutine
|
||||||
|
@ -616,7 +628,7 @@ class EphemeralBuilderManager(BaseManager):
|
||||||
was granted and false otherwise.
|
was granted and false otherwise.
|
||||||
"""
|
"""
|
||||||
pieces = [self._etcd_lock_prefix, path]
|
pieces = [self._etcd_lock_prefix, path]
|
||||||
pieces.extend(*args)
|
pieces.extend(args)
|
||||||
|
|
||||||
lock_key = os.path.join(*pieces)
|
lock_key = os.path.join(*pieces)
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -17,15 +17,14 @@ from buildman.jobutil.buildstatus import StatusHandler
|
||||||
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
|
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
|
||||||
from data import database
|
from data import database
|
||||||
from app import app, metric_queue
|
from app import app, metric_queue
|
||||||
from app import app
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
WORK_CHECK_TIMEOUT = 10
|
WORK_CHECK_TIMEOUT = 10
|
||||||
TIMEOUT_PERIOD_MINUTES = 20
|
TIMEOUT_PERIOD_MINUTES = 20
|
||||||
JOB_TIMEOUT_SECONDS = 300
|
JOB_TIMEOUT_SECONDS = 300
|
||||||
SETUP_LEEWAY_SECONDS = 10
|
SETUP_LEEWAY_SECONDS = 30
|
||||||
MINIMUM_JOB_EXTENSION = timedelta(minutes=2)
|
MINIMUM_JOB_EXTENSION = timedelta(minutes=1)
|
||||||
|
|
||||||
HEARTBEAT_PERIOD_SEC = 30
|
HEARTBEAT_PERIOD_SEC = 30
|
||||||
|
|
||||||
|
|
Reference in a new issue