This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/dockerfilebuild.py

278 lines
No EOL
9.3 KiB
Python

import logging
import json
import daemon
import time
import argparse
import digitalocean
import requests
import os
from apscheduler.scheduler import Scheduler
from multiprocessing.pool import ThreadPool
from base64 import b64encode
from requests.exceptions import ConnectionError
from data.queue import dockerfile_build_queue
from data import model
from data.database import db as db_connection
from app import app
# Root logger: capture everything at DEBUG; a handler is attached later by the
# script entry point once we know whether we are running as a daemon.
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)

# Shared log line layout used by whichever handler gets installed.
FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
formatter = logging.Formatter(FORMAT)

# Module-local logger used by all the functions below.
logger = logging.getLogger(__name__)

# Shell command template run on the build node. The three %s placeholders are
# filled, in order, with the resource url, the tag and the access token.
BUILD_SERVER_CMD = (
    'docker run -d -lxc-conf="lxc.aa_profile=unconfined" '
    "-privileged -e 'RESOURCE_URL=%s' -e 'TAG=%s' "
    "-e 'TOKEN=%s' quay.io/quay/buildserver"
)
def retry_command(to_call, args=None, kwargs=None, retries=5, period=5):
    """ Call `to_call(*args, **kwargs)`, retrying when it raises.

    Args:
      to_call: the callable to invoke.
      args: positional arguments for the callable (defaults to none).
      kwargs: keyword arguments for the callable (defaults to none).
      retries: how many additional attempts to make after the first failure.
      period: seconds to sleep between attempts.

    Returns:
      Whatever `to_call` returns on its first successful invocation.

    Raises:
      The exception from the final attempt once the retries are exhausted,
      re-raised with its original traceback.
    """
    # None sentinels instead of mutable defaults shared between calls.
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs

    while True:
        try:
            return to_call(*args, **kwargs)
        except Exception:
            if not retries:
                # Bare raise keeps the traceback of the failing call intact.
                raise
            logger.debug('Retrying command after %ss' % period)
            time.sleep(period)
            retries -= 1
def get_status(url):
    """ Fetch `url` (with retries) and return the 'status' field of the JSON
    document it responds with.
    """
    response = retry_command(requests.get, [url])
    payload = response.json()
    return payload['status']
def babysit_builder(request):
    """ Spin up a build node and ask it to build our job. Retryable errors
    should return False, while fatal errors should return True.

    Args:
      request: dict decoded from the queue item; its 'build_id' entry is used
        to look up the repository build record.

    Returns:
      True once the build server reported a final status ('complete' or
      'error') and the node was cleaned up; False when something went wrong
      along the way and the job should be put back on the queue.
    """
    # Tracked outside the try block so the SSH session can always be closed.
    ssh_client = None

    try:
        logger.debug('Starting work item: %s' % request)
        repository_build = model.get_repository_build(request['build_id'])
        logger.debug('Request details: %s' % repository_build)

        # Initialize digital ocean API
        do_client_id = app.config['DO_CLIENT_ID']
        do_api_key = app.config['DO_CLIENT_SECRET']
        manager = digitalocean.Manager(client_id=do_client_id,
                                       api_key=do_api_key)

        # check if there is already a DO node for this build, if so clean it up
        old_id = repository_build.build_node_id
        if old_id:
            logger.debug('Cleaning up old DO node: %s' % old_id)
            old_droplet = digitalocean.Droplet(id=old_id,
                                               client_id=do_client_id,
                                               api_key=do_api_key)
            retry_command(old_droplet.destroy)

        # Pick the region for the new droplet
        allowed_regions = app.config['DO_ALLOWED_REGIONS']
        regions = retry_command(manager.get_all_regions)
        available_regions = {region.id for region in regions}
        regions = available_regions.intersection(allowed_regions)
        if not regions:
            logger.error('No droplets in our allowed regtions, available: %s' %
                         available_regions)
            return False

        # start the DO node
        name = 'dockerfile-build-%s' % repository_build.id
        logger.debug('Starting DO node: %s' % name)
        droplet = digitalocean.Droplet(client_id=do_client_id,
                                       api_key=do_api_key,
                                       name=name,
                                       region_id=regions.pop(),
                                       image_id=1004145,  # Docker on 13.04
                                       size_id=66,  # 512MB,
                                       backup_active=False)
        retry_command(droplet.create, [],
                      {'ssh_key_ids': [app.config['DO_SSH_KEY_ID']]})

        # Record the node id right away so a later retry can clean it up.
        repository_build.build_node_id = droplet.id
        repository_build.phase = 'starting'
        repository_build.save()

        # Poll the first droplet event until it reports 100 percent.
        logger.debug('Waiting for DO node to be available.')
        startup = retry_command(droplet.get_events)[0]
        while not startup.percentage or int(startup.percentage) != 100:
            logger.debug('Droplet startup percentage: %s' % startup.percentage)
            time.sleep(5)
            retry_command(startup.load)

        # Reload the droplet so its ip address is populated.
        retry_command(droplet.load)
        logger.debug('Droplet started at ip address: %s' % droplet.ip_address)

        # connect to it with ssh
        repository_build.phase = 'initializing'
        repository_build.save()

        # We wait until here to import paramiko because otherwise it doesn't
        # work under the daemon context.
        import paramiko
        ssh_client = paramiko.SSHClient()
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        logger.debug('Connecting to droplet through ssh at ip: %s' %
                     droplet.ip_address)
        retry_command(ssh_client.connect, [droplet.ip_address, 22, 'root'],
                      {'look_for_keys': False, 'timeout': 10.0,
                       'key_filename': app.config['DO_SSH_PRIVATE_KEY_FILENAME']})

        # Load the node with the pull token
        token = app.config['BUILD_NODE_PULL_TOKEN']
        basicauth = b64encode('%s:%s' % ('$token', token))
        auth_object = {
            'https://quay.io/v1/': {
                'auth': basicauth,
                'email': '',
            },
        }

        create_auth_cmd = 'echo \'%s\' > .dockercfg' % json.dumps(auth_object)
        # exec_command does not block, so wait for the write to finish before
        # the pull below tries to read the credentials.
        _, auth_stdout, _ = ssh_client.exec_command(create_auth_cmd)
        auth_status = auth_stdout.channel.recv_exit_status()
        logger.debug('Create auth status was: %s' % auth_status)

        # Pull and run the buildserver
        pull_cmd = 'docker pull quay.io/quay/buildserver'
        _, stdout, _ = ssh_client.exec_command(pull_cmd)
        pull_status = stdout.channel.recv_exit_status()

        if pull_status != 0:
            logger.error('Pull command failed for host: %s' %
                         droplet.ip_address)
            return False
        else:
            logger.debug('Pull status was: %s' % pull_status)

        # Remove the credentials we used to pull so crafty users cant steal
        # them. Wait for the removal and refuse to start the build if it
        # failed, otherwise the build could run with the file still present.
        remove_auth_cmd = 'rm .dockercfg'
        _, remove_stdout, _ = ssh_client.exec_command(remove_auth_cmd)
        remove_status = remove_stdout.channel.recv_exit_status()
        if remove_status != 0:
            logger.error('Removing pull credentials failed for host: %s' %
                         droplet.ip_address)
            return False

        # Prepare the signed resource url the build node can fetch the job from
        user_files = app.config['USERFILES']
        resource_url = user_files.get_file_url(repository_build.resource_key)

        # Start the build server
        start_cmd = BUILD_SERVER_CMD % (resource_url, repository_build.tag,
                                        repository_build.access_token.code)
        logger.debug('Sending build server request with command: %s' %
                     start_cmd)
        ssh_client.exec_command(start_cmd)

        status_endpoint = 'http://%s:5002/build/' % droplet.ip_address

        # wait for the server to be ready
        logger.debug('Waiting for buildserver to be ready')
        retry_command(requests.get, [status_endpoint])

        # wait for the job to be complete
        repository_build.phase = 'building'
        repository_build.status_url = status_endpoint
        repository_build.save()

        logger.debug('Waiting for job to be complete')
        status = get_status(status_endpoint)
        while status != 'error' and status != 'complete':
            logger.debug('Job status is: %s' % status)
            time.sleep(5)
            status = get_status(status_endpoint)

        logger.debug('Job complete with status: %s' % status)
        if status == 'error':
            # Fetched through retry_command like every other status request.
            error_response = retry_command(requests.get, [status_endpoint])
            error_message = error_response.json()['message']
            logger.warning('Job error: %s' % error_message)
            repository_build.phase = 'error'
        else:
            repository_build.phase = 'complete'

        # clean up the DO node
        logger.debug('Cleaning up DO node.')
        retry_command(droplet.destroy)

        repository_build.status_url = None
        repository_build.build_node_id = None
        repository_build.save()

        return True

    except Exception as outer_ex:
        # We don't really know what these are, but they are probably retryable
        logger.exception('Exception processing job: %s' % outer_ex)
        return False

    finally:
        try:
            # Release the SSH session whichever way we leave the function.
            if ssh_client is not None:
                ssh_client.close()
        finally:
            if not db_connection.is_closed():
                logger.debug('Closing thread db connection.')
                db_connection.close()
def process_work_items(pool):
    """ Drain the dockerfile build queue, handing each item to the pool.

    Every queue item is JSON-decoded and submitted to `pool` to be run by
    babysit_builder. When the job finishes, the item is marked complete on the
    queue if the job returned a truthy value, and incomplete (to be retried)
    otherwise.

    Args:
      pool: thread pool offering `apply_async(func, args, callback=...)`.
    """
    # Every item gets the same reservation window, not just the first one.
    processing_time = 60 * 60  # allow 1 hr

    def build_callback(item):
        """ Build the completion callback bound to one specific queue item. """
        local_item = item

        def complete_callback(completed):
            if completed:
                logger.debug('Queue item completed successfully, will be removed.')
                dockerfile_build_queue.complete(local_item)
            else:
                # We have a retryable error, add the job back to the queue
                logger.debug('Queue item incomplete, will be retryed.')
                dockerfile_build_queue.incomplete(local_item)

        return complete_callback

    logger.debug('Getting work item from queue.')

    try:
        item = dockerfile_build_queue.get(processing_time=processing_time)

        while item:
            logger.debug('Queue gave us some work: %s' % item.body)

            request = json.loads(item.body)

            logger.debug('Sending work item to thread pool: %s' % pool)
            pool.apply_async(babysit_builder, [request],
                             callback=build_callback(item))

            item = dockerfile_build_queue.get(processing_time=processing_time)

        logger.debug('No more work.')

    finally:
        # Close the connection even when the loop above raised.
        if not db_connection.is_closed():
            logger.debug('Closing thread db connection.')
            db_connection.close()
def start_worker():
    """ Create the thread pool, schedule process_work_items to run every 30
    seconds, then keep the calling thread asleep forever.
    """
    worker_pool = ThreadPool(3)

    logger.debug('Scheduling worker.')

    scheduler = Scheduler()
    scheduler.start()
    scheduler.add_interval_job(process_work_items, seconds=30,
                               args=[worker_pool])

    # Sleep in one-day slices, basically forever; the scheduler does the work.
    one_day = 24 * 60 * 60
    while True:
        time.sleep(one_day)
# Command line interface: -D switches to daemon mode, --log names the log file
# used in that mode.
desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-D', action='store_true', default=False,
                    help='Run the worker in daemon mode.')
parser.add_argument('--log', default='dockerfilebuild.log',
                    help='Specify the log file for the worker as a daemon.')
args = parser.parse_args()

# Daemon mode logs to the requested file, foreground mode to the default
# stream of StreamHandler; either way the shared formatter is used.
handler = logging.FileHandler(args.log) if args.D else logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)

if args.D:
    # Keep the log file open across daemonization and stay in the current
    # working directory.
    with daemon.DaemonContext(files_preserve=[handler.stream],
                              working_directory=os.getcwd()):
        start_worker()
else:
    start_worker()