diff --git a/buildman/jobutil/buildstatus.py b/buildman/jobutil/buildstatus.py index b79776c46..11c732080 100644 --- a/buildman/jobutil/buildstatus.py +++ b/buildman/jobutil/buildstatus.py @@ -1,5 +1,6 @@ -from data.database import BUILD_PHASE +from data.database import BUILD_PHASE, UseThenDisconnect from data import model +from app import app class StatusHandler(object): """ Context wrapper for writing status to build logs. """ @@ -43,9 +44,11 @@ class StatusHandler(object): self._append_log_message(phase, self._build_logs.PHASE, extra_data) # Update the repository build with the new phase - repo_build = model.get_repository_build(self._uuid) - repo_build.phase = phase - repo_build.save() + with UseThenDisconnect(app.config): + repo_build = model.get_repository_build(self._uuid) + repo_build.phase = phase + repo_build.save() + return True def __enter__(self): diff --git a/buildman/server.py b/buildman/server.py index d8702d962..1a9da0913 100644 --- a/buildman/server.py +++ b/buildman/server.py @@ -12,7 +12,9 @@ from trollius.coroutines import From from datetime import timedelta from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException +from data import database from data.queue import WorkQueue +from app import app logger = logging.getLogger(__name__) @@ -118,30 +120,33 @@ class BuilderServer(object): self._session_factory.remove(component) def _job_heartbeat(self, build_job): - WorkQueue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, - retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION) + with database.UseThenDisconnect(app.config): + WorkQueue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, + retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION) def _job_complete(self, build_job, job_status): - if job_status == BuildJobResult.INCOMPLETE: - self._queue.incomplete(build_job.job_item, restore_retry=True, retry_after=30) - elif job_status == BuildJobResult.ERROR: - self._queue.incomplete(build_job.job_item, restore_retry=False) - else: - self._queue.complete(build_job.job_item) + with database.UseThenDisconnect(app.config): + if job_status == BuildJobResult.INCOMPLETE: + self._queue.incomplete(build_job.job_item, restore_retry=True, retry_after=30) + elif job_status == BuildJobResult.ERROR: + self._queue.incomplete(build_job.job_item, restore_retry=False) + else: + self._queue.complete(build_job.job_item) self._job_count = self._job_count - 1 if self._current_status == 'shutting_down' and not self._job_count: self._shutdown_event.set() - # TODO(jschorr): check for work here? - @trollius.coroutine def _work_checker(self): while self._current_status == 'running': logger.debug('Checking for more work for %d active workers', self._lifecycle_manager.num_workers()) - job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) + + with database.UseThenDisconnect(app.config): + job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) + if job_item is None: logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT) yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) @@ -151,7 +156,9 @@ class BuilderServer(object): build_job = BuildJob(job_item) except BuildJobLoadException as irbe: logger.exception(irbe) - self._queue.incomplete(job_item, restore_retry=False) + + with database.UseThenDisconnect(app.config): + self._queue.incomplete(job_item, restore_retry=False) logger.debug('Build job found. Checking for an avaliable worker.') scheduled = yield From(self._lifecycle_manager.schedule(build_job)) @@ -160,7 +167,9 @@ class BuilderServer(object): logger.debug('Build job scheduled. Running: %s', self._job_count) else: logger.debug('All workers are busy. Requeuing.') - self._queue.incomplete(job_item, restore_retry=True, retry_after=0) + + with database.UseThenDisconnect(app.config): + self._queue.incomplete(job_item, restore_retry=True, retry_after=0) @trollius.coroutine def _initialize(self, loop, host, websocket_port, controller_port, ssl=None):