"""Queue worker that builds Dockerfiles and pushes the resulting images.

The worker pulls build requests from ``dockerfile_build_queue``, downloads the
user-supplied build resource (a bare Dockerfile or a zip archive), builds it
with the local docker daemon, pushes the result to the registry named by the
tag and finally removes containers and non-public images from the build host.
"""

import logging
import daemon
import argparse
import os
import requests
import re
import json
import shutil

from docker import Client, APIError
from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile

from data.queue import dockerfile_build_queue
from data import model
from workers.worker import Worker
from app import app


root_logger = logging.getLogger('')
root_logger.setLevel(logging.DEBUG)

FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
formatter = logging.Formatter(FORMAT)

logger = logging.getLogger(__name__)


class DockerfileBuildWorker(Worker):
    """Worker that turns a queued build request into a pushed docker image."""

    def __init__(self, *vargs, **kwargs):
        """Forward all arguments to ``Worker`` and register the MIME handlers."""
        super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)

        # Maps the content type of the downloaded resource to the function
        # that unpacks it into a fresh build directory.
        self._mime_processors = {
            'application/zip': DockerfileBuildWorker.__prepare_zip,
            'text/plain': DockerfileBuildWorker.__prepare_dockerfile,
            'application/octet-stream': DockerfileBuildWorker.__prepare_dockerfile,
        }

    @staticmethod
    def __error_message(exc):
        """Return a non-empty description of ``exc`` for the feedback dict.

        ``exc.message`` is preferred when it is present and non-empty; otherwise
        the string form of the exception is used so that the recorded message
        is never blank.
        """
        return str(getattr(exc, 'message', '') or exc)

    @staticmethod
    def __count_steps(dockerfile_path):
        """Count the lines of a Dockerfile that are neither blank nor comments.

        Every such line is counted as one step.
        """
        with open(dockerfile_path, 'r') as dockerfileobj:
            steps = 0
            for line in dockerfileobj:
                stripped = line.strip()
                # Compare by value: identity checks on strings are unreliable.
                if stripped and not stripped.startswith('#'):
                    steps += 1
        return steps

    @staticmethod
    def __prepare_zip(request_file):
        """Extract a downloaded zip archive into a new temporary build dir.

        ``request_file`` must expose the archive bytes as ``.content``.
        Returns the path of the created directory.
        """
        build_dir = mkdtemp(prefix='docker-build-')

        # Save the zip file to temp somewhere
        with TemporaryFile() as zip_file:
            zip_file.write(request_file.content)
            with ZipFile(zip_file) as to_extract:
                to_extract.extractall(build_dir)

        return build_dir

    @staticmethod
    def __prepare_dockerfile(request_file):
        """Write a downloaded Dockerfile into a new temporary build dir.

        ``request_file`` must expose the file bytes as ``.content``.
        Returns the path of the created directory.
        """
        build_dir = mkdtemp(prefix='docker-build-')
        dockerfile_path = os.path.join(build_dir, "Dockerfile")
        # The payload is raw bytes, so write it in binary mode.
        with open(dockerfile_path, 'wb') as dockerfile:
            dockerfile.write(request_file.content)

        return build_dir

    @staticmethod
    def __total_completion(statuses, total_images):
        """Estimate overall push progress as a fraction.

        ``statuses`` maps an image id to a dict carrying ``current`` and
        ``total`` byte counts. The byte ratio of the images seen so far is
        scaled by the share of ``total_images`` that have reported at all.
        Returns 0.0 when there is nothing to divide by.
        """
        if not total_images:
            return 0.0

        details = list(statuses.values())
        percentage_with_sizes = float(len(details)) / total_images
        sent_bytes = sum(status[u'current'] for status in details)
        total_bytes = sum(status[u'total'] for status in details)
        if not total_bytes:
            return 0.0

        return float(sent_bytes) / total_bytes * percentage_with_sizes

    @staticmethod
    def __build_image(build_dir, tag_name, num_steps, result_object):
        """Build ``build_dir`` with docker and tag the result as ``tag_name``.

        Progress and failures are written into ``result_object``. The build
        directory is removed whether or not the build succeeds.

        Returns the built image id, or None when the build failed (in which
        case ``result_object['status']`` is ``'error'``).
        """
        try:
            logger.debug('Starting build.')
            docker_cl = Client(timeout=1200)

            result_object['status'] = 'building'
            built_image = None

            try:
                build_status = docker_cl.build(path=build_dir, tag=tag_name,
                                               stream=True)

                for status in build_status:
                    step_increment = re.search(r'Step ([0-9]+) :', status)
                    if step_increment:
                        current_step = int(step_increment.group(1))
                        logger.debug('Step now: %s/%s', current_step, num_steps)
                        result_object['current_command'] = current_step
                        continue

                    complete = re.match(r'Successfully built ([a-z0-9]+)$', status)
                    if complete:
                        built_image = complete.group(1)
                        logger.debug('Final image ID is: %s', built_image)
                        continue
            finally:
                # Always drop the temporary build context, even on failure.
                shutil.rmtree(build_dir)

            if not built_image:
                result_object['status'] = 'error'
                result_object['message'] = 'Unable to build dockerfile.'
                return None

            return built_image
        except Exception as exc:
            logger.exception('Exception when processing request.')
            result_object['status'] = 'error'
            result_object['message'] = DockerfileBuildWorker.__error_message(exc)
            return None

    @staticmethod
    def __push_image(built_image, token, tag_name, result_object):
        """Log in to the registry named in ``tag_name`` and push the image.

        The registry host is taken from the leading ``host/`` component of the
        tag. Push progress is accumulated in
        ``result_object['image_completion']`` and summarized in
        ``result_object['push_completion']``; the final status is either
        ``'complete'`` or ``'error'``.
        """
        try:
            # Login to the registry
            host = re.match(r'([a-z0-9.:]+)/.+/.+$', tag_name)
            if not host:
                raise Exception('Invalid tag name: %s' % tag_name)

            docker_cl = Client(timeout=1200)

            for protocol in ['https', 'http']:
                registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1))
                logger.debug('Attempting login to registry: %s', registry_endpoint)

                try:
                    docker_cl.login('$token', token, registry=registry_endpoint)
                    break
                except APIError:
                    pass  # Probably the wrong protocol

            history = json.loads(docker_cl.history(built_image))
            num_images = len(history)
            result_object['total_images'] = num_images

            result_object['status'] = 'pushing'
            logger.debug('Pushing to tag name: %s', tag_name)
            resp = docker_cl.push(tag_name, stream=True)

            for status_str in resp:
                status = json.loads(status_str)
                logger.debug('Status: %s', status_str)
                if u'status' in status:
                    status_msg = status[u'status']

                    if status_msg == 'Pushing':
                        if u'progressDetail' in status and u'id' in status:
                            image_id = status[u'id']
                            detail = status[u'progressDetail']

                            if u'current' in detail and 'total' in detail:
                                images = result_object['image_completion']

                                images[image_id] = detail
                                result_object['push_completion'] = \
                                    DockerfileBuildWorker.__total_completion(images, num_images)

                elif u'errorDetail' in status:
                    result_object['status'] = 'error'
                    if u'message' in status[u'errorDetail']:
                        result_object['message'] = str(status[u'errorDetail'][u'message'])
                    return

            result_object['status'] = 'complete'
        except Exception as exc:
            logger.exception('Exception when processing request.')
            result_object['status'] = 'error'
            result_object['message'] = DockerfileBuildWorker.__error_message(exc)

    @staticmethod
    def __cleanup():
        """Remove all containers and every image the public index lacks.

        Raises:
            RuntimeError: if an image selected for removal is still listed by
                docker afterwards.
        """
        docker_cl = Client(timeout=1200)

        # First clean up any containers that might be holding the images
        for running in docker_cl.containers(quiet=True):
            docker_cl.kill(running['Id'])

        # Next, remove all of the containers (which should all now be killed)
        for container in docker_cl.containers(all=True, quiet=True):
            docker_cl.remove_container(container['Id'])

        # Iterate all of the images and remove the ones that the public registry
        # doesn't know about, this should preserve base images.
        images_to_remove = set()
        repos = set()
        for image in docker_cl.images():
            images_to_remove.add(image['Id'])
            repos.add(image['Repository'])

        for repo in repos:
            repo_url = 'https://index.docker.io/v1/repositories/%s/images' % repo
            repo_info = requests.get(repo_url)
            # Floor division keeps the 2xx check correct under true division.
            if repo_info.status_code // 100 == 2:
                for repo_image in repo_info.json():
                    if repo_image['id'] in images_to_remove:
                        logger.debug('Image was deemed public: %s', repo_image['id'])
                        images_to_remove.remove(repo_image['id'])

        for to_remove in images_to_remove:
            logger.debug('Removing private image: %s', to_remove)
            try:
                docker_cl.remove_image(to_remove)
            except APIError:
                # Sometimes an upstream image removed this one
                pass

        # Verify that our images were actually removed
        for image in docker_cl.images():
            if image['Id'] in images_to_remove:
                raise RuntimeError('Image was not removed: %s' % image['Id'])

    def process_queue_item(self, job_details):
        """Build and push the repository build named by ``job_details``.

        ``job_details['build_id']`` selects the build record. The build
        resource is downloaded, unpacked according to its content type, built,
        pushed (only if the build produced an image) and the host is cleaned
        up afterwards.

        Raises:
            Exception: if the resource has an unsupported content type.
        """
        repository_build = model.get_repository_build(job_details['build_id'])

        user_files = app.config['USERFILES']
        resource_url = user_files.get_file_url(repository_build.resource_key)
        tag_name = repository_build.tag
        access_token = repository_build.access_token.code

        feedback = {
            'total_commands': None,
            'current_command': None,
            'push_completion': 0.0,
            'status': 'waiting',
            'message': None,
            'image_completion': {},
        }

        # The access token is a credential and is deliberately not logged.
        logger.debug('Starting job with resource url: %s tag: %s',
                     resource_url, tag_name)
        docker_resource = requests.get(resource_url)

        # Drop parameters such as "; charset=utf-8" before the lookup.
        c_type = docker_resource.headers['content-type']
        c_type = c_type.split(';', 1)[0].strip().lower()

        logger.info('Request to build file of type: %s with tag: %s',
                    c_type, tag_name)

        if c_type not in self._mime_processors:
            raise Exception('Invalid dockerfile content type: %s' % c_type)

        build_dir = self._mime_processors[c_type](docker_resource)

        dockerfile_path = os.path.join(build_dir, "Dockerfile")
        try:
            num_steps = DockerfileBuildWorker.__count_steps(dockerfile_path)
        except Exception:
            # No readable Dockerfile: do not leave the temp dir behind.
            shutil.rmtree(build_dir, ignore_errors=True)
            raise

        logger.debug('Dockerfile had %s steps', num_steps)
        feedback['total_commands'] = num_steps

        built_image = DockerfileBuildWorker.__build_image(build_dir, tag_name,
                                                          num_steps, feedback)

        # A failed build has already recorded its error; pushing a missing
        # image would only replace that message with a less useful one.
        if built_image:
            DockerfileBuildWorker.__push_image(built_image, access_token,
                                               tag_name, feedback)

        DockerfileBuildWorker.__cleanup()


desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-D', action='store_true', default=False,
                    help='Run the worker in daemon mode.')
parser.add_argument('--log', default='dockerfilebuild.log',
                    help='Specify the log file for the worker as a daemon.')
args = parser.parse_args()


worker = DockerfileBuildWorker(dockerfile_build_queue)

if args.D:
    handler = logging.FileHandler(args.log)
    handler.setFormatter(formatter)
    root_logger.addHandler(handler)
    # Keep the log file descriptor open across daemonization.
    with daemon.DaemonContext(files_preserve=[handler.stream]):
        worker.start()

else:
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    root_logger.addHandler(handler)
    worker.start()