This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/dockerfilebuild.py

428 lines
No EOL
14 KiB
Python

import logging
import daemon
import argparse
import os
import requests
import re
import json
import shutil
import tarfile
from docker import Client, APIError
from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile
from functools import partial
from data.queue import dockerfile_build_queue
from data import model
from workers.worker import Worker
from app import app
FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
formatter = logging.Formatter(FORMAT)

# The root logger accepts every record from DEBUG upward; the handler that
# decides where records go is attached later in this module by the
# command-line entry point.
root_logger = logging.getLogger('')
root_logger.setLevel(logging.DEBUG)

logger = logging.getLogger(__name__)

# Project services taken from the application config: storage that serves
# uploaded build packages (user_files) and the build log/status store
# (build_logs).
user_files = app.config['USERFILES']
build_logs = app.config['BUILDLOGS']
class StatusWrapper(object):
    """Context manager around the live status dict of a single build.

    ``with wrapper as status:`` hands out the mutable status dict. When the
    block exits, normally or via an exception (which is not suppressed), the
    whole dict is written with ``build_logs.set_status`` under the build's
    uuid. The starting status is written once during construction.
    """

    def __init__(self, build_uuid):
        self._uuid = build_uuid
        self._status = dict(
            total_commands=None,
            current_command=None,
            push_completion=0.0,
            image_completion={},
        )
        # Publish the initial status right away.
        self._publish()

    def _publish(self):
        """Write the current status dict to the build log store."""
        build_logs.set_status(self._uuid, self._status)

    def __enter__(self):
        return self._status

    def __exit__(self, exc_type, value, traceback):
        self._publish()
class _IncompleteJsonError(Exception):
    """Signals that a buffer stops partway through a JSON document.

    ``start_from`` is the offset in the buffer where the undecoded text
    begins; everything before that offset was decoded successfully.
    """

    def __init__(self, start_from):
        super(_IncompleteJsonError, self).__init__(start_from)
        self.start_from = start_from
class _StreamingJSONDecoder(json.JSONDecoder):
    """JSON decoder whose ``decode`` yields every document in a buffer.

    ``decode`` is a generator. It yields each concatenated JSON document in
    ``s`` in order, skipping whitespace between documents. When the text
    from some offset onward cannot be decoded (an incomplete or malformed
    document), it raises ``_IncompleteJsonError`` carrying that offset, so a
    caller can keep the tail and retry once more data has arrived.
    """
    FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
    WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)

    def decode(self, s, _w=WHITESPACE.match):
        """Yield the Python representation of each JSON document in ``s``
        (a ``str`` or ``unicode`` instance containing JSON documents).

        Raises:
            _IncompleteJsonError: when decoding fails; its ``start_from`` is
                the offset of the first document that did not decode.
        """
        start_from = 0
        while start_from < len(s):
            # Work with absolute offsets into ``s``: raw_decode(s, idx) and
            # pattern.match(s, pos) both accept a start position, so no copy
            # of the remaining text is made per document.
            try:
                obj, end = self.raw_decode(s, idx=_w(s, start_from).end())
            except ValueError:
                raise _IncompleteJsonError(start_from)
            start_from = _w(s, end).end()
            yield obj
class StreamingDockerClient(Client):
    """Docker client whose ``_stream_helper`` decodes streamed responses as a
    sequence of JSON documents, tolerating documents split across chunks."""

    def _stream_helper(self, response):
        """Generator for data coming from a chunked-encoded HTTP response.

        Chunks are accumulated in a buffer. Every complete JSON document in
        the buffer is yielded, and a trailing partial document is kept and
        prefixed to the next chunk. Data still undecoded when the response
        ends is logged as a warning instead of being dropped silently.
        """
        content_buf = ''
        for content in response.iter_content(chunk_size=256):
            content_buf += content
            try:
                for val in json.loads(content_buf, cls=_StreamingJSONDecoder):
                    yield val
                content_buf = ''
            except _IncompleteJsonError as exc:
                # Documents before start_from were already yielded; keep only
                # the undecoded tail for the next chunk.
                content_buf = content_buf[exc.start_from:]

        if content_buf.strip():
            # The stream ended mid-document or contained malformed JSON.
            logger.warning('Discarding undecodable data at end of docker stream: %r',
                           content_buf[:200])
class DockerfileBuildContext(object):
    """Builds a Dockerfile with the local docker daemon and pushes the result.

    Use as a context manager. On exit it kills and removes containers,
    removes images that the public Docker index does not list, and deletes
    the build context directory. Progress is published through a
    ``StatusWrapper``, and build output is appended to the build logs for
    ``build_uuid``.
    """

    def __init__(self, build_context_dir, dockerfile_subdir, repo, tag_names,
                 push_token, build_uuid):
        self._build_dir = build_context_dir
        self._dockerfile_subdir = dockerfile_subdir
        self._repo = repo
        self._tag_names = tag_names
        self._push_token = push_token
        self._cl = StreamingDockerClient(timeout=1200)
        self._status = StatusWrapper(build_uuid)
        self._build_logger = partial(build_logs.append_log_message, build_uuid)

        dockerfile_path = os.path.join(self._build_dir, dockerfile_subdir,
                                       'Dockerfile')
        self._num_steps = DockerfileBuildContext.__count_steps(dockerfile_path)

        logger.debug('Will build and push to repo %s with tags named: %s' %
                     (self._repo, self._tag_names))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        # Delete the build directory even if docker cleanup fails; otherwise
        # each failed cleanup leaks a temporary directory.
        try:
            self.__cleanup()
        finally:
            shutil.rmtree(self._build_dir)

    @staticmethod
    def __count_steps(dockerfile_path):
        """Return the number of non-blank lines not starting with '#'.

        Each such line (after stripping surrounding whitespace) counts as one
        step; continuation lines are counted individually.
        """
        with open(dockerfile_path, 'r') as dockerfileobj:
            steps = 0
            for line in dockerfileobj:
                stripped = line.strip()
                if stripped and not stripped.startswith('#'):
                    steps += 1
            return steps

    @staticmethod
    def __total_completion(statuses, total_images):
        """Estimate overall push progress as a fraction.

        ``statuses`` maps image id -> progress detail with ``current`` and
        ``total`` byte counts for the images that have reported so far.
        ``total_images`` is the number of images being pushed. The byte ratio
        over reporting images is scaled by the fraction of images that have
        reported. Returns 0.0 when there is nothing to measure against.
        """
        if not statuses or not total_images:
            return 0.0
        percentage_with_sizes = float(len(statuses)) / total_images
        sent_bytes = sum(status[u'current'] for status in statuses.values())
        total_bytes = sum(status[u'total'] for status in statuses.values())
        if not total_bytes:
            # Guard against progress details whose totals are all zero.
            return 0.0
        return float(sent_bytes) / total_bytes * percentage_with_sizes

    @staticmethod
    def __unwrap_status(status):
        """Return the text carried by one item of the docker build stream.

        For dict items, a string under the usual text keys is preferred.
        Multi-key items (such as an ``error`` next to an ``errorDetail``
        dict) therefore unwrap to a string deterministically. Any other
        non-string value is rendered as JSON, so the caller can still encode
        it.
        """
        if isinstance(status, basestring):
            return status
        if isinstance(status, dict) and status:
            for key in (u'stream', u'error', u'status'):
                value = status.get(key)
                if isinstance(value, basestring):
                    return value
            value = list(status.values())[0]
            if isinstance(value, basestring):
                return value
            return json.dumps(value)
        return ""

    def build(self):
        """Run ``docker build`` on the context directory, streaming output.

        Every output line is appended to the build logs; ``Step N :`` lines
        are logged as commands and update ``current_command`` in the status.

        Returns:
            The image id from docker's ``Successfully built <id>`` line, or
            None if no such line was seen.
        """
        logger.debug('Starting build.')

        with self._status as status:
            status['total_commands'] = self._num_steps

        logger.debug('Building to tags named: %s' % self._tag_names)
        context_path = os.path.join(self._build_dir, self._dockerfile_subdir)

        logger.debug('Final context path: %s exists: %s' %
                     (context_path, os.path.exists(context_path)))

        build_status = self._cl.build(path=context_path, stream=True)

        current_step = 0
        built_image = None
        for build_item in build_status:
            fully_unwrapped = DockerfileBuildContext.__unwrap_status(build_item)
            status_str = str(fully_unwrapped.encode('utf-8'))
            logger.debug('Status: %s', status_str)

            step_increment = re.search(r'Step ([0-9]+) :', status_str)
            if step_increment:
                self._build_logger(status_str, build_logs.COMMAND)
                current_step = int(step_increment.group(1))
                logger.debug('Step now: %s/%s' % (current_step, self._num_steps))
                with self._status as status_update:
                    status_update['current_command'] = current_step
                continue

            self._build_logger(status_str)

            complete = re.match(r'Successfully built ([a-z0-9]+)$', status_str)
            if complete:
                built_image = complete.group(1)
                logger.debug('Final image ID is: %s' % built_image)

        return built_image

    def push(self, built_image):
        """Tag ``built_image`` with every requested tag and push the repo.

        Logs in to the registry host taken from the repo name (trying https,
        then http), applies the tags, then streams ``docker push`` output,
        publishing per-image progress and overall completion to the status.

        Raises:
            RuntimeError: if the repo name has no registry host, or docker
                reports an error while pushing.
        """
        # Login to the registry
        host = re.match(r'([a-z0-9.:]+)/.+/.+$', self._repo)
        if not host:
            raise RuntimeError('Invalid repo name: %s' % self._repo)

        for protocol in ['https', 'http']:
            registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1))
            logger.debug('Attempting login to registry: %s' % registry_endpoint)
            try:
                self._cl.login('$token', self._push_token, registry=registry_endpoint)
                break
            except APIError:
                pass  # Probably the wrong protocol
        else:
            # No protocol accepted the login. The push is still attempted,
            # but leave a trace of the failed login.
            logger.warning('Unable to log in to registry host: %s', host.group(1))

        for tag in self._tag_names:
            logger.debug('Tagging image %s as %s:%s' %
                         (built_image, self._repo, tag))
            self._cl.tag(built_image, self._repo, tag)

        history = json.loads(self._cl.history(built_image))
        num_images = len(history)
        with self._status as status:
            status['total_images'] = num_images

        logger.debug('Pushing to repo %s' % self._repo)
        resp = self._cl.push(self._repo, stream=True)

        for push_status in resp:
            logger.debug('Status: %s', push_status)
            if u'status' in push_status:
                status_msg = push_status[u'status']

                if status_msg == 'Pushing':
                    if u'progressDetail' in push_status and u'id' in push_status:
                        image_id = push_status[u'id']
                        detail = push_status[u'progressDetail']

                        if u'current' in detail and u'total' in detail:
                            with self._status as status:
                                images = status['image_completion']
                                images[image_id] = detail
                                status['push_completion'] = \
                                    DockerfileBuildContext.__total_completion(images, num_images)

            elif u'errorDetail' in push_status:
                message = 'Error pushing image.'
                if u'message' in push_status[u'errorDetail']:
                    message = str(push_status[u'errorDetail'][u'message'])
                raise RuntimeError(message)

    def __cleanup(self):
        """Remove containers and private images left on the docker host.

        Kills running containers and removes all containers. It then removes
        every image whose id the public Docker index does not list for one of
        the local repo names, which preserves public base images.

        Raises:
            RuntimeError: if an image scheduled for removal is still present.
        """
        # First clean up any containers that might be holding the images
        for running in self._cl.containers(quiet=True):
            logger.debug('Killing container: %s' % running['Id'])
            self._cl.kill(running['Id'])

        # Next, remove all of the containers (which should all now be killed)
        for container in self._cl.containers(all=True, quiet=True):
            logger.debug('Removing container: %s' % container['Id'])
            self._cl.remove_container(container['Id'])

        # Iterate all of the images and remove the ones that the public registry
        # doesn't know about, this should preserve base images.
        images_to_remove = set()
        repos = set()
        for image in self._cl.images():
            images_to_remove.add(image['Id'])
            # Guard against a missing or null RepoTags entry.
            for tag in image.get('RepoTags') or []:
                # Split off only the final ':tag' so 'host:port/repo:tag'
                # keeps its registry port.
                tag_repo = tag.rsplit(':', 1)[0]
                if tag_repo != '<none>':
                    repos.add(tag_repo)

        for repo in repos:
            repo_url = 'https://index.docker.io/v1/repositories/%s/images' % repo
            repo_info = requests.get(repo_url)
            # Floor division keeps the 2xx check correct under true division.
            if repo_info.status_code // 100 == 2:
                for repo_image in repo_info.json():
                    if repo_image['id'] in images_to_remove:
                        logger.debug('Image was deemed public: %s' % repo_image['id'])
                        images_to_remove.remove(repo_image['id'])

        for to_remove in images_to_remove:
            logger.debug('Removing private image: %s' % to_remove)
            try:
                self._cl.remove_image(to_remove)
            except APIError:
                # Sometimes an upstream image removed this one
                pass

        # Verify that our images were actually removed
        for image in self._cl.images():
            if image['Id'] in images_to_remove:
                raise RuntimeError('Image was not removed: %s' % image['Id'])
class DockerfileBuildWorker(Worker):
    """Queue worker that fetches a build package, builds it and pushes it."""

    def __init__(self, *vargs, **kwargs):
        super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)

        # Content-type of the downloaded build package -> function that
        # unpacks it into a new temporary build directory and returns it.
        self._mime_processors = {
            'application/zip': DockerfileBuildWorker.__prepare_zip,
            'application/x-zip-compressed': DockerfileBuildWorker.__prepare_zip,
            'text/plain': DockerfileBuildWorker.__prepare_dockerfile,
            'application/octet-stream': DockerfileBuildWorker.__prepare_dockerfile,
            'application/x-tar': DockerfileBuildWorker.__prepare_tarball,
            'application/gzip': DockerfileBuildWorker.__prepare_tarball,
        }

    @staticmethod
    def __prepare_zip(request_file):
        """Extract a zip build package into a new temporary directory."""
        build_dir = mkdtemp(prefix='docker-build-')

        # ZipFile needs a seekable file, so spool the body to a temp file.
        # Write the decoded body chunks: ``request_file.raw`` is the
        # underlying response object, not bytes.
        with TemporaryFile() as zip_file:
            for chunk in request_file.iter_content(chunk_size=65536):
                zip_file.write(chunk)
            with ZipFile(zip_file) as to_extract:
                to_extract.extractall(build_dir)

        return build_dir

    @staticmethod
    def __prepare_dockerfile(request_file):
        """Write a bare Dockerfile body into a new temporary directory."""
        build_dir = mkdtemp(prefix='docker-build-')
        dockerfile_path = os.path.join(build_dir, "Dockerfile")
        # The response body is bytes, so write it in binary mode.
        with open(dockerfile_path, 'wb') as dockerfile:
            dockerfile.write(request_file.content)
        return build_dir

    @staticmethod
    def __safe_tar_members(tar_stream, build_dir):
        """Yield only the archive members that extract inside ``build_dir``.

        The build package is user supplied. Members with absolute paths, with
        ``..`` components, whose links resolve outside ``build_dir``, or that
        are device/FIFO nodes are skipped with a warning. Members are
        yielded lazily, so this works with stream-mode archives.
        """
        base = os.path.realpath(build_dir)

        def _inside(path):
            resolved = os.path.realpath(path)
            return resolved == base or resolved.startswith(base + os.sep)

        for member in tar_stream:
            target = os.path.join(base, member.name)
            safe = _inside(target) and not member.isdev()
            if safe and member.issym():
                # Symlink targets are relative to the link's own directory.
                safe = _inside(os.path.join(os.path.dirname(target), member.linkname))
            elif safe and member.islnk():
                # Hard link targets are relative to the archive root.
                safe = _inside(os.path.join(base, member.linkname))

            if safe:
                yield member
            else:
                logger.warning('Skipping unsafe tar member: %s', member.name)

    @staticmethod
    def __prepare_tarball(request_file):
        """Extract a (possibly compressed) tar build package into a new
        temporary directory, streaming it directly from the response."""
        build_dir = mkdtemp(prefix='docker-build-')

        # 'r|*' reads a non-seekable stream with transparent decompression.
        with tarfile.open(mode='r|*', fileobj=request_file.raw) as tar_stream:
            safe_members = DockerfileBuildWorker.__safe_tar_members(tar_stream,
                                                                   build_dir)
            tar_stream.extractall(build_dir, members=safe_members)

        return build_dir

    def process_queue_item(self, job_details):
        """Build and push the repository build named by ``job_details``.

        ``job_details`` must contain ``namespace``, ``repository`` and
        ``build_uuid``. Progress is recorded as phase changes on the build
        record and in the build logs. Returns True on every handled path,
        including builds that end in the 'error' phase.
        """
        repository_build = model.get_repository_build(job_details['namespace'],
                                                      job_details['repository'],
                                                      job_details['build_uuid'])

        job_config = json.loads(repository_build.job_config)

        resource_url = user_files.get_file_url(repository_build.resource_key)
        tag_names = job_config['docker_tags']
        build_subdir = job_config['build_subdir']
        repo = job_config['repository']

        access_token = repository_build.access_token.code

        log_appender = partial(build_logs.append_log_message,
                               repository_build.uuid)

        def _fail(message):
            # Mark the build as failed in both the logs and the build record.
            log_appender('error', build_logs.PHASE)
            repository_build.phase = 'error'
            repository_build.save()
            log_appender(message, build_logs.ERROR)
            return True

        log_appender('initializing', build_logs.PHASE)

        start_msg = ('Starting job with resource url: %s repo: %s' %
                     (resource_url, repo))
        log_appender(start_msg)

        docker_resource = requests.get(resource_url, stream=True)
        # Media types are case-insensitive and may carry parameters such as
        # "; charset=utf-8"; only the bare type selects a processor.
        c_type = docker_resource.headers.get('content-type', '')
        c_type = c_type.split(';')[0].strip().lower()

        filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' %
                        (c_type, repo, tag_names))
        logger.info(filetype_msg)
        log_appender(filetype_msg)

        if c_type not in self._mime_processors:
            return _fail('Unknown mime-type: %s' % c_type)

        build_dir = self._mime_processors[c_type](docker_resource)

        log_appender('building', build_logs.PHASE)
        repository_build.phase = 'building'
        repository_build.save()

        try:
            build_ctxt = DockerfileBuildContext(build_dir, build_subdir, repo,
                                                tag_names, access_token,
                                                repository_build.uuid)
        except Exception as exc:
            # build_dir is only removed by the context's __exit__, which never
            # runs if construction fails (e.g. no Dockerfile in build_subdir).
            logger.exception('Exception when preparing build context.')
            shutil.rmtree(build_dir, ignore_errors=True)
            return _fail(str(exc))

        with build_ctxt:
            try:
                built_image = build_ctxt.build()

                if not built_image:
                    return _fail('Unable to build dockerfile.')

                log_appender('pushing', build_logs.PHASE)
                repository_build.phase = 'pushing'
                repository_build.save()

                build_ctxt.push(built_image)

                log_appender('complete', build_logs.PHASE)
                repository_build.phase = 'complete'
                repository_build.save()

            except Exception as exc:
                logger.exception('Exception when processing request.')
                return _fail(str(exc))

        return True
desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-D', action='store_true', default=False,
                    help='Run the worker in daemon mode.')
parser.add_argument('--log', default='dockerfilebuild.log',
                    help='Specify the log file for the worker as a daemon.')

args = parser.parse_args()
worker = DockerfileBuildWorker(dockerfile_build_queue)

# Daemon mode logs to the --log file; foreground mode logs to the console
# (stderr, the StreamHandler default).
handler = logging.FileHandler(args.log) if args.D else logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)

if not args.D:
    worker.start()
else:
    # Keep the log file's stream open across daemonization.
    with daemon.DaemonContext(files_preserve=[handler.stream]):
        worker.start()