This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/testlogs.py

152 lines
4.3 KiB
Python
Raw Normal View History

import math
import logging
from random import SystemRandom
from loremipsum import get_sentence
from data.buildlogs import BuildLogs
2014-02-11 03:43:48 +00:00
from random import choice
logger = logging.getLogger(__name__)
class TestBuildLogs(BuildLogs):
def __init__(self, redis_host, namespace, repository, test_build_id):
super(TestBuildLogs, self).__init__(redis_host)
self.namespace = namespace
self.repository = repository
self.test_build_id = test_build_id
self.last_command = 0
self.logs = [self._generate_command()]
self.commands = [{
'index': 0,
'message': self.logs[0]['message'],
}]
self.request_counter = 0
self._generate_logs()
self._status = {}
2014-02-11 03:43:48 +00:00
def _get_random_command(self):
COMMANDS = ['FROM', 'MAINTAINER', 'RUN', 'CMD', 'EXPOSE', 'ENV', 'ADD',
'ENTRYPOINT', 'VOLUME', 'USER', 'WORKDIR']
return choice(COMMANDS)
def _generate_command(self):
self.last_command += 1
2014-02-11 03:43:48 +00:00
sentence = get_sentence()
command = self._get_random_command()
if command == 'FROM':
sentence = choice(['ubuntu', 'quay.io/devtable/simple', 'quay.io/buynlarge/orgrepo', 'stackbrew/ubuntu:precise'])
return {
2014-02-11 03:43:48 +00:00
'message': 'Step %s: %s %s' % (self.last_command, command, sentence),
'is_command': True,
}
def _generate_logs(self):
rand = SystemRandom()
num_logs = rand.randint(1, 500)
for _ in range(num_logs):
if rand.randint(1, 50) == 1:
cmd = self._generate_command()
self.commands.append({
'message': cmd['message'],
'index': len(self.logs),
})
self.logs.append(cmd)
else:
self.logs.append({
'message': get_sentence(),
})
@staticmethod
def _generate_image_completion(rand_func):
images = {}
for image_id in range(rand_func.randint(1, 11)):
total = int(math.pow(abs(rand_func.gauss(0, 1000)), 2))
current = rand_func.randint(0, total)
image_id = 'image_id_%s' % image_id
images[image_id] = {
'total': total,
'current': current,
}
return images
def _generate_fake_status(self):
random = SystemRandom()
phases = {
'waiting': {},
'starting': {
'total_commands': 7,
'current_command': 0,
},
'initializing': {},
'error': {},
'complete': {},
'building': {
'total_commands': 7,
'current_command': random.randint(1, 7),
},
'pushing': {
'total_commands': 7,
'current_command': 7,
'push_completion': random.random(),
'image_completion': self._generate_image_completion(random),
},
}
phase = random.choice(phases.keys())
from data import model
build_obj = model.get_repository_build(self.namespace, self.repository,
self.test_build_id)
build_obj.phase = phase
build_obj.save()
return phases[phase]
def get_log_entries(self, build_id, start_index, end_index):
if build_id == self.test_build_id:
self.request_counter += 1
if self.request_counter % 10 == 0:
self._generate_logs()
logger.debug('Returning logs %s:%s', start_index, end_index)
if end_index >= 0:
end_index += 1
return (len(self.logs), self.logs[start_index:end_index])
else:
return super(TestBuildLogs, self).get_log_entries(build_id, start_index,
end_index)
def get_commands(self, build_id):
if build_id == self.test_build_id:
self.request_counter += 1
if self.request_counter % 10 == 0:
self._generate_logs()
return self.commands
else:
return super(TestBuildLogs, self).get_commands(build_id)
def get_last_command(self, build_id):
if build_id == self.test_build_id:
self.request_counter += 1
if self.request_counter % 10 == 0:
self._generate_logs()
return self.commands[-1]
else:
return super(TestBuildLogs, self).get_last_command(build_id)
def get_status(self, build_id):
if build_id == self.test_build_id:
self.request_counter += 1
if self.request_counter % 10 == 0:
self._generate_logs()
last_status = self._status
self._status = self._generate_fake_status()
return last_status
else:
return super(TestBuildLogs, self).get_status(build_id)