diff --git a/data/queue.py b/data/queue.py
index af1df3045..61a03a631 100644
--- a/data/queue.py
+++ b/data/queue.py
@@ -7,6 +7,9 @@ from app import app
 transaction_factory = app.config['DB_TRANSACTION_FACTORY']
 
 
+MINIMUM_EXTENSION = timedelta(seconds=20)
+
+
 class WorkQueue(object):
   def __init__(self, queue_name, canonical_name_match_list=None):
     self.queue_name = queue_name
@@ -80,17 +83,24 @@ class WorkQueue(object):
     completed_item.delete_instance()
 
   @staticmethod
-  def incomplete(incomplete_item, retry_after=300):
+  def incomplete(incomplete_item, retry_after=300, restore_retry=False):
     retry_date = datetime.now() + timedelta(seconds=retry_after)
     incomplete_item.available_after = retry_date
     incomplete_item.available = True
+
+    if restore_retry:
+      incomplete_item.retries_remaining += 1
+
     incomplete_item.save()
 
   @staticmethod
   def extend_processing(queue_item, seconds_from_now):
     new_expiration = datetime.now() + timedelta(seconds=seconds_from_now)
-    queue_item.processing_expires = new_expiration
-    queue_item.save()
+
+    # Only actually write the new expiration to the db if it moves the expiration some minimum
+    if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION:
+      queue_item.processing_expires = new_expiration
+      queue_item.save()
 
 
 image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'])
diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py
index 0bc7d7467..b6a098fe5 100644
--- a/workers/dockerfilebuild.py
+++ b/workers/dockerfilebuild.py
@@ -20,7 +20,7 @@ from collections import defaultdict
 
 from data.queue import dockerfile_build_queue
 from data import model
-from workers.worker import Worker
+from workers.worker import Worker, WorkerUnhealthyException, JobException
 from app import app, userfiles as user_files
 from util.safetar import safe_extractall
 from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile
@@ -39,6 +39,7 @@ build_logs = app.config['BUILDLOGS']
 TIMEOUT_PERIOD_MINUTES = 20
 CACHE_EXPIRATION_PERIOD_HOURS = 24
 NO_TAGS = [':']
+RESERVATION_TIME = (TIMEOUT_PERIOD_MINUTES + 5) * 60
 
 
 class StatusWrapper(object):
@@ -122,7 +123,7 @@ class DockerfileBuildContext(object):
                                    'Dockerfile')
     if not os.path.exists(dockerfile_path):
       raise RuntimeError('Build job did not contain a Dockerfile.')
-    
+
     # Compute the number of steps
     with open(dockerfile_path, 'r') as dockerfileobj:
       self._parsed_dockerfile = parse_dockerfile(dockerfileobj.read())
@@ -137,9 +138,14 @@ class DockerfileBuildContext(object):
                  self._tag_names)
 
   def __enter__(self):
-    self.__cleanup_containers()
-    self.__cleanup_images()
-    self.__prune_cache()
+    try:
+      self.__cleanup_containers()
+      self.__cleanup_images()
+      self.__prune_cache()
+    except APIError:
+      message = 'Docker installation is no longer healthy.'
+      logger.exception(message)
+      raise WorkerUnhealthyException(message)
     return self
 
   def __exit__(self, exc_type, value, traceback):
@@ -209,7 +215,7 @@ class DockerfileBuildContext(object):
 
     self.__monitor_completion(pull_status, 'Downloading', self._status, 'pull_completion')
 
-  def build(self):
+  def build(self, reservation_extension_method):
     # Start the build itself.
     logger.debug('Starting build.')
 
@@ -249,6 +255,9 @@ class DockerfileBuildContext(object):
          logger.debug('Step now: %s/%s', current_step, self._num_steps)
          with self._status as status_update:
            status_update['current_command'] = current_step
+
+          # Tell the queue that we're making progress every time we advance a step
+          reservation_extension_method(RESERVATION_TIME)
          continue
        else:
          self._build_logger(status_str)
@@ -482,8 +491,9 @@ class DockerfileBuildWorker(Worker):
       log_appender('error', build_logs.PHASE)
       repository_build.phase = 'error'
       repository_build.save()
-      log_appender('Unknown mime-type: %s' % c_type, build_logs.ERROR)
-      return True
+      message = 'Unknown mime-type: %s' % c_type
+      log_appender(message, build_logs.ERROR)
+      raise JobException(message)
 
     build_dir = self._mime_processors[c_type](docker_resource)
 
@@ -496,21 +506,26 @@ class DockerfileBuildWorker(Worker):
         repository_build.save()
         build_ctxt.pull()
 
+        self.extend_processing(RESERVATION_TIME)
+
         log_appender('building', build_logs.PHASE)
         repository_build.phase = 'building'
         repository_build.save()
-        built_image = build_ctxt.build()
+        built_image = build_ctxt.build(self.extend_processing)
 
         if not built_image:
           log_appender('error', build_logs.PHASE)
           repository_build.phase = 'error'
           repository_build.save()
+
+          message = 'Unable to build dockerfile.'
           if self._timeout.is_set():
-            log_appender('Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES,
-                         build_logs.ERROR)
-          else:
-            log_appender('Unable to build dockerfile.', build_logs.ERROR)
-          return True
+            message = 'Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES
+
+          log_appender(message, build_logs.ERROR)
+          raise JobException(message)
+
+        self.extend_processing(RESERVATION_TIME)
 
         log_appender('pushing', build_logs.PHASE)
         repository_build.phase = 'pushing'
@@ -522,15 +537,17 @@ class DockerfileBuildWorker(Worker):
         repository_build.phase = 'complete'
         repository_build.save()
 
+    except WorkerUnhealthyException as exc:
+      # Need a separate handler for this so it doesn't get caught by catch all below
+      raise exc
+
     except Exception as exc:
       log_appender('error', build_logs.PHASE)
       logger.exception('Exception when processing request.')
      repository_build.phase = 'error'
      repository_build.save()
      log_appender(str(exc), build_logs.ERROR)
-      return True
-
-    return True
+      raise JobException(str(exc))
 
 
 desc = 'Worker daemon to monitor dockerfile build'
@@ -544,8 +561,8 @@ parser.add_argument('--cachegb', default=20, type=float,
 
 args = parser.parse_args()
 
-ONE_HOUR = 60*60
-worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue, reservation_seconds=ONE_HOUR)
+worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
+                               reservation_seconds=RESERVATION_TIME)
 
 if args.D:
   handler = logging.FileHandler(args.log)
diff --git a/workers/worker.py b/workers/worker.py
index 57375f63d..2186087ba 100644
--- a/workers/worker.py
+++ b/workers/worker.py
@@ -9,6 +9,19 @@ from datetime import datetime, timedelta
 
 logger = logging.getLogger(__name__)
 
+class JobException(Exception):
+  """ A job exception is an exception that is caused by something being malformed in the job. When
+      a worker raises this exception the job will be terminated and the retry will not be returned
+      to the queue. """
+  pass
+
+
+class WorkerUnhealthyException(Exception):
+  """ When this exception is raised, the worker is no longer healthy and will not accept any more
+      work. When this is raised while processing a queue item, the item should be returned to the
+      queue along with another retry. """
+  pass
+
 
 class Worker(object):
   def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
@@ -19,6 +32,7 @@ class Worker(object):
     self._watchdog_period_seconds = watchdog_period_seconds
     self._stop = Event()
     self._queue = queue
+    self.current_queue_item = None
 
   def process_queue_item(self, job_details):
     """ Return True if complete, False if it should be retried. """
@@ -28,24 +42,37 @@ class Worker(object):
     """ Function that gets run once every watchdog_period_seconds. """
     pass
 
+  def extend_processing(self, seconds_from_now):
+    if self.current_queue_item is not None:
+      self._queue.extend_processing(self.current_queue_item, seconds_from_now)
+
   def poll_queue(self):
     logger.debug('Getting work item from queue.')
 
-    item = self._queue.get()
-    while item:
-      logger.debug('Queue gave us some work: %s' % item.body)
+    self.current_queue_item = self._queue.get()
+    while self.current_queue_item:
+      logger.debug('Queue gave us some work: %s' % self.current_queue_item.body)
 
-      job_details = json.loads(item.body)
+      job_details = json.loads(self.current_queue_item.body)
 
-      if self.process_queue_item(job_details):
-        self._queue.complete(item)
-      else:
-        logger.warning('An error occurred processing request: %s' % item.body)
-        self._queue.incomplete(item)
+      try:
+        self.process_queue_item(job_details)
+        self._queue.complete(self.current_queue_item)
+      except JobException:
+        logger.warning('An error occurred processing request: %s', self.current_queue_item.body)
+        self._queue.incomplete(self.current_queue_item)
+      except WorkerUnhealthyException:
+        logger.error('The worker has encountered an error and will not take new jobs.')
+        self._stop.set()
+        self._queue.incomplete(self.current_queue_item, restore_retry=True)
+      finally:
+        self.current_queue_item = None
 
-      item = self._queue.get(processing_time=self._reservation_seconds)
+      if not self._stop.is_set():
+        self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)
 
-    logger.debug('No more work.')
+    if not self._stop.is_set():
+      logger.debug('No more work.')
 
   def start(self):
     logger.debug("Scheduling worker.")
@@ -58,7 +85,7 @@ class Worker(object):
     self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)
 
     signal.signal(signal.SIGTERM, self.join)
-    signal.signal(signal.SIGINT, self.join) 
+    signal.signal(signal.SIGINT, self.join)
 
     while not self._stop.wait(1):
       pass
@@ -70,3 +97,8 @@ class Worker(object):
   def join(self, signal_num=None, stack_frame=None):
     logger.debug('Shutting down worker gracefully.')
     self._stop.set()
+
+    # Give back the retry that we took for this queue item so that if it were down to zero
+    # retries it will still be picked up by another worker
+    if self.current_queue_item is not None:
+      self._queue.incomplete(self.current_queue_item, restore_retry=True)