First stab at the new builder.

yackob03 2014-01-24 14:40:36 -05:00
parent 335733ad68
commit 72559fb948
6 changed files with 218 additions and 526 deletions

@@ -1,20 +1,19 @@
import logging
import json
import daemon
import time
import argparse
import digitalocean
import requests
import os
import re
import shutil
from apscheduler.scheduler import Scheduler
from multiprocessing.pool import ThreadPool
from base64 import b64encode
from requests.exceptions import ConnectionError
from docker import Client, APIError
from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile
from data.queue import dockerfile_build_queue
from data import model
from data.database import db as db_connection
from workers.worker import Worker
from app import app
@@ -26,233 +25,231 @@ formatter = logging.Formatter(FORMAT)
logger = logging.getLogger(__name__)
BUILD_SERVER_CMD = ('docker run -d -p 5002:5002 ' +
'-lxc-conf="lxc.aa_profile=unconfined" ' +
'-privileged -e \'RESOURCE_URL=%s\' -e \'TAG=%s\' ' +
'-e \'TOKEN=%s\' quay.io/quay/buildserver')
class DockerfileBuildWorker(Worker):
def __init__(self, *vargs, **kwargs):
super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)
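# Generic retry helper: invoke to_call with the given args, and on any
# exception wait `period` seconds and recurse until `retries` runs out,
# then re-raise the last exception.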
def retry_command(to_call, args=[], kwargs={}, retries=5, period=5):
try:
return to_call(*args, **kwargs)
except Exception as ex:
if retries:
logger.debug('Retrying command after %ss' % period)
time.sleep(period)
return retry_command(to_call, args, kwargs, retries-1, period)
raise ex
def get_status(url):
return retry_command(requests.get, [url]).json()['status']
def babysit_builder(request):
""" Spin up a build node and ask it to build our job. Retryable errors
should return False, while fatal errors should return True.
"""
try:
logger.debug('Starting work item: %s' % request)
repository_build = model.get_repository_build(request['build_id'])
logger.debug('Request details: %s' % repository_build)
# Initialize digital ocean API
do_client_id = app.config['DO_CLIENT_ID']
do_api_key = app.config['DO_CLIENT_SECRET']
manager = digitalocean.Manager(client_id=do_client_id, api_key=do_api_key)
# check if there is already a DO node for this build, if so clean it up
old_id = repository_build.build_node_id
if old_id:
logger.debug('Cleaning up old DO node: %s' % old_id)
old_droplet = digitalocean.Droplet(id=old_id, client_id=do_client_id,
api_key=do_api_key)
retry_command(old_droplet.destroy)
# Pick the region for the new droplet
allowed_regions = app.config['DO_ALLOWED_REGIONS']
regions = retry_command(manager.get_all_regions)
available_regions = {region.id for region in regions}
regions = available_regions.intersection(allowed_regions)
if not regions:
logger.error('No droplets in our allowed regions, available: %s' %
available_regions)
return False
# start the DO node
name = 'dockerfile-build-%s' % repository_build.id
logger.debug('Starting DO node: %s' % name)
droplet = digitalocean.Droplet(client_id=do_client_id,
api_key=do_api_key,
name=name,
region_id=regions.pop(),
image_id=app.config['DO_DOCKER_IMAGE'],
size_id=66, # 512MB
backup_active=False)
retry_command(droplet.create, [],
{'ssh_key_ids': [app.config['DO_SSH_KEY_ID']]})
repository_build.build_node_id = droplet.id
repository_build.phase = 'starting'
repository_build.save()
logger.debug('Waiting for DO node to be available.')
startup = retry_command(droplet.get_events)[0]
while not startup.percentage or int(startup.percentage) != 100:
logger.debug('Droplet startup percentage: %s' % startup.percentage)
time.sleep(5)
retry_command(startup.load)
retry_command(droplet.load)
logger.debug('Droplet started at ip address: %s' % droplet.ip_address)
# connect to it with ssh
repository_build.phase = 'initializing'
repository_build.save()
# We wait until here to import paramiko because otherwise it doesn't work
# under the daemon context.
import paramiko
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
logger.debug('Connecting to droplet through ssh at ip: %s' %
droplet.ip_address)
retry_command(ssh_client.connect, [droplet.ip_address, 22, 'root'],
{'look_for_keys': False, 'timeout': 10.0,
'key_filename': app.config['DO_SSH_PRIVATE_KEY_FILENAME']})
# Load the node with the pull token
token = app.config['BUILD_NODE_PULL_TOKEN']
basicauth = b64encode('%s:%s' % ('$token', token))
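# .dockercfg entries hold base64('username:password'); here the username is
# the literal '$token' and the password is the registry pull token.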
auth_object = {
'https://quay.io/v1/': {
'auth': basicauth,
'email': '',
},
}
self._mime_processors = {
'application/zip': DockerfileBuildWorker.__prepare_zip,
'text/plain': DockerfileBuildWorker.__prepare_dockerfile,
'application/octet-stream': DockerfileBuildWorker.__prepare_dockerfile,
}
create_auth_cmd = 'echo \'%s\' > .dockercfg' % json.dumps(auth_object)
ssh_client.exec_command(create_auth_cmd)
@staticmethod
def __count_steps(dockerfile_path):
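# A "step" is any non-blank, non-comment line in the Dockerfile; the count
# is used to report build progress as current_step/num_steps.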
with open(dockerfile_path, 'r') as dockerfileobj:
steps = 0
for line in dockerfileobj.readlines():
stripped = line.strip()
if stripped and stripped[0] != '#':
steps += 1
return steps
# Pull and run the buildserver
pull_cmd = 'docker pull quay.io/quay/buildserver'
_, stdout, _ = ssh_client.exec_command(pull_cmd)
pull_status = stdout.channel.recv_exit_status()
@staticmethod
def __prepare_zip(request_file):
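# Unpack an uploaded zip of the build context into a fresh scratch directory.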
build_dir = mkdtemp(prefix='docker-build-')
if pull_status != 0:
logger.error('Pull command failed for host: %s' % droplet.ip_address)
return False
else:
logger.debug('Pull status was: %s' % pull_status)
# Save the zip file to temp somewhere
with TemporaryFile() as zip_file:
zip_file.write(request_file.content)
to_extract = ZipFile(zip_file)
to_extract.extractall(build_dir)
# Remove the credentials we used to pull so crafty users cant steal them
remove_auth_cmd = 'rm .dockercfg'
ssh_client.exec_command(remove_auth_cmd)
return build_dir
@staticmethod
def __prepare_dockerfile(request_file):
build_dir = mkdtemp(prefix='docker-build-')
dockerfile_path = os.path.join(build_dir, "Dockerfile")
with open(dockerfile_path, 'w') as dockerfile:
dockerfile.write(request_file.content)
return build_dir
@staticmethod
def __total_completion(statuses, total_images):
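# Estimate overall push completion: the ratio of bytes sent to total bytes,
# scaled by the fraction of images that have reported sizes so far.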
percentage_with_sizes = float(len(statuses.values()))/total_images
sent_bytes = sum([status[u'current'] for status in statuses.values()])
total_bytes = sum([status[u'total'] for status in statuses.values()])
return float(sent_bytes)/total_bytes*percentage_with_sizes
@staticmethod
def __build_image(build_dir, tag_name, num_steps, result_object):
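# Run a docker build through the local daemon and parse the streamed output
# to track step progress and capture the final image ID.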
try:
logger.debug('Starting build.')
docker_cl = Client(timeout=1200)
result_object['status'] = 'building'
build_status = docker_cl.build(path=build_dir, tag=tag_name, stream=True)
current_step = 0
built_image = None
for status in build_status:
# logger.debug('Status: %s', str(status))
step_increment = re.search(r'Step ([0-9]+) :', status)
if step_increment:
current_step = int(step_increment.group(1))
logger.debug('Step now: %s/%s' % (current_step, num_steps))
result_object['current_command'] = current_step
continue
complete = re.match(r'Successfully built ([a-z0-9]+)$', status)
if complete:
built_image = complete.group(1)
logger.debug('Final image ID is: %s' % built_image)
continue
shutil.rmtree(build_dir)
# Make sure the build produced a final image ID
if not built_image:
result_object['status'] = 'error'
result_object['message'] = 'Unable to build dockerfile.'
return
return built_image
except Exception as exc:
logger.exception('Exception when processing request.')
result_object['status'] = 'error'
result_object['message'] = str(exc.message)
@staticmethod
def __push_image(built_image, token, tag_name, result_object):
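# Log in as the '$token' user, push the tag, and fold the streamed
# progressDetail records into result_object via __total_completion.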
try:
# Login to the registry
host = re.match(r'([a-z0-9.:]+)/.+/.+$', tag_name)
if not host:
raise Exception('Invalid tag name: %s' % tag_name)
registry_endpoint = 'http://%s/v1/' % host.group(1)
docker_cl = Client(timeout=1200)
logger.debug('Attempting login to registry: %s' % registry_endpoint)
docker_cl.login('$token', token, registry=registry_endpoint)
history = json.loads(docker_cl.history(built_image))
num_images = len(history)
result_object['total_images'] = num_images
result_object['status'] = 'pushing'
logger.debug('Pushing to tag name: %s' % tag_name)
resp = docker_cl.push(tag_name, stream=True)
for status_str in resp:
status = json.loads(status_str)
logger.debug('Status: %s', status_str)
if u'status' in status:
status_msg = status[u'status']
if status_msg == 'Pushing':
if u'progressDetail' in status and u'id' in status:
image_id = status[u'id']
detail = status[u'progressDetail']
if u'current' in detail and u'total' in detail:
images = result_object['image_completion']
images[image_id] = detail
result_object['push_completion'] = \
DockerfileBuildWorker.__total_completion(images, num_images)
elif u'errorDetail' in status:
result_object['status'] = 'error'
if u'message' in status[u'errorDetail']:
result_object['message'] = str(status[u'errorDetail'][u'message'])
return
result_object['status'] = 'complete'
except Exception as exc:
logger.exception('Exception when processing request.')
result_object['status'] = 'error'
result_object['message'] = str(exc.message)
@staticmethod
def __cleanup():
docker_cl = Client(timeout=1200)
# First clean up any containers that might be holding the images
for running in docker_cl.containers(quiet=True):
docker_cl.kill(running['Id'])
# Next, remove all of the containers (which should all now be killed)
for container in docker_cl.containers(all=True, quiet=True):
docker_cl.remove_container(container['Id'])
# Iterate all of the images and remove the ones that the public registry
# doesn't know about, this should preserve base images.
images_to_remove = set()
repos = set()
for image in docker_cl.images():
images_to_remove.add(image['Id'])
repos.add(image['Repository'])
for repo in repos:
repo_url = 'https://index.docker.io/v1/repositories/%s/images' % repo
repo_info = requests.get(repo_url)
if repo_info.status_code / 100 == 2:
for repo_image in repo_info.json():
if repo_image['id'] in images_to_remove:
logger.debug('Image was deemed public: %s' % repo_image['id'])
images_to_remove.remove(repo_image['id'])
for to_remove in images_to_remove:
logger.debug('Removing private image: %s' % to_remove)
try:
docker_cl.remove_image(to_remove)
except APIError:
# Sometimes an upstream image removed this one
pass
# Verify that our images were actually removed
for image in docker_cl.images():
if image['Id'] in images_to_remove:
raise RuntimeError('Image was not removed: %s' % image['Id'])
def process_queue_item(self, job_details):
repository_build = model.get_repository_build(job_details['build_id'])
# Prepare the signed resource url the build node can fetch the job from
user_files = app.config['USERFILES']
resource_url = user_files.get_file_url(repository_build.resource_key)
tag_name = repository_build.tag
access_token = repository_build.access_token.code
# Start the build server
start_cmd = BUILD_SERVER_CMD % (resource_url, repository_build.tag,
repository_build.access_token.code)
logger.debug('Sending build server request with command: %s' % start_cmd)
ssh_client.exec_command(start_cmd)
feedback = {
'total_commands': None,
'current_command': None,
'push_completion': 0.0,
'status': 'waiting',
'message': None,
'image_completion': {},
}
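# This dict doubles as the shared result_object that __build_image and
# __push_image mutate as the job progresses.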
status_endpoint = 'http://%s:5002/build/' % droplet.ip_address
# wait for the server to be ready
logger.debug('Waiting for buildserver to be ready')
retry_command(requests.get, [status_endpoint])
logger.debug('Starting job with resource url: %s tag: %s and token: %s' %
(resource_url, tag_name, access_token))
# wait for the job to be complete
repository_build.phase = 'building'
repository_build.status_url = status_endpoint
repository_build.save()
docker_resource = requests.get(resource_url)
c_type = docker_resource.headers['content-type']
logger.debug('Waiting for job to be complete')
status = get_status(status_endpoint)
while status != 'error' and status != 'complete':
logger.debug('Job status is: %s' % status)
time.sleep(5)
status = get_status(status_endpoint)
logger.info('Request to build file of type: %s with tag: %s' %
(c_type, tag_name))
logger.debug('Job complete with status: %s' % status)
if status == 'error':
error_message = requests.get(status_endpoint).json()['message']
logger.warning('Job error: %s' % error_message)
repository_build.phase = 'error'
else:
repository_build.phase = 'complete'
if c_type not in self._mime_processors:
raise Exception('Invalid dockerfile content type: %s' % c_type)
# clean up the DO node
logger.debug('Cleaning up DO node.')
retry_command(droplet.destroy)
build_dir = self._mime_processors[c_type](docker_resource)
repository_build.status_url = None
repository_build.build_node_id = None
repository_build.save()
dockerfile_path = os.path.join(build_dir, "Dockerfile")
num_steps = DockerfileBuildWorker.__count_steps(dockerfile_path)
logger.debug('Dockerfile had %s steps' % num_steps)
return True
built_image = DockerfileBuildWorker.__build_image(build_dir, tag_name,
num_steps, feedback)
except Exception as outer_ex:
# We don't really know what these are, but they are probably retryable
logger.exception('Exception processing job: %s' % outer_ex.message)
return False
DockerfileBuildWorker.__push_image(built_image, access_token, tag_name,
feedback)
finally:
if not db_connection.is_closed():
logger.debug('Closing thread db connection.')
db_connection.close()
def process_work_items(pool):
logger.debug('Getting work item from queue.')
item = dockerfile_build_queue.get(processing_time=60*60) # allow 1 hr
while item:
logger.debug('Queue gave us some work: %s' % item.body)
request = json.loads(item.body)
def build_callback(item):
local_item = item
def complete_callback(completed):
if completed:
logger.debug('Queue item completed successfully, will be removed.')
dockerfile_build_queue.complete(local_item)
else:
# We have a retryable error, add the job back to the queue
logger.debug('Queue item incomplete, will be retried.')
dockerfile_build_queue.incomplete(local_item)
return complete_callback
logger.debug('Sending work item to thread pool: %s' % pool)
pool.apply_async(babysit_builder, [request],
callback=build_callback(item))
item = dockerfile_build_queue.get()
logger.debug('No more work.')
if not db_connection.is_closed():
logger.debug('Closing thread db connection.')
db_connection.close()
def start_worker():
pool = ThreadPool(3)
logger.debug('Scheduling worker.')
sched = Scheduler()
sched.start()
sched.add_interval_job(process_work_items, args=[pool], seconds=30)
while True:
time.sleep(60 * 60 * 24) # sleep one day, basically forever
DockerfileBuildWorker.__cleanup()
desc = 'Worker daemon to monitor dockerfile build'
@@ -264,16 +261,17 @@ parser.add_argument('--log', default='dockerfilebuild.log',
args = parser.parse_args()
worker = DockerfileBuildWorker(dockerfile_build_queue)
if args.D:
handler = logging.FileHandler(args.log)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
with daemon.DaemonContext(files_preserve=[handler.stream],
working_directory=os.getcwd()):
start_worker()
with daemon.DaemonContext(files_preserve=[handler.stream]):
worker.start()
else:
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)
start_worker()
worker.start()