This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/buildman/manager/ephemeral.py

146 lines
4.8 KiB
Python
Raw Normal View History

import logging
import etcd
import uuid
from datetime import datetime, timedelta
from buildman.manager.basemanager import BaseManager
from buildman.manager.executor import PopenExecutor, EC2Executor
from buildman.component.buildcomponent import BuildComponent
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Key prefix in etcd under which one key per in-flight build job is stored
# (see EphemeralBuilderManager._etcd_job_key).
ETCD_BUILDER_PREFIX = 'building/'
def clear_etcd(client):
    """ Debugging helper that wipes the part of etcd used to track in-flight jobs.

    Reads everything below ETCD_BUILDER_PREFIX recursively and deletes every
    non-directory leaf, logging a warning for each deleted key. A KeyError
    raised while reading or deleting ends the sweep silently.
    """
    try:
        subtree = client.read(ETCD_BUILDER_PREFIX, recursive=True)
        for stale_key in (node.key for node in subtree.leaves if not node.dir):
            logger.warning('Deleting key: %s', stale_key)
            client.delete(stale_key)
    except KeyError:
        # Nothing (left) to clear.
        return
class EphemeralBuilderManager(BaseManager):
    """ Build manager implementation for the Enterprise Registry.

    Every scheduled job gets a key in etcd below ETCD_BUILDER_PREFIX, which
    acts as a lock on that job, plus a builder started through the configured
    executor. Both are released again in job_completed.
    """

    shutting_down = False

    def __init__(self, *args, **kwargs):
        """ Set up empty bookkeeping; all arguments are forwarded to BaseManager. """
        self._manager_config = None
        self._etcd_client = None

        # component -> build job that is waiting for that component to become ready
        self._component_to_job = {}
        # component -> builder id returned by the executor's start_builder
        self._component_to_builder = {}

        # Executor classes selectable through the 'EXECUTOR' config key
        self._executors = {
            'popen': PopenExecutor,
            'ec2': EC2Executor,
        }
        self._executor = None

        super(EphemeralBuilderManager, self).__init__(*args, **kwargs)

    def initialize(self, manager_config):
        """ Create the executor and the etcd client described by manager_config.

        Keys read here: 'EXECUTOR' (an unknown or missing name falls back to
        PopenExecutor), 'EXECUTOR_CONFIG' (default {}), 'ETCD_HOST' (default
        '127.0.0.1') and 'ETCD_PORT' (default 2379).

        Note that all job keys currently stored below ETCD_BUILDER_PREFIX are
        deleted as part of initialization.
        """
        logger.debug('Calling initialize')
        self._manager_config = manager_config

        executor_klass = self._executors.get(manager_config.get('EXECUTOR', ''), PopenExecutor)
        self._executor = executor_klass(manager_config.get('EXECUTOR_CONFIG', {}),
                                        self.public_ip_address)

        etcd_host = self._manager_config.get('ETCD_HOST', '127.0.0.1')
        etcd_port = self._manager_config.get('ETCD_PORT', 2379)
        logger.debug('Connecting to etcd on %s:%s', etcd_host, etcd_port)
        self._etcd_client = etcd.Client(host=etcd_host, port=etcd_port)

        clear_etcd(self._etcd_client)

    def setup_time(self):
        """ Return the 'MACHINE_SETUP_TIME' config value (default 300).

        schedule() treats the value as a number of seconds.
        """
        setup_time = self._manager_config.get('MACHINE_SETUP_TIME', 300)
        logger.debug('Returning setup_time: %s', setup_time)
        return setup_time

    def shutdown(self):
        """ Not implemented yet: logs the call and raises NotImplementedError. """
        logger.debug('Calling shutdown.')
        raise NotImplementedError

    def schedule(self, build_job, loop):
        """ Try to start an ephemeral builder for build_job.

        Returns True when a builder was started. Returns False when the
        'ALLOWED_WORKER_COUNT' limit (default 2) is already reached, or when
        another party already holds the etcd lock for this job.

        If registering the component or starting the builder fails after the
        lock was taken, the lock and the local bookkeeping are released and
        the original exception is re-raised. The `loop` argument is unused.
        """
        logger.debug('Calling schedule with job: %s', build_job.repo_build.uuid)

        # Check if there are worker slots available by checking the number of jobs in etcd
        allowed_worker_count = self._manager_config.get('ALLOWED_WORKER_COUNT', 2)
        try:
            building = self._etcd_client.read(ETCD_BUILDER_PREFIX, recursive=True)
            # Use `leaves`, like clear_etcd does, so both walk the recursive read the same way.
            workers_alive = sum(1 for child in building.leaves if not child.dir)
        except KeyError:
            # The prefix does not exist yet, so nothing is building.
            workers_alive = 0
        logger.debug('Total jobs: %s', workers_alive)

        if workers_alive >= allowed_worker_count:
            logger.info('Too many workers alive, unable to start new worker. %s >= %s',
                        workers_alive, allowed_worker_count)
            return False

        job_key = self._etcd_job_key(build_job)

        # First try to take a lock for this job, meaning we will be responsible for its lifeline
        realm = str(uuid.uuid4())
        token = str(uuid.uuid4())
        expiration = datetime.utcnow() + timedelta(seconds=self.setup_time())
        payload = {
            'expiration': expiration.isoformat(),
        }

        # Only the write is guarded: a KeyError here is what signals "key already exists".
        try:
            self._etcd_client.write(job_key, payload, prevExist=False)
        except KeyError:
            # The job was already taken by someone else, we are probably a retry
            logger.warning('Job already exists in etcd, did an old worker die?')
            return False

        # From here on we own the lock; make sure a failure does not leave it behind,
        # because the prevExist=False write above would then reject every retry.
        component = None
        try:
            component = self.register_component(realm, BuildComponent, token=token)
            self._component_to_job[component] = build_job
            builder_id = self._executor.start_builder(realm, token)
        except Exception:
            logger.exception('Unable to start a builder for job %s, releasing its lock',
                             build_job.repo_build.uuid)
            if component is not None:
                self._component_to_job.pop(component, None)
            self._release_job_lock(job_key)
            raise

        self._component_to_builder[component] = builder_id
        return True

    def build_component_ready(self, build_component, loop):
        """ Hand the job waiting for build_component to it via loop.call_soon.

        If no job is waiting for the component, only a warning is logged.
        """
        try:
            job = self._component_to_job.pop(build_component)
        except KeyError:
            logger.warning('Builder is asking for more work, but work already completed')
            return

        logger.debug('Sending build %s to newly ready component on realm %s',
                     job.repo_build.uuid, build_component.builder_realm)
        loop.call_soon(build_component.start_build, job)

    def build_component_disposed(self, build_component, timed_out):
        """ Currently only logs the call. """
        logger.debug('Calling build_component_disposed.')

    def job_completed(self, build_job, job_status, build_component):
        """ Stop the job's builder, release its etcd lock and report completion.

        A component without a recorded builder, or an etcd key that is already
        gone, is logged as a warning instead of aborting, so that
        job_complete_callback is still invoked.
        """
        logger.debug('Calling job_completed with status: %s', job_status)

        # Forget a job that never got handed to its component.
        self._component_to_job.pop(build_component, None)

        # Kill the ephemeral builder
        try:
            builder_id = self._component_to_builder.pop(build_component)
        except KeyError:
            logger.warning('No builder recorded for the completed component, nothing to stop')
        else:
            self._executor.stop_builder(builder_id)

        # Release the lock in etcd
        self._release_job_lock(self._etcd_job_key(build_job))

        self.job_complete_callback(build_job, job_status)

    def _release_job_lock(self, job_key):
        """ Delete job_key from etcd, tolerating a key that is already gone. """
        try:
            self._etcd_client.delete(job_key)
        except KeyError:
            logger.warning('Job key %s was already missing from etcd', job_key)

    @staticmethod
    def _etcd_job_key(build_job):
        """ Create a key which is used to track a job in etcd.

        The key is ETCD_BUILDER_PREFIX followed by build_job.repo_build.uuid.
        """
        return '{0}{1}'.format(ETCD_BUILDER_PREFIX, build_job.repo_build.uuid)