diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py
index 6e0c53694..76b6ff9e7 100644
--- a/workers/dockerfilebuild.py
+++ b/workers/dockerfilebuild.py
@@ -20,6 +20,7 @@ from datetime import datetime, timedelta
 from threading import Event
 from uuid import uuid4
 from collections import defaultdict
+from requests.exceptions import ConnectionError
 
 from data import model
 from workers.worker import Worker, WorkerUnhealthyException, JobException
@@ -35,6 +36,19 @@ CACHE_EXPIRATION_PERIOD_HOURS = 24
 NO_TAGS = [':']
 RESERVATION_TIME = (TIMEOUT_PERIOD_MINUTES + 5) * 60
 
+def matches_system_error(status_str):
+  """ Returns true if the given status string matches a known system error in the
+      Docker builder.
+  """
+  KNOWN_MATCHES = ['lxc-start: invalid', 'lxc-start: failed to', 'lxc-start: Permission denied']
+
+  for match in KNOWN_MATCHES:
+    # 4 because we might have a Unix control code at the start.
+    if status_str.find(match[0:len(match) + 4]) <= 4:
+      return True
+
+  return False
+
 
 class StatusWrapper(object):
   def __init__(self, build_uuid):
@@ -141,13 +155,20 @@ class DockerfileBuildContext(object):
       message = 'Docker installation is no longer healthy.'
       logger.exception(message)
       raise WorkerUnhealthyException(message)
+
 
     return self
 
   def __exit__(self, exc_type, value, traceback):
-    self.__cleanup_containers()
-    shutil.rmtree(self._build_dir)
+    try:
+      self.__cleanup_containers()
+    except APIError:
+      sentry.client.captureException()
+      message = 'Docker installation is no longer healthy.'
+      logger.exception(message)
+      raise WorkerUnhealthyException(message)
+
 
   @staticmethod
   def __inject_quay_repo_env(parsed_dockerfile, quay_reponame):
     env_command = {
@@ -247,6 +268,11 @@ class DockerfileBuildContext(object):
         fully_unwrapped = status
 
       status_str = str(fully_unwrapped.encode('utf-8'))
+
+      # Check for system errors when building.
+      if matches_system_error(status_str):
+        raise WorkerUnhealthyException(status_str)
+
       logger.debug('Status: %s', status_str)
       step_increment = re.search(r'Step ([0-9]+) :', status_str)
       if step_increment:
@@ -437,17 +463,20 @@ class DockerfileBuildWorker(Worker):
   def watchdog(self):
     logger.debug('Running build watchdog code.')
 
-    docker_cl = Client()
+    try:
+      docker_cl = Client()
 
-    # Iterate the running containers and kill ones that have been running more than 20 minutes
-    for container in docker_cl.containers():
-      start_time = datetime.fromtimestamp(container['Created'])
-      running_time = datetime.now() - start_time
-      if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES):
-        logger.warning('Container has been running too long: %s with command: %s',
-                       container['Id'], container['Command'])
-        docker_cl.kill(container['Id'])
-        self._timeout.set()
+      # Iterate the running containers and kill ones that have been running more than 20 minutes
+      for container in docker_cl.containers():
+        start_time = datetime.fromtimestamp(container['Created'])
+        running_time = datetime.now() - start_time
+        if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES):
+          logger.warning('Container has been running too long: %s with command: %s',
+                         container['Id'], container['Command'])
+          docker_cl.kill(container['Id'])
+          self._timeout.set()
+    except ConnectionError as exc:
+      raise WorkerUnhealthyException(exc.message)
 
   def process_queue_item(self, job_details):
     self._timeout.clear()
@@ -548,6 +577,12 @@ class DockerfileBuildWorker(Worker):
       # Need a separate handler for this so it doesn't get caught by catch all below
      raise exc
 
+    except ConnectionError as exc:
+      # A connection exception means the worker has become unhealthy (Docker is down)
+      # so we re-raise as that exception.
+      log_appender('Docker daemon has gone away. Will retry shortly.', build_logs.ERROR)
+      raise WorkerUnhealthyException(exc.message)
+
     except Exception as exc:
       sentry.client.captureException()
       log_appender('error', build_logs.PHASE)
diff --git a/workers/worker.py b/workers/worker.py
index 5e04c1d5e..f13f29fc0 100644
--- a/workers/worker.py
+++ b/workers/worker.py
@@ -96,6 +96,14 @@ class Worker(object):
     if self.current_queue_item is not None:
       self._queue.extend_processing(self.current_queue_item, seconds_from_now)
 
+  def run_watchdog(self):
+    logger.debug('Running watchdog.')
+    try:
+      self.watchdog()
+    except WorkerUnhealthyException:
+      logger.error('The worker has encountered an error and will not take new jobs.')
+      self._stop.set()
+
   def poll_queue(self):
     logger.debug('Getting work item from queue.')
 
@@ -112,7 +120,7 @@ class Worker(object):
         logger.warning('An error occurred processing request: %s', self.current_queue_item.body)
         self._queue.incomplete(self.current_queue_item)
       except WorkerUnhealthyException:
-        logger.error('The worker has encountered an error and will not take new jobs.')
+        logger.error('The worker has encountered an error and will not take new jobs. Job is being requeued.')
         self._stop.set()
         self._queue.incomplete(self.current_queue_item, restore_retry=True)
       finally:
@@ -147,7 +155,7 @@ class Worker(object):
     self._sched.add_interval_job(self.poll_queue, seconds=self._poll_period_seconds,
                                  start_date=soon)
     self._sched.add_interval_job(self.update_queue_metrics, seconds=60, start_date=soon)
-    self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)
+    self._sched.add_interval_job(self.run_watchdog, seconds=self._watchdog_period_seconds)
 
     signal.signal(signal.SIGTERM, self.terminate)
     signal.signal(signal.SIGINT, self.terminate)