Add a watchdog timer to the build worker to kill a build step that takes more than 20 minutes.
This commit is contained in:
parent
204fecc1f9
commit
b95d3ec329
2 changed files with 38 additions and 5 deletions
|
@ -12,6 +12,8 @@ from docker import Client, APIError
|
|||
from tempfile import TemporaryFile, mkdtemp
|
||||
from zipfile import ZipFile
|
||||
from functools import partial
|
||||
from datetime import datetime, timedelta
|
||||
from threading import Event
|
||||
|
||||
from data.queue import dockerfile_build_queue
|
||||
from data import model
|
||||
|
@ -31,6 +33,8 @@ logger = logging.getLogger(__name__)
|
|||
user_files = app.config['USERFILES']
|
||||
build_logs = app.config['BUILDLOGS']
|
||||
|
||||
TIMEOUT_PERIOD_MINUTES = 20
|
||||
|
||||
|
||||
class StatusWrapper(object):
|
||||
def __init__(self, build_uuid):
|
||||
|
@ -316,6 +320,8 @@ class DockerfileBuildWorker(Worker):
|
|||
'application/gzip': DockerfileBuildWorker.__prepare_tarball,
|
||||
}
|
||||
|
||||
self._timeout = Event()
|
||||
|
||||
@staticmethod
|
||||
def __prepare_zip(request_file):
|
||||
build_dir = mkdtemp(prefix='docker-build-')
|
||||
|
@ -347,7 +353,24 @@ class DockerfileBuildWorker(Worker):
|
|||
|
||||
return build_dir
|
||||
|
||||
def watchdog(self):
|
||||
logger.debug('Running build watchdog code.')
|
||||
|
||||
docker_cl = Client()
|
||||
|
||||
# Iterate the running containers and kill ones that have been running more than 20 minutes
|
||||
for container in docker_cl.containers():
|
||||
start_time = datetime.fromtimestamp(container[u'Created'])
|
||||
running_time = datetime.now() - start_time
|
||||
if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES):
|
||||
logger.warning('Container has been running too long: %s with command: %s',
|
||||
container[u'Id'], container[u'Command'])
|
||||
docker_cl.kill(container[u'Id'])
|
||||
self._timeout.set()
|
||||
|
||||
def process_queue_item(self, job_details):
|
||||
self._timeout.clear()
|
||||
|
||||
repository_build = model.get_repository_build(job_details['namespace'],
|
||||
job_details['repository'],
|
||||
job_details['build_uuid'])
|
||||
|
@ -370,7 +393,7 @@ class DockerfileBuildWorker(Worker):
|
|||
|
||||
start_msg = ('Starting job with resource url: %s repo: %s' % (resource_url,
|
||||
repo))
|
||||
log_appender(start_msg)
|
||||
logger.debug(start_msg)
|
||||
|
||||
docker_resource = requests.get(resource_url, stream=True)
|
||||
c_type = docker_resource.headers['content-type']
|
||||
|
@ -402,7 +425,11 @@ class DockerfileBuildWorker(Worker):
|
|||
log_appender('error', build_logs.PHASE)
|
||||
repository_build.phase = 'error'
|
||||
repository_build.save()
|
||||
log_appender('Unable to build dockerfile.', build_logs.ERROR)
|
||||
if self._timeout.is_set():
|
||||
log_appender('Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES,
|
||||
build_logs.ERROR)
|
||||
else:
|
||||
log_appender('Unable to build dockerfile.', build_logs.ERROR)
|
||||
return True
|
||||
|
||||
log_appender('pushing', build_logs.PHASE)
|
||||
|
|
|
@ -9,10 +9,12 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class Worker(object):
|
||||
def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300):
|
||||
def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
|
||||
watchdog_period_seconds=60):
|
||||
self._sched = Scheduler()
|
||||
self._poll_period_seconds = poll_period_seconds
|
||||
self._reservation_seconds = reservation_seconds
|
||||
self._watchdog_period_seconds = watchdog_period_seconds
|
||||
self._stop = Event()
|
||||
self._queue = queue
|
||||
|
||||
|
@ -20,6 +22,10 @@ class Worker(object):
|
|||
""" Return True if complete, False if it should be retried. """
|
||||
raise NotImplementedError('Workers must implement run.')
|
||||
|
||||
def watchdog(self):
|
||||
""" Function that gets run once every watchdog_period_seconds. """
|
||||
pass
|
||||
|
||||
def poll_queue(self):
|
||||
logger.debug('Getting work item from queue.')
|
||||
|
||||
|
@ -43,8 +49,8 @@ class Worker(object):
|
|||
logger.debug("Scheduling worker.")
|
||||
|
||||
self._sched.start()
|
||||
self._sched.add_interval_job(self.poll_queue,
|
||||
seconds=self._poll_period_seconds)
|
||||
self._sched.add_interval_job(self.poll_queue, seconds=self._poll_period_seconds)
|
||||
self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)
|
||||
|
||||
while not self._stop.wait(1):
|
||||
pass
|
||||
|
|
Reference in a new issue