- Extra the build component statuses into an enum
- Add a ping method so the workers can verify the state of the controller - Fix a bug with current_step and 0 values - Rename the build status var to phase, to make it more distinct from the controller status
This commit is contained in:
parent
4322b5f81c
commit
cfc6b196a4
1 changed files with 37 additions and 26 deletions
|
@ -19,6 +19,13 @@ HEARTBEAT_DELTA = datetime.timedelta(seconds=15)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class COMPONENT_STATUS(object):
|
||||||
|
JOINING = 'joining'
|
||||||
|
WAITING = 'waiting'
|
||||||
|
RUNNING = 'running'
|
||||||
|
BUILDING = 'building'
|
||||||
|
TIMED_OUT = 'timeout'
|
||||||
|
|
||||||
class BuildComponent(BaseComponent):
|
class BuildComponent(BaseComponent):
|
||||||
""" An application session component which conducts one (or more) builds. """
|
""" An application session component which conducts one (or more) builds. """
|
||||||
|
|
||||||
|
@ -26,8 +33,8 @@ class BuildComponent(BaseComponent):
|
||||||
expected_token = None
|
expected_token = None
|
||||||
builder_realm = None
|
builder_realm = None
|
||||||
|
|
||||||
|
_component_status = COMPONENT_STATUS.JOINING
|
||||||
_last_heartbeat = None
|
_last_heartbeat = None
|
||||||
_component_status = 'joining'
|
|
||||||
_current_job = None
|
_current_job = None
|
||||||
_build_status = None
|
_build_status = None
|
||||||
_image_info = None
|
_image_info = None
|
||||||
|
@ -44,20 +51,21 @@ class BuildComponent(BaseComponent):
|
||||||
def onJoin(self, details):
|
def onJoin(self, details):
|
||||||
logger.debug('Registering methods and listeners for component %s' % self.builder_realm)
|
logger.debug('Registering methods and listeners for component %s' % self.builder_realm)
|
||||||
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
|
yield From(self.register(self._on_ready, u'io.quay.buildworker.ready'))
|
||||||
|
yield From(self.register(self._ping, u'io.quay.buildworker.ping'))
|
||||||
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
|
yield From(self.subscribe(self._on_heartbeat, 'io.quay.builder.heartbeat'))
|
||||||
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
|
yield From(self.subscribe(self._on_log_message, 'io.quay.builder.logmessage'))
|
||||||
|
|
||||||
self._set_status('waiting')
|
self._set_status(COMPONENT_STATUS.WAITING)
|
||||||
|
|
||||||
def is_ready(self):
|
def is_ready(self):
|
||||||
return self._component_status == 'running'
|
return self._component_status == COMPONENT_STATUS.RUNNING
|
||||||
|
|
||||||
def start_build(self, build_job):
|
def start_build(self, build_job):
|
||||||
self._current_job = build_job
|
self._current_job = build_job
|
||||||
self._build_status = StatusHandler(self.build_logs, build_job.repo_build())
|
self._build_status = StatusHandler(self.build_logs, build_job.repo_build())
|
||||||
self._image_info = {}
|
self._image_info = {}
|
||||||
|
|
||||||
self._set_status('building')
|
self._set_status(COMPONENT_STATUS.BUILDING)
|
||||||
|
|
||||||
# Retrieve the job's buildpack.
|
# Retrieve the job's buildpack.
|
||||||
buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
|
buildpack_url = self.user_files.get_file_url(build_job.repo_build().resource_key,
|
||||||
|
@ -84,7 +92,7 @@ class BuildComponent(BaseComponent):
|
||||||
|
|
||||||
image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
|
image_and_tag_tuple = parsed_dockerfile.get_image_and_tag()
|
||||||
if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
|
if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
|
||||||
self._build_failure('Missing FROM line Dockerfile')
|
self._build_failure('Missing FROM line in Dockerfile')
|
||||||
return
|
return
|
||||||
|
|
||||||
base_image_information = {
|
base_image_information = {
|
||||||
|
@ -139,10 +147,10 @@ class BuildComponent(BaseComponent):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __total_completion(statuses, total_images):
|
def __total_completion(statuses, total_images):
|
||||||
percentage_with_sizes = float(len(statuses.values()))/total_images
|
percentage_with_sizes = float(len(statuses.values())) / total_images
|
||||||
sent_bytes = sum([status['current'] for status in statuses.values()])
|
sent_bytes = sum([status['current'] for status in statuses.values()])
|
||||||
total_bytes = sum([status['total'] for status in statuses.values()])
|
total_bytes = sum([status['total'] for status in statuses.values()])
|
||||||
return float(sent_bytes)/total_bytes*percentage_with_sizes
|
return float(sent_bytes) / total_bytes * percentage_with_sizes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def __process_pushpull_status(status_dict, current_phase, docker_data, images):
|
def __process_pushpull_status(status_dict, current_phase, docker_data, images):
|
||||||
|
@ -171,7 +179,7 @@ class BuildComponent(BaseComponent):
|
||||||
status_dict[status_completion_key] = \
|
status_dict[status_completion_key] = \
|
||||||
BuildComponent.__total_completion(images, max(len(images), num_images))
|
BuildComponent.__total_completion(images, max(len(images), num_images))
|
||||||
|
|
||||||
def _on_log_message(self, status, json_data):
|
def _on_log_message(self, phase, json_data):
|
||||||
# Parse any of the JSON data logged.
|
# Parse any of the JSON data logged.
|
||||||
docker_data = {}
|
docker_data = {}
|
||||||
if json_data:
|
if json_data:
|
||||||
|
@ -180,7 +188,7 @@ class BuildComponent(BaseComponent):
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# Extra the current status message (if any).
|
# Extract the current status message (if any).
|
||||||
fully_unwrapped = ''
|
fully_unwrapped = ''
|
||||||
keys_to_extract = ['error', 'status', 'stream']
|
keys_to_extract = ['error', 'status', 'stream']
|
||||||
for key in keys_to_extract:
|
for key in keys_to_extract:
|
||||||
|
@ -192,31 +200,30 @@ class BuildComponent(BaseComponent):
|
||||||
current_step = None
|
current_step = None
|
||||||
current_status_string = str(fully_unwrapped.encode('utf-8'))
|
current_status_string = str(fully_unwrapped.encode('utf-8'))
|
||||||
|
|
||||||
if current_status_string and status == 'building':
|
if current_status_string and phase == BUILD_PHASE.BUILDING:
|
||||||
step_increment = re.search(r'Step ([0-9]+) :', current_status_string)
|
step_increment = re.search(r'Step ([0-9]+) :', current_status_string)
|
||||||
if step_increment:
|
if step_increment:
|
||||||
current_step = int(step_increment.group(1))
|
current_step = int(step_increment.group(1))
|
||||||
|
|
||||||
# Parse and update the phase and the status_dict. The status dictionary contains
|
# Parse and update the phase and the status_dict. The status dictionary contains
|
||||||
# the pull/push progress.
|
# the pull/push progress, as well as the current step index.
|
||||||
with self._build_status as status_dict:
|
with self._build_status as status_dict:
|
||||||
self._build_status.set_phase(status)
|
self._build_status.set_phase(phase)
|
||||||
BuildComponent.__process_pushpull_status(status_dict, status, docker_data, self._image_info)
|
BuildComponent.__process_pushpull_status(status_dict, phase, docker_data, self._image_info)
|
||||||
|
|
||||||
# If the current message is for a step, then update that index.
|
# If the current message represents the beginning of a new step, then update the
|
||||||
if current_step:
|
# current command index.
|
||||||
|
if current_step is not None:
|
||||||
status_dict['current_command'] = current_step
|
status_dict['current_command'] = current_step
|
||||||
|
|
||||||
# If the json data contains an error, then something went wrong with a push or pull.
|
# If the json data contains an error, then something went wrong with a push or pull.
|
||||||
if 'error' in docker_data:
|
if 'error' in docker_data:
|
||||||
self._build_status.set_error(docker_data['error'])
|
self._build_status.set_error(docker_data['error'])
|
||||||
|
|
||||||
# If we are in the building phase, then write out the log.
|
if current_step is not None:
|
||||||
if status == 'building':
|
self._build_status.set_command(current_status_string)
|
||||||
if current_step:
|
elif phase == BUILD_PHASE.BUILDING:
|
||||||
self._build_status.set_command(current_status_string)
|
self._build_status.append_log(current_status_string)
|
||||||
else:
|
|
||||||
self._build_status.append_log(current_status_string)
|
|
||||||
|
|
||||||
|
|
||||||
def _build_failure(self, error_message, exception=None):
|
def _build_failure(self, error_message, exception=None):
|
||||||
|
@ -234,8 +241,8 @@ class BuildComponent(BaseComponent):
|
||||||
try:
|
try:
|
||||||
# Retrieve the result. This will raise an ApplicationError on any error that occurred.
|
# Retrieve the result. This will raise an ApplicationError on any error that occurred.
|
||||||
result.result()
|
result.result()
|
||||||
self._build_finished(BUILD_JOB_RESULT.COMPLETE)
|
|
||||||
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
|
self._build_status.set_phase(BUILD_PHASE.COMPLETE)
|
||||||
|
self._build_finished(BUILD_JOB_RESULT.COMPLETE)
|
||||||
except ApplicationError as ae:
|
except ApplicationError as ae:
|
||||||
worker_error = WorkerError(ae.error, ae.kwargs.get('base_error'))
|
worker_error = WorkerError(ae.error, ae.kwargs.get('base_error'))
|
||||||
|
|
||||||
|
@ -253,7 +260,10 @@ class BuildComponent(BaseComponent):
|
||||||
self._current_job = None
|
self._current_job = None
|
||||||
|
|
||||||
# Set the component back to a running state.
|
# Set the component back to a running state.
|
||||||
self._set_status('running')
|
self._set_status(COMPONENT_STATUS.RUNNING)
|
||||||
|
|
||||||
|
def _ping(self):
|
||||||
|
return 'pong'
|
||||||
|
|
||||||
def _on_ready(self, token):
|
def _on_ready(self, token):
|
||||||
if self._component_status != 'waiting':
|
if self._component_status != 'waiting':
|
||||||
|
@ -264,7 +274,7 @@ class BuildComponent(BaseComponent):
|
||||||
logger.warning('Builder token mismatch. Expected: %s. Found: %s', self.expected_token, token)
|
logger.warning('Builder token mismatch. Expected: %s. Found: %s', self.expected_token, token)
|
||||||
return
|
return
|
||||||
|
|
||||||
self._set_status('running')
|
self._set_status(COMPONENT_STATUS.RUNNING)
|
||||||
|
|
||||||
# Start the heartbeat check.
|
# Start the heartbeat check.
|
||||||
loop = trollius.get_event_loop()
|
loop = trollius.get_event_loop()
|
||||||
|
@ -285,7 +295,8 @@ class BuildComponent(BaseComponent):
|
||||||
@trollius.coroutine
|
@trollius.coroutine
|
||||||
def _check_heartbeat(self, loop):
|
def _check_heartbeat(self, loop):
|
||||||
while True:
|
while True:
|
||||||
if self._component_status != 'running' and self._component_status != 'building':
|
if (self._component_status != COMPONENT_STATUS.RUNNING and
|
||||||
|
self._component_status != COMPONENT_STATUS.BUILDING):
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
|
logger.debug('Checking heartbeat on realm %s', self.builder_realm)
|
||||||
|
@ -300,7 +311,7 @@ class BuildComponent(BaseComponent):
|
||||||
yield From(trollius.sleep(5))
|
yield From(trollius.sleep(5))
|
||||||
|
|
||||||
def _timeout(self):
|
def _timeout(self):
|
||||||
self._set_status('timeout')
|
self._set_status(COMPONENT_STATUS.TIMED_OUT)
|
||||||
logger.warning('Build component %s timed out', self.expected_token)
|
logger.warning('Build component %s timed out', self.expected_token)
|
||||||
self._dispose(timed_out=True)
|
self._dispose(timed_out=True)
|
||||||
|
|
||||||
|
|
Reference in a new issue