Improve the builder's response to being terminated or dying.

Jake Moshenko 2014-05-06 18:46:19 -04:00
parent bd63ea98f3
commit 8a3af93b8c
3 changed files with 93 additions and 34 deletions

data/queue.py

@@ -7,6 +7,9 @@ from app import app
 transaction_factory = app.config['DB_TRANSACTION_FACTORY']
+
+MINIMUM_EXTENSION = timedelta(seconds=20)
+
 class WorkQueue(object):
   def __init__(self, queue_name, canonical_name_match_list=None):
     self.queue_name = queue_name
@@ -80,17 +83,24 @@ class WorkQueue(object):
     completed_item.delete_instance()

   @staticmethod
-  def incomplete(incomplete_item, retry_after=300):
+  def incomplete(incomplete_item, retry_after=300, restore_retry=False):
     retry_date = datetime.now() + timedelta(seconds=retry_after)
     incomplete_item.available_after = retry_date
     incomplete_item.available = True
+
+    if restore_retry:
+      incomplete_item.retries_remaining += 1
+
     incomplete_item.save()

   @staticmethod
   def extend_processing(queue_item, seconds_from_now):
     new_expiration = datetime.now() + timedelta(seconds=seconds_from_now)
-    queue_item.processing_expires = new_expiration
-    queue_item.save()
+
+    # Only actually write the new expiration to the db if it moves the expiration some minimum
+    if new_expiration - queue_item.processing_expires > MINIMUM_EXTENSION:
+      queue_item.processing_expires = new_expiration
+      queue_item.save()

 image_diff_queue = WorkQueue(app.config['DIFFS_QUEUE_NAME'])
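
For orientation, a minimal sketch of a caller driving the updated WorkQueue API; the queue instance, job handler, and failure type below are illustrative stand-ins, not part of this commit:

# Hypothetical caller of the WorkQueue shown above.
item = some_queue.get(processing_time=300)
if item is not None:
  try:
    # Keep-alive while the job runs; the new expiration is only written to the
    # database when it moves forward by more than MINIMUM_EXTENSION (20 seconds),
    # so frequent calls stay cheap.
    some_queue.extend_processing(item, 600)
    handle_job(item)                       # assumed job handler
    some_queue.complete(item)
  except TemporaryOutage:                  # assumed transient failure type
    # Give the item back without consuming one of its retries.
    some_queue.incomplete(item, retry_after=300, restore_retry=True)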

workers/dockerfilebuild.py

@@ -20,7 +20,7 @@ from collections import defaultdict
 from data.queue import dockerfile_build_queue
 from data import model
-from workers.worker import Worker
+from workers.worker import Worker, WorkerUnhealthyException, JobException
 from app import app, userfiles as user_files
 from util.safetar import safe_extractall
 from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile
@@ -39,6 +39,7 @@ build_logs = app.config['BUILDLOGS']
 TIMEOUT_PERIOD_MINUTES = 20
 CACHE_EXPIRATION_PERIOD_HOURS = 24
 NO_TAGS = ['<none>:<none>']
+RESERVATION_TIME = (TIMEOUT_PERIOD_MINUTES + 5) * 60

 class StatusWrapper(object):
@@ -122,7 +123,7 @@ class DockerfileBuildContext(object):
                                    'Dockerfile')

     if not os.path.exists(dockerfile_path):
       raise RuntimeError('Build job did not contain a Dockerfile.')

     # Compute the number of steps
     with open(dockerfile_path, 'r') as dockerfileobj:
       self._parsed_dockerfile = parse_dockerfile(dockerfileobj.read())
@@ -137,9 +138,14 @@
                          self._tag_names)

   def __enter__(self):
-    self.__cleanup_containers()
-    self.__cleanup_images()
-    self.__prune_cache()
+    try:
+      self.__cleanup_containers()
+      self.__cleanup_images()
+      self.__prune_cache()
+    except APIError:
+      message = 'Docker installation is no longer healthy.'
+      logger.exception(message)
+      raise WorkerUnhealthyException(message)
     return self

   def __exit__(self, exc_type, value, traceback):
@@ -209,7 +215,7 @@
       self.__monitor_completion(pull_status, 'Downloading', self._status, 'pull_completion')

-  def build(self):
+  def build(self, reservation_extension_method):
     # Start the build itself.
     logger.debug('Starting build.')
@@ -249,6 +255,9 @@
           logger.debug('Step now: %s/%s', current_step, self._num_steps)
           with self._status as status_update:
             status_update['current_command'] = current_step
+
+          # Tell the queue that we're making progress every time we advance a step
+          reservation_extension_method(RESERVATION_TIME)
           continue
         else:
           self._build_logger(status_str)
@@ -482,8 +491,9 @@ class DockerfileBuildWorker(Worker):
         log_appender('error', build_logs.PHASE)
         repository_build.phase = 'error'
         repository_build.save()
-        log_appender('Unknown mime-type: %s' % c_type, build_logs.ERROR)
-        return True
+        message = 'Unknown mime-type: %s' % c_type
+        log_appender(message, build_logs.ERROR)
+        raise JobException(message)

       build_dir = self._mime_processors[c_type](docker_resource)
@@ -496,21 +506,26 @@
         repository_build.save()
         build_ctxt.pull()

+        self.extend_processing(RESERVATION_TIME)
+
         log_appender('building', build_logs.PHASE)
         repository_build.phase = 'building'
         repository_build.save()

-        built_image = build_ctxt.build()
+        built_image = build_ctxt.build(self.extend_processing)

         if not built_image:
           log_appender('error', build_logs.PHASE)
           repository_build.phase = 'error'
           repository_build.save()
+
+          message = 'Unable to build dockerfile.'
           if self._timeout.is_set():
-            log_appender('Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES,
-                         build_logs.ERROR)
-          else:
-            log_appender('Unable to build dockerfile.', build_logs.ERROR)
-          return True
+            message = 'Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES
+
+          log_appender(message, build_logs.ERROR)
+          raise JobException(message)
+
+        self.extend_processing(RESERVATION_TIME)

         log_appender('pushing', build_logs.PHASE)
         repository_build.phase = 'pushing'
@@ -522,15 +537,17 @@
         repository_build.phase = 'complete'
         repository_build.save()

+      except WorkerUnhealthyException as exc:
+        # Need a separate handler for this so it doesn't get caught by catch all below
+        raise exc
+
       except Exception as exc:
         log_appender('error', build_logs.PHASE)
         logger.exception('Exception when processing request.')
         repository_build.phase = 'error'
         repository_build.save()
         log_appender(str(exc), build_logs.ERROR)
-        return True
-
-      return True
+        raise JobException(str(exc))

 desc = 'Worker daemon to monitor dockerfile build'
@@ -544,8 +561,8 @@ parser.add_argument('--cachegb', default=20, type=float,
 args = parser.parse_args()

-ONE_HOUR = 60*60
-
-worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue, reservation_seconds=ONE_HOUR)
+worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue,
+                               reservation_seconds=RESERVATION_TIME)

 if args.D:
   handler = logging.FileHandler(args.log)
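
Condensed, the flow these hunks add looks like the sketch below. It is a simplified, hypothetical rendering of the hand-off between the worker and the build context (the class name, constructor arguments, and push step are assumptions), not a verbatim excerpt:

# Sketch of the new reservation / failure flow in the build worker.
class SketchBuildWorker(Worker):                        # condensed stand-in for DockerfileBuildWorker
  def process_queue_item(self, job_details):
    # __enter__ now raises WorkerUnhealthyException if the Docker daemon is sick.
    with DockerfileBuildContext(job_details) as ctxt:   # constructor arguments assumed
      ctxt.pull()
      self.extend_processing(RESERVATION_TIME)          # lease now covers the build phase

      # The context calls the bound method back after every completed Dockerfile step.
      built_image = ctxt.build(self.extend_processing)
      if not built_image:
        raise JobException('Unable to build dockerfile.')

      self.extend_processing(RESERVATION_TIME)          # lease now covers the push phase
      ctxt.push(built_image)                            # push step assumed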

workers/worker.py

@@ -9,6 +9,19 @@ from datetime import datetime, timedelta
 logger = logging.getLogger(__name__)

+class JobException(Exception):
+  """ A job exception is an exception that is caused by something being malformed in the job. When
+      a worker raises this exception the job will be terminated and the retry will not be returned
+      to the queue. """
+  pass
+
+
+class WorkerUnhealthyException(Exception):
+  """ When this exception is raised, the worker is no longer healthy and will not accept any more
+      work. When this is raised while processing a queue item, the item should be returned to the
+      queue along with another retry. """
+  pass
+
 class Worker(object):
   def __init__(self, queue, poll_period_seconds=30, reservation_seconds=300,
@@ -19,6 +32,7 @@ class Worker(object):
     self._watchdog_period_seconds = watchdog_period_seconds
     self._stop = Event()
     self._queue = queue
+    self.current_queue_item = None

   def process_queue_item(self, job_details):
     """ Return True if complete, False if it should be retried. """
@@ -28,24 +42,37 @@
     """ Function that gets run once every watchdog_period_seconds. """
     pass

+  def extend_processing(self, seconds_from_now):
+    if self.current_queue_item is not None:
+      self._queue.extend_processing(self.current_queue_item, seconds_from_now)
+
   def poll_queue(self):
     logger.debug('Getting work item from queue.')

-    item = self._queue.get()
-    while item:
-      logger.debug('Queue gave us some work: %s' % item.body)
+    self.current_queue_item = self._queue.get()
+    while self.current_queue_item:
+      logger.debug('Queue gave us some work: %s' % self.current_queue_item.body)

-      job_details = json.loads(item.body)
+      job_details = json.loads(self.current_queue_item.body)

-      if self.process_queue_item(job_details):
-        self._queue.complete(item)
-      else:
-        logger.warning('An error occurred processing request: %s' % item.body)
-        self._queue.incomplete(item)
+      try:
+        self.process_queue_item(job_details)
+        self._queue.complete(self.current_queue_item)
+      except JobException:
+        logger.warning('An error occurred processing request: %s', self.current_queue_item.body)
+        self._queue.incomplete(self.current_queue_item)
+      except WorkerUnhealthyException:
+        logger.error('The worker has encountered an error and will not take new jobs.')
+        self._stop.set()
+        self._queue.incomplete(self.current_queue_item, restore_retry=True)
+      finally:
+        self.current_queue_item = None

-      item = self._queue.get(processing_time=self._reservation_seconds)
+      if not self._stop.is_set():
+        self.current_queue_item = self._queue.get(processing_time=self._reservation_seconds)

-    logger.debug('No more work.')
+    if not self._stop.is_set():
+      logger.debug('No more work.')

   def start(self):
     logger.debug("Scheduling worker.")
@@ -58,7 +85,7 @@ class Worker(object):
     self._sched.add_interval_job(self.watchdog, seconds=self._watchdog_period_seconds)

     signal.signal(signal.SIGTERM, self.join)
     signal.signal(signal.SIGINT, self.join)

     while not self._stop.wait(1):
       pass
@@ -70,3 +97,8 @@ class Worker(object):
   def join(self, signal_num=None, stack_frame=None):
     logger.debug('Shutting down worker gracefully.')
     self._stop.set()
+
+    # Give back the retry that we took for this queue item so that if it were down to zero
+    # retries it will still be picked up by another worker
+    if self.current_queue_item is not None:
+      self._queue.incomplete(self.current_queue_item, restore_retry=True)
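
The net effect is that a Worker subclass now reports its outcome by exception rather than by return value: returning normally lets poll_queue mark the item complete, raising JobException terminates the job without giving back the consumed retry, and raising WorkerUnhealthyException stops the worker and returns the item to the queue with its retry restored. A minimal, hypothetical subclass to illustrate the contract (the class, helper, and error condition below are assumptions, not part of the commit):

# Illustrative only; this worker does not exist in the repository.
class EchoWorker(Worker):
  def process_queue_item(self, job_details):
    if 'payload' not in job_details:
      # Malformed job: terminate it and do not restore the retry.
      raise JobException('Job is missing its payload.')

    try:
      send_downstream(job_details['payload'])   # assumed helper doing the real work
    except IOError:
      # This process can no longer do useful work: stop polling and hand the
      # item back to the queue with its retry restored for another worker.
      raise WorkerUnhealthyException('Downstream service is unreachable.')

    # Falling through lets poll_queue complete the item.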