Make sure to not hold DB connections open in the new build manager
This commit is contained in:
		
							parent
							
								
									84ba5d5f23
								
							
						
					
					
						commit
						cf35da30bc
					
				
					 2 changed files with 29 additions and 17 deletions
				
			
		|  | @ -1,5 +1,6 @@ | |||
| from data.database import BUILD_PHASE | ||||
| from data.database import BUILD_PHASE, UseThenDisconnect | ||||
| from data import model | ||||
| from app import app | ||||
| 
 | ||||
| class StatusHandler(object): | ||||
|   """ Context wrapper for writing status to build logs. """ | ||||
|  | @ -43,9 +44,11 @@ class StatusHandler(object): | |||
|     self._append_log_message(phase, self._build_logs.PHASE, extra_data) | ||||
| 
 | ||||
|     # Update the repository build with the new phase | ||||
|     repo_build = model.get_repository_build(self._uuid) | ||||
|     repo_build.phase = phase | ||||
|     repo_build.save() | ||||
|     with UseThenDisconnect(app.config): | ||||
|       repo_build = model.get_repository_build(self._uuid) | ||||
|       repo_build.phase = phase | ||||
|       repo_build.save() | ||||
| 
 | ||||
|     return True | ||||
| 
 | ||||
|   def __enter__(self): | ||||
|  |  | |||
|  | @ -12,7 +12,9 @@ from trollius.coroutines import From | |||
| from datetime import timedelta | ||||
| 
 | ||||
| from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException | ||||
| from data import database | ||||
| from data.queue import WorkQueue | ||||
| from app import app | ||||
| 
 | ||||
| logger = logging.getLogger(__name__) | ||||
| 
 | ||||
|  | @ -118,30 +120,33 @@ class BuilderServer(object): | |||
|     self._session_factory.remove(component) | ||||
| 
 | ||||
|   def _job_heartbeat(self, build_job): | ||||
|     WorkQueue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, | ||||
|                                 retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION) | ||||
|     with database.UseThenDisconnect(app.config): | ||||
|       WorkQueue.extend_processing(build_job.job_item, seconds_from_now=JOB_TIMEOUT_SECONDS, | ||||
|                                   retry_count=1, minimum_extension=MINIMUM_JOB_EXTENSION) | ||||
| 
 | ||||
|   def _job_complete(self, build_job, job_status): | ||||
|     if job_status == BuildJobResult.INCOMPLETE: | ||||
|       self._queue.incomplete(build_job.job_item, restore_retry=True, retry_after=30) | ||||
|     elif job_status == BuildJobResult.ERROR: | ||||
|       self._queue.incomplete(build_job.job_item, restore_retry=False) | ||||
|     else: | ||||
|       self._queue.complete(build_job.job_item) | ||||
|     with database.UseThenDisconnect(app.config): | ||||
|       if job_status == BuildJobResult.INCOMPLETE: | ||||
|         self._queue.incomplete(build_job.job_item, restore_retry=True, retry_after=30) | ||||
|       elif job_status == BuildJobResult.ERROR: | ||||
|         self._queue.incomplete(build_job.job_item, restore_retry=False) | ||||
|       else: | ||||
|         self._queue.complete(build_job.job_item) | ||||
| 
 | ||||
|     self._job_count = self._job_count - 1 | ||||
| 
 | ||||
|     if self._current_status == 'shutting_down' and not self._job_count: | ||||
|       self._shutdown_event.set() | ||||
| 
 | ||||
|     # TODO(jschorr): check for work here? | ||||
| 
 | ||||
|   @trollius.coroutine | ||||
|   def _work_checker(self): | ||||
|     while self._current_status == 'running': | ||||
|       logger.debug('Checking for more work for %d active workers', | ||||
|                    self._lifecycle_manager.num_workers()) | ||||
|       job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) | ||||
| 
 | ||||
|       with database.UseThenDisconnect(app.config): | ||||
|         job_item = self._queue.get(processing_time=self._lifecycle_manager.setup_time()) | ||||
| 
 | ||||
|       if job_item is None: | ||||
|         logger.debug('No additional work found. Going to sleep for %s seconds', WORK_CHECK_TIMEOUT) | ||||
|         yield From(trollius.sleep(WORK_CHECK_TIMEOUT)) | ||||
|  | @ -151,7 +156,9 @@ class BuilderServer(object): | |||
|         build_job = BuildJob(job_item) | ||||
|       except BuildJobLoadException as irbe: | ||||
|         logger.exception(irbe) | ||||
|         self._queue.incomplete(job_item, restore_retry=False) | ||||
| 
 | ||||
|         with database.UseThenDisconnect(app.config): | ||||
|           self._queue.incomplete(job_item, restore_retry=False) | ||||
| 
 | ||||
|       logger.debug('Build job found. Checking for an avaliable worker.') | ||||
|       scheduled = yield From(self._lifecycle_manager.schedule(build_job)) | ||||
|  | @ -160,7 +167,9 @@ class BuilderServer(object): | |||
|         logger.debug('Build job scheduled. Running: %s', self._job_count) | ||||
|       else: | ||||
|         logger.debug('All workers are busy. Requeuing.') | ||||
|         self._queue.incomplete(job_item, restore_retry=True, retry_after=0) | ||||
| 
 | ||||
|         with database.UseThenDisconnect(app.config): | ||||
|           self._queue.incomplete(job_item, restore_retry=True, retry_after=0) | ||||
| 
 | ||||
|   @trollius.coroutine | ||||
|   def _initialize(self, loop, host, websocket_port, controller_port, ssl=None): | ||||
|  |  | |||
		Reference in a new issue