import logging
import json
import daemon
import time
import argparse
import digitalocean
import requests
import paramiko

from apscheduler.scheduler import Scheduler
from multiprocessing.pool import ThreadPool
from base64 import b64encode
from requests.exceptions import ConnectionError

from data.queue import dockerfile_build_queue
from data.userfiles import UserRequestFiles
from data import model
from app import app


root_logger = logging.getLogger('')
root_logger.setLevel(logging.DEBUG)

FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
formatter = logging.Formatter(FORMAT)

logger = logging.getLogger(__name__)


def try_connection(url, retries=5, period=5):
  try:
    return requests.get(url)
  except ConnectionError as ex:
    if retries:
      logger.debug('Retrying connection to url: %s after %ss' % (url, period))
      time.sleep(period)
      return try_connection(url, retries-1, period)
    raise ex


def try_connect_ssh(client, ip_addr, port, user, key_filename, retries=5, period=5):
  try:
    client.connect(ip_addr, port, user, look_for_keys=False,
                   key_filename=key_filename)
  except Exception as ex:
    if retries:
      logger.debug('Retrying connection to ssh ip: %s:%s after %ss' %
                   (ip_addr, port, period))
      time.sleep(period)
      return try_connect_ssh(client, ip_addr, port, user, key_filename,
                             retries-1, period)
    raise ex


def get_status(url):
  return requests.get(url).json()['status']


def babysit_builder(request):
  try:
    logger.debug('Starting work item: %s' % request)
    repository_build = model.get_repository_build(request['build_id'])
    logger.debug('Request details: %s' % repository_build)

    # Initialize the DigitalOcean API client
    do_client_id = app.config['DO_CLIENT_ID']
    do_api_key = app.config['DO_CLIENT_SECRET']
    manager = digitalocean.Manager(client_id=do_client_id, api_key=do_api_key)

    # Check if there is already a DO node for this build; if so, clean it up
    old_id = repository_build.build_node_id
    if old_id:
      logger.debug('Cleaning up old DO node: %s' % old_id)
      old_droplet = digitalocean.Droplet(id=old_id, client_id=do_client_id,
                                         api_key=do_api_key)
      old_droplet.destroy()

    # Pick the region for the new droplet
    allowed_regions = app.config['DO_ALLOWED_REGIONS']
    available_regions = {region.id for region in manager.get_all_regions()}
    regions = available_regions.intersection(allowed_regions)
    if not regions:
      logger.error('No droplets in our allowed regions, available: %s' %
                   available_regions)
      return False

    # Start the DO node
    name = 'dockerfile-build-%s' % repository_build.id
    logger.debug('Starting DO node: %s' % name)
    droplet = digitalocean.Droplet(client_id=do_client_id,
                                   api_key=do_api_key,
                                   name=name,
                                   region_id=regions.pop(),
                                   image_id=1004145,  # Docker on 13.04
                                   size_id=66,  # 512MB
                                   backup_active=False)
    droplet.create(ssh_key_ids=[app.config['DO_SSH_KEY_ID']])

    repository_build.build_node_id = droplet.id
    repository_build.phase = 'starting'
    repository_build.save()

    # Poll the droplet creation event until it reaches 100%
    startup = droplet.get_events()[0]
    startup.load()
    while not startup.percentage or int(startup.percentage) != 100:
      logger.debug('Droplet startup percentage: %s' % startup.percentage)
      time.sleep(5)
      startup.load()

    droplet.load()
    logger.debug('Droplet started at ip address: %s' % droplet.ip_address)

    # Connect to the new node over ssh
    repository_build.phase = 'initializing'
    repository_build.save()

    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try_connect_ssh(ssh_client, droplet.ip_address, 22, 'root',
                    key_filename=app.config['DO_SSH_PRIVATE_KEY_FILENAME'])

    # Load the node with the pull token
    token = app.config['BUILD_NODE_PULL_TOKEN']
    basicauth = b64encode('%s:%s' % ('$token', token))
    auth_object = {
      'https://quay.io/v1/': {
        'auth': basicauth,
        'email': '',
      },
    }

    create_auth_cmd = 'echo \'%s\' > .dockercfg' % json.dumps(auth_object)
    ssh_client.exec_command(create_auth_cmd)

    # Pull and run the buildserver
    pull_cmd = 'docker pull quay.io/quay/buildserver'
    _, stdout, _ = ssh_client.exec_command(pull_cmd)
    pull_status = stdout.channel.recv_exit_status()
    if pull_status != 0:
      logger.error('Pull command failed for host: %s' % droplet.ip_address)
    else:
      logger.debug('Pull status was: %s' % pull_status)

    start_cmd = 'docker run -d -privileged -lxc-conf="lxc.aa_profile=unconfined" quay.io/quay/buildserver'
    ssh_client.exec_command(start_cmd)

    # Wait for the buildserver to be ready
    logger.debug('Waiting for buildserver to be ready')
    build_endpoint = 'http://%s:5002/build/' % droplet.ip_address
    try:
      try_connection(build_endpoint)
    except ConnectionError:
      # TODO cleanup
      pass

    # Send it the job
    logger.debug('Sending build server request')

    user_files = UserRequestFiles(app.config['AWS_ACCESS_KEY'],
                                  app.config['AWS_SECRET_KEY'],
                                  app.config['REGISTRY_S3_BUCKET'])

    repo = repository_build.repository
    payload = {
      'tag': repository_build.tag,
      'resource_url': user_files.get_file_url(repository_build.resource_key),
      'token': repository_build.access_token.code,
    }
    start_build = requests.post(build_endpoint, data=payload)

    # Wait for the job to be complete
    status_url = start_build.headers['Location']
    repository_build.phase = 'building'
    repository_build.status_url = status_url
    repository_build.save()

    logger.debug('Waiting for job to be complete')
    status = get_status(status_url)
    while status != 'error' and status != 'complete':
      logger.debug('Job status is: %s' % status)
      time.sleep(5)
      status = get_status(status_url)

    logger.debug('Job complete with status: %s' % status)
    if status == 'error':
      repository_build.phase = 'error'
    else:
      repository_build.phase = 'complete'

    # Clean up the DO node
    logger.debug('Cleaning up DO node.')
    # droplet.destroy()

    repository_build.status_url = None
    repository_build.build_node_id = None
    repository_build.save()

    return True

  except Exception as outer_ex:
    logger.exception('Exception processing job: %s' % outer_ex.message)


def process_work_items(pool):
  logger.debug('Getting work item from queue.')

  item = dockerfile_build_queue.get(processing_time=60*60)  # allow 1 hr

  while item:
    logger.debug('Queue gave us some work: %s' % item.body)

    request = json.loads(item.body)

    def build_callback(item):
      local_item = item
      def complete_callback(completed):
        if completed:
          dockerfile_build_queue.complete(local_item)
      return complete_callback

    logger.debug('Sending work item to thread pool: %s' % pool)
    pool.apply_async(babysit_builder, [request], callback=build_callback(item))

    item = dockerfile_build_queue.get()

  logger.debug('No more work.')


def start_worker():
  pool = ThreadPool(3)
  logger.debug('Scheduling worker.')

  sched = Scheduler()
  sched.start()

  # sched.add_interval_job(process_work_items, args=[pool], seconds=30)
  process_work_items(pool)

  while True:
    time.sleep(60 * 60 * 24)  # sleep one day, basically forever


desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-D', action='store_true', default=False,
                    help='Run the worker in daemon mode.')
parser.add_argument('--log', default='dockerfilebuild.log',
                    help='Specify the log file for the worker as a daemon.')
args = parser.parse_args()


if args.D:
  handler = logging.FileHandler(args.log)
  handler.setFormatter(formatter)
  root_logger.addHandler(handler)
  with daemon.DaemonContext(files_preserve=[handler.stream]):
    start_worker()
else:
  handler = logging.StreamHandler()
  handler.setFormatter(formatter)
  root_logger.addHandler(handler)
  start_worker()