import logging import daemon import argparse import os import requests import re import json import shutil import tarfile from docker import Client from docker.errors import APIError from tempfile import TemporaryFile, mkdtemp from zipfile import ZipFile from functools import partial from datetime import datetime, timedelta from threading import Event from uuid import uuid4 from collections import defaultdict from data.queue import dockerfile_build_queue from data import model from workers.worker import Worker, WorkerUnhealthyException, JobException from app import app, userfiles as user_files, build_logs from util.safetar import safe_extractall from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile root_logger = logging.getLogger('') root_logger.setLevel(logging.DEBUG) FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s' formatter = logging.Formatter(FORMAT) logger = logging.getLogger(__name__) TIMEOUT_PERIOD_MINUTES = 20 CACHE_EXPIRATION_PERIOD_HOURS = 24 NO_TAGS = [':'] RESERVATION_TIME = (TIMEOUT_PERIOD_MINUTES + 5) * 60 class StatusWrapper(object): def __init__(self, build_uuid): self._uuid = build_uuid self._status = { 'total_commands': None, 'current_command': None, 'push_completion': 0.0, 'pull_completion': 0.0, } self.__exit__(None, None, None) def __enter__(self): return self._status def __exit__(self, exc_type, value, traceback): build_logs.set_status(self._uuid, self._status) class _IncompleteJsonError(Exception): def __init__(self, start_from): self.start_from = start_from class _StreamingJSONDecoder(json.JSONDecoder): FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS) def decode(self, s, _w=WHITESPACE.match): """Return the Python representation of ``s`` (a ``str`` or ``unicode`` instance containing a JSON document) """ start_from = 0 while start_from < len(s): try: obj, end = self.raw_decode(s[start_from:], idx=_w(s[start_from:], 0).end()) except ValueError: raise _IncompleteJsonError(start_from) end = _w(s[start_from:], end).end() start_from += end yield obj class StreamingDockerClient(Client): def _stream_helper(self, response): """Generator for data coming from a chunked-encoded HTTP response.""" content_buf = '' for content in response.iter_content(chunk_size=256): content_buf += content try: for val in json.loads(content_buf, cls=_StreamingJSONDecoder): yield val content_buf = '' except _IncompleteJsonError as exc: content_buf = content_buf[exc.start_from:] class DockerfileBuildContext(object): def __init__(self, build_context_dir, dockerfile_subdir, repo, tag_names, push_token, build_uuid, cache_size_gb, pull_credentials=None): self._build_dir = build_context_dir self._dockerfile_subdir = dockerfile_subdir self._repo = repo self._tag_names = tag_names self._push_token = push_token self._status = StatusWrapper(build_uuid) self._build_logger = partial(build_logs.append_log_message, build_uuid) self._pull_credentials = pull_credentials self._cache_size_gb = cache_size_gb # Note: We have two different clients here because we (potentially) login # with both, but with different credentials that we do not want shared between # the build and push operations. self._push_cl = StreamingDockerClient(timeout=1200) self._build_cl = StreamingDockerClient(timeout=1200) dockerfile_path = os.path.join(self._build_dir, dockerfile_subdir, 'Dockerfile') if not os.path.exists(dockerfile_path): raise RuntimeError('Build job did not contain a Dockerfile.') # Compute the number of steps with open(dockerfile_path, 'r') as dockerfileobj: self._parsed_dockerfile = parse_dockerfile(dockerfileobj.read()) self.__inject_quay_repo_env(self._parsed_dockerfile, repo) self._num_steps = len(self._parsed_dockerfile.commands) with open(dockerfile_path, 'w') as dockerfileobj: dockerfileobj.write(serialize_dockerfile(self._parsed_dockerfile)) logger.debug('Will build and push to repo %s with tags named: %s', self._repo, self._tag_names) def __enter__(self): try: self.__cleanup_containers() self.__cleanup_images() self.__prune_cache() except APIError: message = 'Docker installation is no longer healthy.' logger.exception(message) raise WorkerUnhealthyException(message) return self def __exit__(self, exc_type, value, traceback): self.__cleanup_containers() shutil.rmtree(self._build_dir) @staticmethod def __inject_quay_repo_env(parsed_dockerfile, quay_reponame): env_command = { 'command': 'ENV', 'parameters': 'QUAY_REPOSITORY %s' % quay_reponame } for index, command in reversed(list(enumerate(parsed_dockerfile.commands))): if command['command'] == 'FROM': new_command_index = index + 1 logger.debug('Injecting env command at dockerfile index: %s', new_command_index) parsed_dockerfile.commands.insert(new_command_index, env_command) break @staticmethod def __total_completion(statuses, total_images): percentage_with_sizes = float(len(statuses.values()))/total_images sent_bytes = sum([status['current'] for status in statuses.values()]) total_bytes = sum([status['total'] for status in statuses.values()]) return float(sent_bytes)/total_bytes*percentage_with_sizes @staticmethod def __monitor_completion(status_stream, required_message, status_updater, status_completion_key, num_images=0): images = {} for status in status_stream: logger.debug('%s: %s', status_completion_key, status) if 'status' in status: status_msg = status['status'] if status_msg == required_message: if 'progressDetail' in status and 'id' in status: image_id = status['id'] detail = status['progressDetail'] if 'current' in detail and 'total' in detail: images[image_id] = detail with status_updater as status_update: status_update[status_completion_key] = \ DockerfileBuildContext.__total_completion(images, max(len(images), num_images)) elif 'errorDetail' in status: message = 'Error pushing image.' if 'message' in status['errorDetail']: message = str(status['errorDetail']['message']) raise RuntimeError(message) def pull(self): # Login with the specified credentials (if any). if self._pull_credentials: logger.debug('Logging in with pull credentials: %s@%s', self._pull_credentials['username'], self._pull_credentials['registry']) self._build_cl.login(self._pull_credentials['username'], self._pull_credentials['password'], registry=self._pull_credentials['registry'], reauth=True) # Pull the image, in case it was updated since the last build image_and_tag = ':'.join(self._parsed_dockerfile.get_image_and_tag()) self._build_logger('Pulling base image: %s' % image_and_tag) pull_status = self._build_cl.pull(image_and_tag, stream=True) self.__monitor_completion(pull_status, 'Downloading', self._status, 'pull_completion') def build(self, reservation_extension_method): # Start the build itself. logger.debug('Starting build.') with self._status as status: status['total_commands'] = self._num_steps logger.debug('Building to tags named: %s', self._tag_names) context_path = os.path.join(self._build_dir, self._dockerfile_subdir) logger.debug('Final context path: %s exists: %s', context_path, os.path.exists(context_path)) build_status = self._build_cl.build(path=context_path, stream=True) current_step = 0 built_image = None for status in build_status: fully_unwrapped = "" if isinstance(status, dict): keys_to_extract = ['error', 'status', 'stream'] for key in keys_to_extract: if key in status: fully_unwrapped = status[key] break if not fully_unwrapped: logger.debug('Status dict did not have any extractable keys and was: %s', status) elif isinstance(status, basestring): fully_unwrapped = status status_str = str(fully_unwrapped.encode('utf-8')) logger.debug('Status: %s', status_str) step_increment = re.search(r'Step ([0-9]+) :', status_str) if step_increment: self._build_logger(status_str, build_logs.COMMAND) current_step = int(step_increment.group(1)) logger.debug('Step now: %s/%s', current_step, self._num_steps) with self._status as status_update: status_update['current_command'] = current_step # Tell the queue that we're making progress every time we advance a step reservation_extension_method(RESERVATION_TIME) continue else: self._build_logger(status_str) complete = re.match(r'Successfully built ([a-z0-9]+)$', status_str) if complete: built_image = complete.group(1) logger.debug('Final image ID is: %s', built_image) continue # Get the image count if not built_image: return return built_image def push(self, built_image): # Login to the registry host = re.match(r'([a-z0-9.:]+)/.+/.+$', self._repo) if not host: raise RuntimeError('Invalid repo name: %s' % self._repo) for protocol in ['https', 'http']: registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1)) logger.debug('Attempting login to registry: %s', registry_endpoint) try: self._push_cl.login('$token', self._push_token, registry=registry_endpoint) break except APIError: pass # Probably the wrong protocol for tag in self._tag_names: logger.debug('Tagging image %s as %s:%s', built_image, self._repo, tag) self._push_cl.tag(built_image, self._repo, tag) history = json.loads(self._push_cl.history(built_image)) num_images = len(history) logger.debug('Pushing to repo %s', self._repo) resp = self._push_cl.push(self._repo, stream=True) self.__monitor_completion(resp, 'Pushing', self._status, 'push_completion', num_images) def __cleanup_containers(self): # First clean up any containers that might be holding the images for running in self._build_cl.containers(quiet=True): logger.debug('Killing container: %s', running['Id']) self._build_cl.kill(running['Id']) # Next, remove all of the containers (which should all now be killed) for container in self._build_cl.containers(all=True, quiet=True): logger.debug('Removing container: %s', container['Id']) self._build_cl.remove_container(container['Id']) def __cleanup_images(self): """ Remove tags on internal nodes, and remove images older than the expiratino time. """ ids_to_images, ids_to_children = self.__compute_image_graph() # Untag all internal nodes, which are usually the base images for internal_id in ids_to_children.keys(): internal = ids_to_images[internal_id] if internal['RepoTags'] != NO_TAGS: for tag_name in internal['RepoTags']: self._build_cl.remove_image(tag_name) # Make sure all of the leaves have gibberish tags, and remove those older than our expiration leaves = set(ids_to_images.keys()) - set(ids_to_children.keys()) now = datetime.now() for leaf_id in leaves: leaf = ids_to_images[leaf_id] created = datetime.fromtimestamp(leaf['Created']) expiration = created + timedelta(hours=CACHE_EXPIRATION_PERIOD_HOURS) if expiration > now: # Assign a new tag as a uuid to preserve this image new_tag = str(uuid4()) self._build_cl.tag(leaf['Id'], new_tag) # Remove all of the existing tags if leaf['RepoTags'] != NO_TAGS: for tag_name in leaf['RepoTags']: self._build_cl.remove_image(tag_name) def __prune_cache(self): """ Remove the oldest leaf image until the cache size is the desired size. """ logger.debug('Pruning cache to size(gb): %s', self._cache_size_gb) while self.__compute_cache_size_gb() > self._cache_size_gb: logger.debug('Locating the oldest image in the cache to prune.') # Find the oldest tagged image and remove it oldest_creation_time = datetime.max oldest_image = None for image in self._build_cl.images(): created = datetime.fromtimestamp(image['Created']) if created < oldest_creation_time: oldest_creation_time = created oldest_image = image logger.debug('Removing oldest image from cache: %s', oldest_image['Id']) # Remove all tags on the oldest image if oldest_image['RepoTags'] == NO_TAGS: # Remove the image id directly since there are no tags self._build_cl.remove_image(oldest_image['Id']) else: # Remove all tags for tag_name in oldest_image['RepoTags']: self._build_cl.remove_image(tag_name) def __compute_cache_size_gb(self): all_images = self._build_cl.images(all=True) size_in_bytes = sum([img['Size'] for img in all_images]) size_in_gb = float(size_in_bytes)/1024/1024/1024 logger.debug('Computed cache size(gb) of: %s', size_in_gb) return size_in_gb def __compute_image_graph(self): all_images = self._build_cl.images(all=True) ids_to_images = {} ids_to_children = defaultdict(list) for image in all_images: if image['ParentId'] != '': ids_to_children[image['ParentId']].append(image) ids_to_images[image['Id']] = image return (ids_to_images, ids_to_children) class DockerfileBuildWorker(Worker): def __init__(self, cache_size_gb, *vargs, **kwargs): super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs) self._mime_processors = { 'application/zip': DockerfileBuildWorker.__prepare_zip, 'application/x-zip-compressed': DockerfileBuildWorker.__prepare_zip, 'text/plain': DockerfileBuildWorker.__prepare_dockerfile, 'application/octet-stream': DockerfileBuildWorker.__prepare_dockerfile, 'application/x-tar': DockerfileBuildWorker.__prepare_tarball, 'application/gzip': DockerfileBuildWorker.__prepare_tarball, 'application/x-gzip': DockerfileBuildWorker.__prepare_tarball, } self._timeout = Event() self._cache_size_gb = cache_size_gb @staticmethod def __prepare_zip(request_file): build_dir = mkdtemp(prefix='docker-build-') # Save the zip file to temp somewhere with TemporaryFile() as zip_file: zip_file.write(request_file.content) to_extract = ZipFile(zip_file) to_extract.extractall(build_dir) return build_dir @staticmethod def __prepare_dockerfile(request_file): build_dir = mkdtemp(prefix='docker-build-') dockerfile_path = os.path.join(build_dir, "Dockerfile") with open(dockerfile_path, 'w') as dockerfile: dockerfile.write(request_file.content) return build_dir @staticmethod def __prepare_tarball(request_file): build_dir = mkdtemp(prefix='docker-build-') # Save the zip file to temp somewhere with tarfile.open(mode='r|*', fileobj=request_file.raw) as tar_stream: safe_extractall(tar_stream, build_dir) return build_dir def watchdog(self): logger.debug('Running build watchdog code.') docker_cl = Client() # Iterate the running containers and kill ones that have been running more than 20 minutes for container in docker_cl.containers(): start_time = datetime.fromtimestamp(container['Created']) running_time = datetime.now() - start_time if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES): logger.warning('Container has been running too long: %s with command: %s', container['Id'], container['Command']) docker_cl.kill(container['Id']) self._timeout.set() def process_queue_item(self, job_details): self._timeout.clear() repository_build = model.get_repository_build(job_details['namespace'], job_details['repository'], job_details['build_uuid']) pull_credentials = job_details.get('pull_credentials', None) job_config = json.loads(repository_build.job_config) resource_url = user_files.get_file_url(repository_build.resource_key) tag_names = job_config['docker_tags'] build_subdir = job_config['build_subdir'] repo = job_config['repository'] access_token = repository_build.access_token.code log_appender = partial(build_logs.append_log_message, repository_build.uuid) log_appender('initializing', build_logs.PHASE) start_msg = ('Starting job with resource url: %s repo: %s' % (resource_url, repo)) logger.debug(start_msg) docker_resource = requests.get(resource_url, stream=True) c_type = docker_resource.headers['content-type'] if ';' in c_type: c_type = c_type.split(';')[0] filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' % (c_type, repo, tag_names)) logger.info(filetype_msg) log_appender(filetype_msg) if c_type not in self._mime_processors: log_appender('error', build_logs.PHASE) repository_build.phase = 'error' repository_build.save() message = 'Unknown mime-type: %s' % c_type log_appender(message, build_logs.ERROR) raise JobException(message) build_dir = self._mime_processors[c_type](docker_resource) try: with DockerfileBuildContext(build_dir, build_subdir, repo, tag_names, access_token, repository_build.uuid, self._cache_size_gb, pull_credentials) as build_ctxt: log_appender('pulling', build_logs.PHASE) repository_build.phase = 'pulling' repository_build.save() build_ctxt.pull() self.extend_processing(RESERVATION_TIME) log_appender('building', build_logs.PHASE) repository_build.phase = 'building' repository_build.save() built_image = build_ctxt.build(self.extend_processing) if not built_image: log_appender('error', build_logs.PHASE) repository_build.phase = 'error' repository_build.save() message = 'Unable to build dockerfile.' if self._timeout.is_set(): message = 'Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES log_appender(message, build_logs.ERROR) raise JobException(message) self.extend_processing(RESERVATION_TIME) log_appender('pushing', build_logs.PHASE) repository_build.phase = 'pushing' repository_build.save() build_ctxt.push(built_image) log_appender('complete', build_logs.PHASE) repository_build.phase = 'complete' repository_build.save() except WorkerUnhealthyException as exc: # Need a separate handler for this so it doesn't get caught by catch all below raise exc except Exception as exc: log_appender('error', build_logs.PHASE) logger.exception('Exception when processing request.') repository_build.phase = 'error' repository_build.save() log_appender(str(exc), build_logs.ERROR) raise JobException(str(exc)) desc = 'Worker daemon to monitor dockerfile build' parser = argparse.ArgumentParser(description=desc) parser.add_argument('-D', action='store_true', default=False, help='Run the worker in daemon mode.') parser.add_argument('--log', default='dockerfilebuild.log', help='Specify the log file for the worker as a daemon.') parser.add_argument('--cachegb', default=20, type=float, help='Maximum cache size in gigabytes.') args = parser.parse_args() worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue, reservation_seconds=RESERVATION_TIME) if args.D: handler = logging.FileHandler(args.log) handler.setFormatter(formatter) root_logger.addHandler(handler) with daemon.DaemonContext(files_preserve=[handler.stream]): worker.start() else: handler = logging.StreamHandler() handler.setFormatter(formatter) root_logger.addHandler(handler) worker.start()