# Listing metadata from the source viewer: 382 lines, 12 KiB, Python, no trailing EOL.
| import logging
 | |
| import daemon
 | |
| import argparse
 | |
| import os
 | |
| import requests
 | |
| import re
 | |
| import json
 | |
| import shutil
 | |
| 
 | |
| from docker import Client, APIError
 | |
| from tempfile import TemporaryFile, mkdtemp
 | |
| from zipfile import ZipFile
 | |
| from functools import partial
 | |
| 
 | |
| from data.queue import dockerfile_build_queue
 | |
| from data import model
 | |
| from workers.worker import Worker
 | |
| from app import app
 | |
| 
 | |
| 
 | |
# Root logger: let every record through. The actual handler is attached at the
# bottom of the file once the command line has been parsed.
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)

# Record layout shared by whichever handler ends up installed.
FORMAT = ('%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - '
          '%(message)s')
formatter = logging.Formatter(FORMAT)

logger = logging.getLogger(__name__)

# Objects taken from the application configuration: one resolves uploaded
# resources to URLs, the other receives build status and log entries.
user_files = app.config['USERFILES']
build_logs = app.config['BUILDLOGS']
 | |
| 
 | |
| 
 | |
class StatusWrapper(object):
  """Context manager guarding the status dictionary of one build.

  Entering yields the mutable status dict; leaving the block (with or without
  an exception) writes that dict to ``build_logs`` under the build's uuid.
  """

  def __init__(self, build_uuid):
    """Create the initial status for ``build_uuid`` and store it right away."""
    initial_status = {
      'total_commands': None,
      'current_command': None,
      'push_completion': 0.0,
      'image_completion': {},
    }
    self._uuid = build_uuid
    self._status = initial_status

    # Reuse the exit hook so the untouched status is recorded immediately.
    self.__exit__(None, None, None)

  def __enter__(self):
    """Hand the live status dict to the ``with`` body for in-place edits."""
    return self._status

  def __exit__(self, exc_type, value, traceback):
    """Store the current status; any exception from the body is not suppressed."""
    build_logs.set_status(self._uuid, self._status)
 | |
| 
 | |
| 
 | |
def unwrap_stream(json_stream):
  """Yield the first value of every JSON object found in ``json_stream``.

  Args:
    json_stream: iterable of strings, each holding one JSON-encoded object.

  Yields:
    The first value of each decoded object.

  Raises:
    ValueError: if an entry is not valid JSON.
    IndexError: if an entry decodes to an empty object.
  """
  for json_entry in json_stream:
    # list() makes the indexing work whether values() is a list or a view.
    yield list(json.loads(json_entry).values())[0]
 | |
| 
 | |
| 
 | |
class DockerfileBuildContext(object):
  """Builds a Dockerfile with the local docker daemon and pushes the result.

  Meant to be used as a context manager: on exit every container on the daemon
  is killed and removed, every image the public index does not list is
  removed, and the build directory is deleted.
  """

  def __init__(self, build_context_dir, dockerfile_subdir, repo, tag_names,
               push_token, build_uuid):
    """Record the build parameters and count the steps of the Dockerfile.

    Args:
      build_context_dir: directory holding the build context; it is deleted
        when the context manager exits.
      dockerfile_subdir: path, relative to ``build_context_dir``, of the
        directory that contains the Dockerfile.
      repo: repository name of the form ``host/namespace/name``.
      tag_names: iterable of tag names to apply to the built image.
      push_token: token used as the password for the registry login.
      build_uuid: identifier under which status and log entries are stored.

    Raises:
      IOError: if the Dockerfile cannot be opened.
    """
    self._build_dir = build_context_dir
    self._dockerfile_subdir = dockerfile_subdir
    self._repo = repo
    self._tag_names = tag_names
    self._push_token = push_token
    self._cl = Client(timeout=1200)
    self._status = StatusWrapper(build_uuid)
    self._build_logger = partial(build_logs.append_log_message, build_uuid)

    dockerfile_path = os.path.join(self._build_dir, dockerfile_subdir,
                                   "Dockerfile")
    self._num_steps = DockerfileBuildContext.__count_steps(dockerfile_path)

    logger.debug('Will build and push to repo %s with tags named: %s' %
                 (self._repo, self._tag_names))

  def __enter__(self):
    return self

  def __exit__(self, exc_type, value, traceback):
    try:
      self.__cleanup()
    finally:
      # Delete the build directory even when the docker cleanup fails, so a
      # failing daemon does not leak temporary directories.
      shutil.rmtree(self._build_dir)

  @staticmethod
  def __count_steps(dockerfile_path):
    """Return the number of instructions in the Dockerfile at the given path.

    Blank lines and comment lines are ignored. A line that follows a line
    ending in a backslash continues the previous instruction and is not
    counted again.
    """
    steps = 0
    continuing = False
    with open(dockerfile_path, 'r') as dockerfileobj:
      for line in dockerfileobj:
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
          continue
        if not continuing:
          steps += 1
        continuing = stripped.endswith('\\')
    return steps

  @staticmethod
  def __total_completion(statuses, total_images):
    """Return the overall push completion as a fraction.

    Args:
      statuses: mapping of image id to a dict with ``current`` and ``total``
        byte counts.
      total_images: number of images expected to be pushed.

    Returns:
      (sent bytes / total bytes) scaled by the share of images that have
      reported sizes so far; 0.0 when nothing measurable has been reported.
    """
    if not statuses or not total_images:
      return 0.0

    sent_bytes = sum(status[u'current'] for status in statuses.values())
    total_bytes = sum(status[u'total'] for status in statuses.values())
    if not total_bytes:
      # Avoid dividing by zero before any sizes are known.
      return 0.0

    percentage_with_sizes = float(len(statuses)) / total_images
    return float(sent_bytes) / total_bytes * percentage_with_sizes

  @staticmethod
  def __status_text(status):
    """Reduce one entry of the build stream to a plain string.

    A dict contributes its first value, a string is used as is, and anything
    else becomes the empty string. Non-string payloads (for example a numeric
    code) are converted with str() instead of failing on ``.encode``.
    """
    fully_unwrapped = ""
    if isinstance(status, dict):
      if status:
        fully_unwrapped = list(status.values())[0]
    elif isinstance(status, basestring):
      fully_unwrapped = status

    if isinstance(fully_unwrapped, basestring):
      return str(fully_unwrapped.encode('utf-8'))
    return str(fully_unwrapped)

  def build(self):
    """Run the docker build and mirror its output into the build log.

    Returns:
      The id of the built image, or None if the build stream never reported
      a successfully built image.
    """
    logger.debug('Starting build.')

    with self._status as status:
      status['total_commands'] = self._num_steps

    logger.debug('Building to tags named: %s' % self._tag_names)
    context_path = os.path.join(self._build_dir, self._dockerfile_subdir)

    logger.debug('Final context path: %s exists: %s' %
                 (context_path, os.path.exists(context_path)))

    build_status = self._cl.build(path=context_path, stream=True)

    current_step = 0
    built_image = None
    for status in unwrap_stream(build_status):
      status_str = DockerfileBuildContext.__status_text(status)
      logger.debug('Status: %s', status_str)

      step_increment = re.search(r'Step ([0-9]+) :', status_str)
      if step_increment:
        # Step announcements are logged as commands and advance the progress.
        self._build_logger(status_str, build_logs.COMMAND)
        current_step = int(step_increment.group(1))
        logger.debug('Step now: %s/%s' % (current_step, self._num_steps))
        with self._status as status_update:
          status_update['current_command'] = current_step
        continue

      self._build_logger(status_str)

      complete = re.match(r'Successfully built ([a-z0-9]+)$', status_str)
      if complete:
        built_image = complete.group(1)
        logger.debug('Final image ID is: %s' % built_image)

    return built_image

  def push(self, built_image):
    """Tag ``built_image`` with every requested tag and push the repository.

    Raises:
      RuntimeError: if the repository name is malformed or the push stream
        reports an error.
    """
    # Login to the registry
    host = re.match(r'([a-z0-9.:]+)/.+/.+$', self._repo)
    if not host:
      raise RuntimeError('Invalid repo name: %s' % self._repo)

    for protocol in ['https', 'http']:
      registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1))
      logger.debug('Attempting login to registry: %s' % registry_endpoint)

      try:
        self._cl.login('$token', self._push_token, registry=registry_endpoint)
        break
      except APIError:
        # Probably the wrong protocol; try the next one.
        logger.debug('Login to registry %s failed', registry_endpoint)
    else:
      # Still best effort: the push below is attempted regardless, but the
      # failed login is now visible in the worker log.
      logger.warning('Unable to log in to the registry for repo %s',
                     self._repo)

    for tag in self._tag_names:
      logger.debug('Tagging image %s as %s:%s' %
                   (built_image, self._repo, tag))
      self._cl.tag(built_image, self._repo, tag)

    history = json.loads(self._cl.history(built_image))
    num_images = len(history)
    with self._status as status_update:
      status_update['total_images'] = num_images

    logger.debug('Pushing to repo %s' % self._repo)
    resp = self._cl.push(self._repo, stream=True)

    for status_str in resp:
      status = json.loads(status_str)
      logger.debug('Status: %s', status_str)
      if u'status' in status:
        status_msg = status[u'status']

        if status_msg == 'Pushing':
          if u'progressDetail' in status and u'id' in status:
            image_id = status[u'id']
            detail = status[u'progressDetail']

            if u'current' in detail and 'total' in detail:
              # A separate name keeps the decoded stream entry intact.
              with self._status as status_update:
                images = status_update['image_completion']

                images[image_id] = detail
                status_update['push_completion'] = \
                  DockerfileBuildContext.__total_completion(images, num_images)

      elif u'errorDetail' in status:
        message = 'Error pushing image.'
        if u'message' in status[u'errorDetail']:
          message = str(status[u'errorDetail'][u'message'])

        raise RuntimeError(message)

  def __cleanup(self):
    """Remove all containers and all images not listed by the public index.

    Raises:
      RuntimeError: if an image selected for removal is still present.
    """
    # First clean up any containers that might be holding the images
    for running in self._cl.containers(quiet=True):
      logger.debug('Killing container: %s' % running['Id'])
      self._cl.kill(running['Id'])

    # Next, remove all of the containers (which should all now be killed)
    for container in self._cl.containers(all=True, quiet=True):
      logger.debug('Removing container: %s' % container['Id'])
      self._cl.remove_container(container['Id'])

    # Iterate all of the images and remove the ones that the public registry
    # doesn't know about, this should preserve base images.
    images_to_remove = set()
    repos = set()
    for image in self._cl.images():
      images_to_remove.add(image['Id'])

      # NOTE(review): RepoTags may be null for untagged images on some daemon
      # versions - confirm; treating it as "no tags" is harmless either way.
      for tag in image['RepoTags'] or []:
        tag_repo = tag.split(':')[0]
        if tag_repo != '<none>':
          repos.add(tag_repo)

    for repo in repos:
      repo_url = 'https://index.docker.io/v1/repositories/%s/images' % repo
      repo_info = requests.get(repo_url)
      # Floor division so that every 2xx status counts as success.
      if repo_info.status_code // 100 == 2:
        for repo_image in repo_info.json():
          if repo_image['id'] in images_to_remove:
            logger.debug('Image was deemed public: %s' % repo_image['id'])
            images_to_remove.remove(repo_image['id'])

    for to_remove in images_to_remove:
      logger.debug('Removing private image: %s' % to_remove)
      try:
        self._cl.remove_image(to_remove)
      except APIError:
        # Sometimes an upstream image removed this one
        pass

    # Verify that our images were actually removed
    for image in self._cl.images():
      if image['Id'] in images_to_remove:
        raise RuntimeError('Image was not removed: %s' % image['Id'])
 | |
| 
 | |
| 
 | |
class DockerfileBuildWorker(Worker):
  """Queue worker that builds a repository's Dockerfile and pushes the image."""

  def __init__(self, *vargs, **kwargs):
    super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)

    # Maps the media type of the downloaded resource to the function that
    # turns it into a build directory.
    self._mime_processors = {
      'application/zip': DockerfileBuildWorker.__prepare_zip,
      'application/x-zip-compressed': DockerfileBuildWorker.__prepare_zip,
      'text/plain': DockerfileBuildWorker.__prepare_dockerfile,
      'application/octet-stream': DockerfileBuildWorker.__prepare_dockerfile,
    }

  @staticmethod
  def __prepare_zip(request_file):
    """Extract the zip archive in ``request_file.content`` into a new temp dir.

    Returns the path of the directory. The directory is removed again if the
    archive cannot be extracted.
    """
    build_dir = mkdtemp(prefix='docker-build-')

    try:
      # Save the zip file to temp somewhere
      with TemporaryFile() as zip_file:
        zip_file.write(request_file.content)
        with ZipFile(zip_file) as to_extract:
          to_extract.extractall(build_dir)
    except Exception:
      shutil.rmtree(build_dir, ignore_errors=True)
      raise

    return build_dir

  @staticmethod
  def __prepare_dockerfile(request_file):
    """Write ``request_file.content`` as ``Dockerfile`` into a new temp dir.

    Returns the path of the directory. The directory is removed again if the
    file cannot be written.
    """
    build_dir = mkdtemp(prefix='docker-build-')
    dockerfile_path = os.path.join(build_dir, "Dockerfile")

    try:
      # The response body is raw bytes, so write it in binary mode.
      with open(dockerfile_path, 'wb') as dockerfile:
        dockerfile.write(request_file.content)
    except Exception:
      shutil.rmtree(build_dir, ignore_errors=True)
      raise

    return build_dir

  @staticmethod
  def __media_type(content_type):
    """Strip parameters such as ``; charset=utf-8`` and lower-case the type."""
    return content_type.split(';', 1)[0].strip().lower()

  @staticmethod
  def __set_phase(repository_build, log_appender, phase):
    """Record ``phase`` in the build log and on the stored build record."""
    log_appender(phase, build_logs.PHASE)
    repository_build.phase = phase
    repository_build.save()

  @staticmethod
  def __record_error(repository_build, log_appender, message):
    """Move the build to the error phase and log ``message`` as the cause."""
    DockerfileBuildWorker.__set_phase(repository_build, log_appender, 'error')
    log_appender(message, build_logs.ERROR)

  def process_queue_item(self, job_details):
    """Download, build and push the resource described by ``job_details``.

    Args:
      job_details: dict with the keys ``namespace``, ``repository`` and
        ``build_uuid`` identifying the repository build.

    Returns:
      True in every case; failures are recorded on the build itself.
      NOTE(review): presumably True tells the base Worker the item is done -
      confirm against workers.worker.Worker.
    """
    repository_build = model.get_repository_build(job_details['namespace'],
                                                  job_details['repository'],
                                                  job_details['build_uuid'])

    job_config = json.loads(repository_build.job_config)

    resource_url = user_files.get_file_url(repository_build.resource_key)
    tag_names = job_config['docker_tags']
    build_subdir = job_config['build_subdir']
    repo = job_config['repository']

    access_token = repository_build.access_token.code

    log_appender = partial(build_logs.append_log_message,
                           repository_build.uuid)

    log_appender('initializing', build_logs.PHASE)

    start_msg = ('Starting job with resource url: %s repo: %s' % (resource_url,
                                                                  repo))
    log_appender(start_msg)

    docker_resource = requests.get(resource_url)
    # A missing header is treated as an unknown type instead of raising, and
    # parameters are dropped so 'text/plain; charset=utf-8' is recognised.
    raw_c_type = docker_resource.headers.get('content-type', '')
    c_type = DockerfileBuildWorker.__media_type(raw_c_type)

    filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' %
                    (c_type, repo, tag_names))
    logger.info(filetype_msg)
    log_appender(filetype_msg)

    if c_type not in self._mime_processors:
      DockerfileBuildWorker.__record_error(repository_build, log_appender,
                                           'Unknown mime-type: %s' % raw_c_type)
      return True

    try:
      build_dir = self._mime_processors[c_type](docker_resource)
    except Exception as exc:
      # E.g. a corrupt archive: report it on the build instead of leaving the
      # build in the initializing phase.
      logger.exception('Exception when preparing the build directory.')
      DockerfileBuildWorker.__record_error(repository_build, log_appender,
                                           str(exc))
      return True

    DockerfileBuildWorker.__set_phase(repository_build, log_appender,
                                      'building')

    try:
      build_ctxt = DockerfileBuildContext(build_dir, build_subdir, repo,
                                          tag_names, access_token,
                                          repository_build.uuid)
    except Exception as exc:
      # The context never took ownership of the directory (e.g. there is no
      # Dockerfile in the requested subdirectory), so remove it here.
      logger.exception('Exception when creating the build context.')
      shutil.rmtree(build_dir, ignore_errors=True)
      DockerfileBuildWorker.__record_error(repository_build, log_appender,
                                           str(exc))
      return True

    with build_ctxt:
      try:
        built_image = build_ctxt.build()

        if not built_image:
          DockerfileBuildWorker.__record_error(repository_build, log_appender,
                                               'Unable to build dockerfile.')
          return True

        DockerfileBuildWorker.__set_phase(repository_build, log_appender,
                                          'pushing')

        build_ctxt.push(built_image)

        DockerfileBuildWorker.__set_phase(repository_build, log_appender,
                                          'complete')

      except Exception as exc:
        logger.exception('Exception when processing request.')
        DockerfileBuildWorker.__record_error(repository_build, log_appender,
                                             str(exc))
        return True

    return True
 | |
| 
 | |
| 
 | |
# Command line: -D switches to daemon mode, --log names the daemon's log file.
desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-D', action='store_true', default=False,
                    help='Run the worker in daemon mode.')
parser.add_argument('--log', default='dockerfilebuild.log',
                    help='Specify the log file for the worker as a daemon.')
args = parser.parse_args()


worker = DockerfileBuildWorker(dockerfile_build_queue)

# Daemon mode logs to the requested file, foreground mode to the default
# stream; both share the same formatter on the root logger.
handler = logging.FileHandler(args.log) if args.D else logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)

if args.D:
  # Keep the log file's stream open across daemonization.
  with daemon.DaemonContext(files_preserve=[handler.stream]):
    worker.start()
else:
  worker.start()