Allow the individual build manager types to specify how long the queue should wait before retrying a job that fails to schedule.

This commit is contained in:
Jake Moshenko 2015-06-10 14:17:32 -04:00
parent a99414b222
commit d31e25d5cd
4 changed files with 28 additions and 19 deletions

View file

@ -32,8 +32,9 @@ class BaseManager(object):
@coroutine @coroutine
def schedule(self, build_job): def schedule(self, build_job):
""" Schedules a queue item to be built. Returns True if the item was properly scheduled """ Schedules a queue item to be built. Returns a 2-tuple with (True, None) if the item was
and False if all workers are busy. properly scheduled and (False, a retry timeout in seconds) if all workers are busy or an
error occurs.
""" """
raise NotImplementedError raise NotImplementedError

View file

@ -8,6 +8,7 @@ from buildman.manager.basemanager import BaseManager
from trollius import From, Return, coroutine from trollius import From, Return, coroutine
REGISTRATION_REALM = 'registration' REGISTRATION_REALM = 'registration'
RETRY_TIMEOUT = 5
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DynamicRegistrationComponent(BaseComponent): class DynamicRegistrationComponent(BaseComponent):
@ -61,13 +62,13 @@ class EnterpriseManager(BaseManager):
def schedule(self, build_job): def schedule(self, build_job):
""" Schedules a build for an Enterprise Registry. """ """ Schedules a build for an Enterprise Registry. """
if self.shutting_down or not self.ready_components: if self.shutting_down or not self.ready_components:
raise Return(False) raise Return(False, RETRY_TIMEOUT)
component = self.ready_components.pop() component = self.ready_components.pop()
yield From(component.start_build(build_job)) yield From(component.start_build(build_job))
raise Return(True) raise Return(True, None)
@coroutine @coroutine
def build_component_ready(self, build_component): def build_component_ready(self, build_component):

View file

@ -22,6 +22,8 @@ logger = logging.getLogger(__name__)
ETCD_DISABLE_TIMEOUT = 0 ETCD_DISABLE_TIMEOUT = 0
EC2_API_TIMEOUT = 20
RETRY_IMMEDIATELY_TIMEOUT = 0
class EtcdAction(object): class EtcdAction(object):
@ -214,22 +216,23 @@ class EphemeralBuilderManager(BaseManager):
workers_alive = 0 workers_alive = 0
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when reading job count from etcd') logger.exception('Exception when reading job count from etcd')
raise Return(False) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
logger.debug('Total jobs: %s', workers_alive) logger.debug('Total jobs: %s', workers_alive)
if workers_alive >= allowed_worker_count: if workers_alive >= allowed_worker_count:
logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive, logger.info('Too many workers alive, unable to start new worker. %s >= %s', workers_alive,
allowed_worker_count) allowed_worker_count)
raise Return(False) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
job_key = self._etcd_job_key(build_job) job_key = self._etcd_job_key(build_job)
# First try to take a lock for this job, meaning we will be responsible for its lifeline # First try to take a lock for this job, meaning we will be responsible for its lifeline
realm = str(uuid.uuid4()) realm = str(uuid.uuid4())
token = str(uuid.uuid4()) token = str(uuid.uuid4())
ttl = self.setup_time() nonce = str(uuid.uuid4())
expiration = datetime.utcnow() + timedelta(seconds=ttl) setup_time = self.setup_time()
expiration = datetime.utcnow() + timedelta(seconds=setup_time)
machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200) machine_max_expiration = self._manager_config.get('MACHINE_MAX_TIME', 7200)
max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration) max_expiration = datetime.utcnow() + timedelta(seconds=machine_max_expiration)
@ -237,17 +240,20 @@ class EphemeralBuilderManager(BaseManager):
payload = { payload = {
'expiration': calendar.timegm(expiration.timetuple()), 'expiration': calendar.timegm(expiration.timetuple()),
'max_expiration': calendar.timegm(max_expiration.timetuple()), 'max_expiration': calendar.timegm(max_expiration.timetuple()),
'nonce': nonce,
} }
lock_payload = json.dumps(payload)
try: try:
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=False, ttl=ttl)) yield From(self._etcd_client.write(job_key, lock_payload, prevExist=False,
ttl=EC2_API_TIMEOUT))
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
# The job was already taken by someone else, we are probably a retry # The job was already taken by someone else, we are probably a retry
logger.exception('Job already exists in etcd, are timeouts misconfigured or is the queue broken?') logger.exception('Job already exists in etcd, are timeouts misconfigured or is the queue broken?')
raise Return(False) raise Return(False)
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid) logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False) raise Return(False, RETRY_IMMEDIATELY_TIMEOUT)
logger.debug('Starting builder with executor: %s', self._executor) logger.debug('Starting builder with executor: %s', self._executor)
@ -255,15 +261,16 @@ class EphemeralBuilderManager(BaseManager):
builder_id = yield From(self._executor.start_builder(realm, token, build_uuid)) builder_id = yield From(self._executor.start_builder(realm, token, build_uuid))
except: except:
logger.exception('Exception when starting builder for job: %s', build_uuid) logger.exception('Exception when starting builder for job: %s', build_uuid)
raise Return(False) raise Return(False, EC2_API_TIMEOUT)
# Store the builder in etcd associated with the job id # Store the builder in etcd associated with the job id
try: try:
payload['builder_id'] = builder_id payload['builder_id'] = builder_id
yield From(self._etcd_client.write(job_key, json.dumps(payload), prevExist=True, ttl=ttl)) yield From(self._etcd_client.write(job_key, json.dumps(payload), prevValue=lock_payload,
ttl=setup_time))
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when writing job %s to etcd', build_uuid) logger.exception('Exception when writing job %s to etcd', build_uuid)
raise Return(False) raise Return(False, EC2_API_TIMEOUT)
# Store the realm spec which will allow any manager to accept this builder when it connects # Store the realm spec which will allow any manager to accept this builder when it connects
realm_spec = json.dumps({ realm_spec = json.dumps({
@ -275,15 +282,15 @@ class EphemeralBuilderManager(BaseManager):
try: try:
yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False, yield From(self._etcd_client.write(self._etcd_realm_key(realm), realm_spec, prevExist=False,
ttl=ttl)) ttl=setup_time))
except (KeyError, etcd.EtcdKeyError): except (KeyError, etcd.EtcdKeyError):
logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.') logger.error('Realm already exists in etcd. UUID collision or something is very very wrong.')
raise Return(False) raise Return(False, setup_time)
except etcd.EtcdException: except etcd.EtcdException:
logger.exception('Exception when writing realm %s to etcd', realm) logger.exception('Exception when writing realm %s to etcd', realm)
raise Return(False) raise Return(False)
raise Return(True) raise Return(True, None)
@coroutine @coroutine
def build_component_ready(self, build_component): def build_component_ready(self, build_component):

View file

@ -172,13 +172,13 @@ class BuilderServer(object):
logger.debug('Build job found. Checking for an avaliable worker.') logger.debug('Build job found. Checking for an avaliable worker.')
try: try:
scheduled = yield From(self._lifecycle_manager.schedule(build_job)) schedule_success, retry_timeout = yield From(self._lifecycle_manager.schedule(build_job))
except: except:
logger.exception('Exception when scheduling job') logger.exception('Exception when scheduling job')
self._current_status = BuildServerStatus.EXCEPTION self._current_status = BuildServerStatus.EXCEPTION
return return
if scheduled: if schedule_success:
logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid) logger.debug('Marking build %s as scheduled', build_job.repo_build.uuid)
status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid) status_handler = StatusHandler(self._build_logs, build_job.repo_build.uuid)
status_handler.set_phase('build-scheduled') status_handler.set_phase('build-scheduled')
@ -188,7 +188,7 @@ class BuilderServer(object):
self._job_count) self._job_count)
else: else:
logger.debug('All workers are busy. Requeuing.') logger.debug('All workers are busy. Requeuing.')
self._queue.incomplete(job_item, restore_retry=True, retry_after=WORK_CHECK_TIMEOUT + 5) self._queue.incomplete(job_item, restore_retry=True, retry_after=retry_timeout)
@trollius.coroutine @trollius.coroutine
def _queue_metrics_updater(self): def _queue_metrics_updater(self):