import math import logging from random import SystemRandom from loremipsum import get_sentence from data.buildlogs import BuildLogs from random import choice logger = logging.getLogger(__name__) class TestBuildLogs(BuildLogs): def __init__(self, redis_host, namespace, repository, test_build_id): super(TestBuildLogs, self).__init__(redis_host) self.namespace = namespace self.repository = repository self.test_build_id = test_build_id self.last_command = 0 self.logs = [self._generate_command()] self.commands = [{ 'index': 0, 'message': self.logs[0]['message'], }] self.request_counter = 0 self._generate_logs() self._status = {} def _get_random_command(self): COMMANDS = ['FROM', 'MAINTAINER', 'RUN', 'CMD', 'EXPOSE', 'ENV', 'ADD', 'ENTRYPOINT', 'VOLUME', 'USER', 'WORKDIR'] return choice(COMMANDS) def _generate_command(self): self.last_command += 1 sentence = get_sentence() command = self._get_random_command() if command == 'FROM': sentence = choice(['ubuntu', 'quay.io/devtable/simple', 'quay.io/buynlarge/orgrepo', 'stackbrew/ubuntu:precise']) return { 'message': 'Step %s: %s %s' % (self.last_command, command, sentence), 'is_command': True, } def _generate_logs(self): rand = SystemRandom() num_logs = rand.randint(1, 500) for _ in range(num_logs): if rand.randint(1, 50) == 1: cmd = self._generate_command() self.commands.append({ 'message': cmd['message'], 'index': len(self.logs), }) self.logs.append(cmd) else: self.logs.append({ 'message': get_sentence(), }) @staticmethod def _generate_image_completion(rand_func): images = {} for image_id in range(rand_func.randint(1, 11)): total = int(math.pow(abs(rand_func.gauss(0, 1000)), 2)) current = rand_func.randint(0, total) image_id = 'image_id_%s' % image_id images[image_id] = { 'total': total, 'current': current, } return images def _generate_fake_status(self): random = SystemRandom() phases = { 'waiting': {}, 'starting': { 'total_commands': 7, 'current_command': 0, }, 'initializing': {}, 'error': {}, 'complete': {}, 'building': { 'total_commands': 7, 'current_command': random.randint(1, 7), }, 'pushing': { 'total_commands': 7, 'current_command': 7, 'push_completion': random.random(), 'image_completion': self._generate_image_completion(random), }, } phase = random.choice(phases.keys()) from data import model build_obj = model.get_repository_build(self.namespace, self.repository, self.test_build_id) build_obj.phase = phase build_obj.save() return phases[phase] def get_log_entries(self, build_id, start_index, end_index): if build_id == self.test_build_id: self.request_counter += 1 if self.request_counter % 10 == 0: self._generate_logs() logger.debug('Returning logs %s:%s', start_index, end_index) if end_index >= 0: end_index += 1 return (len(self.logs), self.logs[start_index:end_index]) else: return super(TestBuildLogs, self).get_log_entries(build_id, start_index, end_index) def get_commands(self, build_id): if build_id == self.test_build_id: self.request_counter += 1 if self.request_counter % 10 == 0: self._generate_logs() return self.commands else: return super(TestBuildLogs, self).get_commands(build_id) def get_last_command(self, build_id): if build_id == self.test_build_id: self.request_counter += 1 if self.request_counter % 10 == 0: self._generate_logs() return self.commands[-1] else: return super(TestBuildLogs, self).get_last_command(build_id) def get_status(self, build_id): if build_id == self.test_build_id: self.request_counter += 1 if self.request_counter % 10 == 0: self._generate_logs() last_status = self._status self._status = self._generate_fake_status() return last_status else: return super(TestBuildLogs, self).get_status(build_id)