diff --git a/buildman/buildcomponent.py b/buildman/buildcomponent.py index a4ce3c821..83ff998bd 100644 --- a/buildman/buildcomponent.py +++ b/buildman/buildcomponent.py @@ -19,6 +19,13 @@ HEARTBEAT_DELTA = datetime.timedelta(seconds=15) logger = logging.getLogger(__name__) +class COMPONENT_STATUS(object): + JOINING = 'joining' + WAITING = 'waiting' + RUNNING = 'running' + BUILDING = 'building' + TIMED_OUT = 'timeout' + class BuildComponent(BaseComponent): """ An application session component which conducts one (or more) builds. """ @@ -26,8 +33,8 @@ class BuildComponent(BaseComponent): expected_token = None builder_realm = None + _component_status = COMPONENT_STATUS.JOINING _last_heartbeat = None - _component_status = 'joining' _current_job = None _build_status = None _image_info = None @@ -44,20 +51,21 @@ class BuildComponent(BaseComponent): def onJoin(self, details): logger.debug('Registering methods and listeners for component %s' % self.builder_realm) yield From(self.register(self._on_ready, u'io.quay.buildworker.ready')) + yield From(self.register(self._ping, u'io.quay.buildworker.ping')) yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat')) yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage')) - self._set_status('waiting') + self._set_status(COMPONENT_STATUS.WAITING) def is_ready(self): - return self._component_status == 'running' + return self._component_status == COMPONENT_STATUS.RUNNING def start_build(self, build_job): self._current_job = build_job self._build_status = StatusHandler(self.build_logs, build_job.repo_build()) self._image_info = {} - self._set_status('building') + self._set_status(COMPONENT_STATUS.BUILDING) # Retrieve the job's buildpack. buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key, @@ -84,7 +92,7 @@ class BuildComponent(BaseComponent): image_and_tag_tuple = parsed_dockerfile.get_image_and_tag() if image_and_tag_tuple is None or image_and_tag_tuple[0] is None: - self._build_failure('Missing FROM line Dockerfile') + self._build_failure('Missing FROM line in Dockerfile') return base_image_information = { @@ -139,10 +147,10 @@ class BuildComponent(BaseComponent): @staticmethod def __total_completion(statuses, total_images): - percentage_with_sizes = float(len(statuses.values()))/total_images + percentage_with_sizes = float(len(statuses.values())) / total_images sent_bytes = sum([status['current'] for status in statuses.values()]) total_bytes = sum([status['total'] for status in statuses.values()]) - return float(sent_bytes)/total_bytes*percentage_with_sizes + return float(sent_bytes) / total_bytes * percentage_with_sizes @staticmethod def __process_pushpull_status(status_dict, current_phase, docker_data, images): @@ -171,7 +179,7 @@ class BuildComponent(BaseComponent): status_dict[status_completion_key] = \ BuildComponent.__total_completion(images, max(len(images), num_images)) - def _on_log_message(self, status, json_data): + def _on_log_message(self, phase, json_data): # Parse any of the JSON data logged. docker_data = {} if json_data: @@ -180,7 +188,7 @@ class BuildComponent(BaseComponent): except ValueError: pass - # Extra the current status message (if any). + # Extract the current status message (if any). fully_unwrapped = '' keys_to_extract = ['error', 'status', 'stream'] for key in keys_to_extract: @@ -192,31 +200,30 @@ class BuildComponent(BaseComponent): current_step = None current_status_string = str(fully_unwrapped.encode('utf-8')) - if current_status_string and status == 'building': + if current_status_string and phase == BUILD_PHASE.BUILDING: step_increment = re.search(r'Step ([0-9]+) :', current_status_string) if step_increment: current_step = int(step_increment.group(1)) # Parse and update the phase and the status_dict. The status dictionary contains - # the pull/push progress. + # the pull/push progress, as well as the current step index. with self._build_status as status_dict: - self._build_status.set_phase(status) - BuildComponent.__process_pushpull_status(status_dict, status, docker_data, self._image_info) + self._build_status.set_phase(phase) + BuildComponent.__process_pushpull_status(status_dict, phase, docker_data, self._image_info) - # If the current message is for a step, then update that index. - if current_step: + # If the current message represents the beginning of a new step, then update the + # current command index. + if current_step is not None: status_dict['current_command'] = current_step # If the json data contains an error, then something went wrong with a push or pull. if 'error' in docker_data: self._build_status.set_error(docker_data['error']) - # If we are in the building phase, then write out the log. - if status == 'building': - if current_step: - self._build_status.set_command(current_status_string) - else: - self._build_status.append_log(current_status_string) + if current_step is not None: + self._build_status.set_command(current_status_string) + elif phase == BUILD_PHASE.BUILDING: + self._build_status.append_log(current_status_string) def _build_failure(self, error_message, exception=None): @@ -234,8 +241,8 @@ class BuildComponent(BaseComponent): try: # Retrieve the result. This will raise an ApplicationError on any error that occurred. result.result() - self._build_finished(BUILD_JOB_RESULT.COMPLETE) self._build_status.set_phase(BUILD_PHASE.COMPLETE) + self._build_finished(BUILD_JOB_RESULT.COMPLETE) except ApplicationError as ae: worker_error = WorkerError(ae.error, ae.kwargs.get('base_error')) @@ -253,7 +260,10 @@ class BuildComponent(BaseComponent): self._current_job = None # Set the component back to a running state. - self._set_status('running') + self._set_status(COMPONENT_STATUS.RUNNING) + + def _ping(self): + return 'pong' def _on_ready(self, token): if self._component_status != 'waiting': @@ -264,7 +274,7 @@ class BuildComponent(BaseComponent): logger.warning('Builder token mismatch. Expected: %s. Found: %s', self.expected_token, token) return - self._set_status('running') + self._set_status(COMPONENT_STATUS.RUNNING) # Start the heartbeat check. loop = trollius.get_event_loop() @@ -285,7 +295,8 @@ class BuildComponent(BaseComponent): @trollius.coroutine def _check_heartbeat(self, loop): while True: - if self._component_status != 'running' and self._component_status != 'building': + if (self._component_status != COMPONENT_STATUS.RUNNING and + self._component_status != COMPONENT_STATUS.BUILDING): return logger.debug('Checking heartbeat on realm %s', self.builder_realm) @@ -300,7 +311,7 @@ class BuildComponent(BaseComponent): yield From(trollius.sleep(5)) def _timeout(self): - self._set_status('timeout') + self._set_status(COMPONENT_STATUS.TIMED_OUT) logger.warning('Build component %s timed out', self.expected_token) self._dispose(timed_out=True)