import logging import json import daemon import time import argparse import digitalocean import requests import os from apscheduler.scheduler import Scheduler from multiprocessing.pool import ThreadPool from base64 import b64encode from requests.exceptions import ConnectionError from data.queue import dockerfile_build_queue from data.userfiles import UserRequestFiles from data import model from data.database import db as db_connection from app import app root_logger = logging.getLogger('') root_logger.setLevel(logging.DEBUG) FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s' formatter = logging.Formatter(FORMAT) logger = logging.getLogger(__name__) BUILD_SERVER_CMD = ('docker run -d -lxc-conf="lxc.aa_profile=unconfined" ' + '-privileged -e \'RESOURCE_URL=%s\' -e \'TAG=%s\' ' + '-e \'TOKEN=%s\' quay.io/quay/buildserver') def retry_command(to_call, args=[], kwargs={}, retries=5, period=5): try: return to_call(*args, **kwargs) except Exception as ex: if retries: logger.debug('Retrying command after %ss' % period) time.sleep(period) return retry_command(to_call, args, kwargs, retries-1, period) raise ex def get_status(url): return retry_command(requests.get, [url]).json()['status'] def babysit_builder(request): """ Spin up a build node and ask it to build our job. Retryable errors should return False, while fatal errors should return True. """ try: logger.debug('Starting work item: %s' % request) repository_build = model.get_repository_build(request['build_id']) logger.debug('Request details: %s' % repository_build) # Initialize digital ocean API do_client_id = app.config['DO_CLIENT_ID'] do_api_key = app.config['DO_CLIENT_SECRET'] manager = digitalocean.Manager(client_id=do_client_id, api_key=do_api_key) # check if there is already a DO node for this build, if so clean it up old_id = repository_build.build_node_id if old_id: logger.debug('Cleaning up old DO node: %s' % old_id) old_droplet = digitalocean.Droplet(id=old_id, client_id=do_client_id, api_key=do_api_key) retry_command(old_droplet.destroy) # Pick the region for the new droplet allowed_regions = app.config['DO_ALLOWED_REGIONS'] regions = retry_command(manager.get_all_regions) available_regions = {region.id for region in regions} regions = available_regions.intersection(allowed_regions) if not regions: logger.error('No droplets in our allowed regtions, available: %s' % available_regions) return False # start the DO node name = 'dockerfile-build-%s' % repository_build.id logger.debug('Starting DO node: %s' % name) droplet = digitalocean.Droplet(client_id=do_client_id, api_key=do_api_key, name=name, region_id=regions.pop(), image_id=1004145, # Docker on 13.04 size_id=66, # 512MB, backup_active=False) retry_command(droplet.create, [], {'ssh_key_ids': [app.config['DO_SSH_KEY_ID']]}) repository_build.build_node_id = droplet.id repository_build.phase = 'starting' repository_build.save() logger.debug('Waiting for DO node to be available.') startup = retry_command(droplet.get_events)[0] while not startup.percentage or int(startup.percentage) != 100: logger.debug('Droplet startup percentage: %s' % startup.percentage) time.sleep(5) retry_command(startup.load) retry_command(droplet.load) logger.debug('Droplet started at ip address: %s' % droplet.ip_address) # connect to it with ssh repository_build.phase = 'initializing' repository_build.save() # We wait until here to import paramiko because otherwise it doesn't work # under the daemon context. import paramiko ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) logger.debug('Connecting to droplet through ssh at ip: %s' % droplet.ip_address) retry_command(ssh_client.connect, [droplet.ip_address, 22, 'root'], {'look_for_keys': False, 'timeout': 10.0, 'key_filename': app.config['DO_SSH_PRIVATE_KEY_FILENAME']}) # Load the node with the pull token token = app.config['BUILD_NODE_PULL_TOKEN'] basicauth = b64encode('%s:%s' % ('$token', token)) auth_object = { 'https://quay.io/v1/': { 'auth': basicauth, 'email': '', }, } create_auth_cmd = 'echo \'%s\' > .dockercfg' % json.dumps(auth_object) ssh_client.exec_command(create_auth_cmd) # Pull and run the buildserver pull_cmd = 'docker pull quay.io/quay/buildserver' _, stdout, _ = ssh_client.exec_command(pull_cmd) pull_status = stdout.channel.recv_exit_status() if pull_status != 0: logger.error('Pull command failed for host: %s' % droplet.ip_address) return False else: logger.debug('Pull status was: %s' % pull_status) # Remove the credentials we used to pull so crafty users cant steal them remove_auth_cmd = 'rm .dockercfg' ssh_client.exec_command(remove_auth_cmd) # Prepare the signed resource url the build node can fetch the job from user_files = UserRequestFiles(app.config['AWS_ACCESS_KEY'], app.config['AWS_SECRET_KEY'], app.config['REGISTRY_S3_BUCKET']) resource_url = user_files.get_file_url(repository_build.resource_key) # Start the build server start_cmd = BUILD_SERVER_CMD % (resource_url, repository_build.tag, repository_build.access_token.code) logger.debug('Sending build server request with command: %s' % start_cmd) ssh_client.exec_command(start_cmd) status_endpoint = 'http://%s:5002/build/' % droplet.ip_address # wait for the server to be ready logger.debug('Waiting for buildserver to be ready') retry_command(requests.get, [status_endpoint]) # wait for the job to be complete repository_build.phase = 'building' repository_build.status_url = status_endpoint repository_build.save() logger.debug('Waiting for job to be complete') status = get_status(status_endpoint) while status != 'error' and status != 'complete': logger.debug('Job status is: %s' % status) time.sleep(5) status = get_status(status_endpoint) logger.debug('Job complete with status: %s' % status) if status == 'error': error_message = requests.get(status_endpoint).json()['message'] logger.warning('Job error: %s' % error_message) repository_build.phase = 'error' else: repository_build.phase = 'complete' # clean up the DO node logger.debug('Cleaning up DO node.') retry_command(droplet.destroy) repository_build.status_url = None repository_build.build_node_id = None; repository_build.save() return True except Exception as outer_ex: # We don't really know what these are, but they are probably retryable logger.exception('Exception processing job: %s' % outer_ex.message) return False finally: if not db_connection.is_closed(): logger.debug('Closing thread db connection.') db_connection.close() def process_work_items(pool): logger.debug('Getting work item from queue.') item = dockerfile_build_queue.get(processing_time=60*60) # allow 1 hr while item: logger.debug('Queue gave us some work: %s' % item.body) request = json.loads(item.body) def build_callback(item): local_item = item def complete_callback(completed): if completed: dockerfile_build_queue.complete(local_item) else: # We have a retryable error, add the job back to the queue dockerfile_build_queue.incomplete(local_item) return complete_callback logger.debug('Sending work item to thread pool: %s' % pool) pool.apply_async(babysit_builder, [request], callback=build_callback(item)) item = dockerfile_build_queue.get() logger.debug('No more work.') if not db_connection.is_closed(): logger.debug('Closing thread db connection.') db_connection.close() def start_worker(): pool = ThreadPool(3) logger.debug('Scheduling worker.') sched = Scheduler() sched.start() sched.add_interval_job(process_work_items, args=[pool], seconds=30) while True: time.sleep(60 * 60 * 24) # sleep one day, basically forever desc = 'Worker daemon to monitor dockerfile build' parser = argparse.ArgumentParser(description=desc) parser.add_argument('-D', action='store_true', default=False, help='Run the worker in daemon mode.') parser.add_argument('--log', default='dockerfilebuild.log', help='Specify the log file for the worker as a daemon.') args = parser.parse_args() if args.D: handler = logging.FileHandler(args.log) handler.setFormatter(formatter) root_logger.addHandler(handler) with daemon.DaemonContext(files_preserve=[handler.stream], working_directory=os.getcwd()): start_worker() else: handler = logging.StreamHandler() handler.setFormatter(formatter) root_logger.addHandler(handler) start_worker()