This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/dockerfilebuild.py

703 lines
25 KiB
Python

import logging.config
if __name__ == "__main__":
logging.config.fileConfig('conf/logging.conf', disable_existing_loggers=False)
import logging
import argparse
import os
import requests
import re
import json
import shutil
import tarfile
from docker import Client
from docker.utils import kwargs_from_env
from docker.errors import APIError
from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile
from functools import partial
from datetime import datetime, timedelta
from threading import Event
from uuid import uuid4
from collections import defaultdict
from requests.exceptions import ConnectionError
from data import model
from data.database import BUILD_PHASE
from workers.worker import Worker, WorkerUnhealthyException, JobException
from app import userfiles as user_files, build_logs, sentry, dockerfile_build_queue
from endpoints.notificationhelper import spawn_notification
from util.safetar import safe_extractall
from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Containers that have been running longer than this many minutes are killed by the
# worker's watchdog.
TIMEOUT_PERIOD_MINUTES = 20
# Leaf images created less than this many hours ago are re-tagged (and thereby kept)
# when the build context cleans up images.
CACHE_EXPIRATION_PERIOD_HOURS = 24
# Value compared against an image's 'RepoTags' entry to detect an untagged image.
NO_TAGS = ['<none>:<none>']
# Queue reservation length, in seconds: the container timeout plus five minutes.
RESERVATION_TIME = (TIMEOUT_PERIOD_MINUTES + 5) * 60
def build_docker_args():
    """ Returns the keyword arguments used to construct the Docker clients.

        The arguments are taken from `kwargs_from_env()`. If they contain a 'tls' entry
        and the IGNORE_TLS_ISSUES environment variable holds a truthy (non-empty) value,
        `verify` is switched off on that entry.
    """
    docker_kwargs = kwargs_from_env()
    ignore_tls_issues = os.environ.get('IGNORE_TLS_ISSUES', False)
    if ignore_tls_issues and 'tls' in docker_kwargs:
        docker_kwargs['tls'].verify = False
    return docker_kwargs
def matches_system_error(status_str):
    """ Returns true if the given status string matches a known system error in the
        Docker builder.

        A match only counts when the known error text starts within the first 11
        characters (offsets 0 through 10) of `status_str`, which leaves room for a Unix
        control code in front of the message.
    """
    known_matches = (
        'lxc-start: invalid',
        'lxc-start: failed to',
        'lxc-start: Permission denied',
    )

    # 10 because we might have a Unix control code at the start.
    max_offset = 10

    # Bounding the search window to len(match) + max_offset means only occurrences that
    # start at offset <= max_offset can be found, without scanning the rest of the string.
    return any(status_str.find(match, 0, len(match) + max_offset) >= 0
               for match in known_matches)
class StatusWrapper(object):
    """ Context manager around the status dictionary of one build.

        Entering the context hands out the mutable status dictionary; leaving it stores
        the dictionary through `build_logs.set_status` under the build's uuid. The initial
        status is stored once at construction time.
    """

    def __init__(self, build_uuid):
        self._uuid = build_uuid

        initial_status = {}
        initial_status['total_commands'] = None
        initial_status['current_command'] = None
        initial_status['push_completion'] = 0.0
        initial_status['pull_completion'] = 0.0
        self._status = initial_status

        # Store the initial status right away, exactly as leaving a `with` block would.
        self.__exit__(None, None, None)

    def __enter__(self):
        """ Returns the status dictionary so the caller can update it in place. """
        return self._status

    def __exit__(self, exc_type, value, traceback):
        """ Stores the current status; exceptions from the `with` body are not suppressed. """
        build_logs.set_status(self._uuid, self._status)
class _IncompleteJsonError(Exception):
    """ Raised when a buffer could not be decoded into JSON from some offset onwards.

        `start_from` is the offset, into the string that was being decoded, at which the
        undecoded remainder begins.
    """

    def __init__(self, start_from):
        # Initialize the base class so the error has a readable message and args if it
        # ever reaches a log or traceback.
        super(_IncompleteJsonError, self).__init__(
            'Incomplete JSON document starting at offset %s' % start_from)
        self.start_from = start_from
class _StreamingJSONDecoder(json.JSONDecoder):
    """ JSON decoder whose `decode` yields every document found in a string that holds
        several JSON documents separated by optional whitespace.
    """
    FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
    WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)

    def decode(self, s, _w=WHITESPACE.match):
        """ Generator yielding the Python representation of each JSON document in ``s``
            (a ``str`` or ``unicode`` instance), in order.

            Raises _IncompleteJsonError when the text from some offset onwards cannot be
            decoded; its `start_from` is the offset just past the last document that was
            decoded (and the whitespace following it).
        """
        start_from = 0
        length = len(s)
        while start_from < length:
            try:
                # Decode in place via `idx` rather than slicing, so the buffer is not
                # copied for every document.
                obj, end = self.raw_decode(s, idx=_w(s, start_from).end())
            except ValueError:
                raise _IncompleteJsonError(start_from)

            # Skip the whitespace after the document; `end` is an absolute offset into s.
            start_from = _w(s, end).end()
            yield obj
class StreamingDockerClient(Client):
    """ Docker client whose stream helper yields decoded JSON values rather than raw
        response chunks.
    """

    def _stream_helper(self, response):
        """Generator for data coming from a chunked-encoded HTTP response."""
        pending = ''
        for chunk in response.iter_content(chunk_size=256):
            pending += chunk
            try:
                for decoded in json.loads(pending, cls=_StreamingJSONDecoder):
                    yield decoded
            except _IncompleteJsonError as incomplete:
                # Keep only the part that could not be decoded yet; the next chunk is
                # appended to it.
                pending = pending[incomplete.start_from:]
            else:
                # Everything buffered so far was decoded.
                pending = ''
class DockerfileBuildContext(object):
    """ Context manager wrapping the Docker side of a single Dockerfile build.

        Constructing the context rewrites the Dockerfile in the build directory so that it
        sets QUAY_REPOSITORY. Entering the context cleans up containers and images and
        prunes the image cache; leaving it deletes the build directory and cleans up
        containers again. `pull`, `build` and `push` run the individual phases.
    """

    def __init__(self, build_context_dir, dockerfile_subdir, repo, tag_names,
                 push_token, build_uuid, cache_size_gb, pull_credentials=None):
        """ Prepares the build directory and the Docker clients.

            Raises RuntimeError if `build_context_dir`/`dockerfile_subdir` does not contain
            a file named 'Dockerfile'.
        """
        self._build_dir = build_context_dir
        self._dockerfile_subdir = dockerfile_subdir
        self._repo = repo
        self._tag_names = tag_names
        self._push_token = push_token
        self._status = StatusWrapper(build_uuid)
        self._build_logger = partial(build_logs.append_log_message, build_uuid)
        self._pull_credentials = pull_credentials
        self._cache_size_gb = cache_size_gb

        # Note: We have two different clients here because we (potentially) login
        # with both, but with different credentials that we do not want shared between
        # the build and push operations.
        self._push_cl = StreamingDockerClient(timeout=1200, **build_docker_args())
        self._build_cl = StreamingDockerClient(timeout=1200, **build_docker_args())

        dockerfile_path = os.path.join(self._build_dir, dockerfile_subdir,
                                       'Dockerfile')
        if not os.path.exists(dockerfile_path):
            raise RuntimeError('Build job did not contain a Dockerfile.')

        # Compute the number of steps
        with open(dockerfile_path, 'r') as dockerfileobj:
            self._parsed_dockerfile = parse_dockerfile(dockerfileobj.read())

        self.__inject_quay_repo_env(self._parsed_dockerfile, repo)
        self._num_steps = len(self._parsed_dockerfile.commands)

        # Write the Dockerfile back with the injected ENV command included.
        with open(dockerfile_path, 'w') as dockerfileobj:
            dockerfileobj.write(serialize_dockerfile(self._parsed_dockerfile))

        logger.debug('Will build and push to repo %s with tags named: %s', self._repo,
                     self._tag_names)

    def __enter__(self):
        """ Cleans up containers and images and prunes the cache before the build.

            Raises WorkerUnhealthyException if the Docker API reports an error.
        """
        try:
            self.__cleanup_containers()
            self.__cleanup_images()
            self.__prune_cache()
        except APIError:
            sentry.client.captureException()
            message = 'Docker installation is no longer healthy.'
            logger.exception(message)
            raise WorkerUnhealthyException(message)

        return self

    def __exit__(self, exc_type, value, traceback):
        """ Deletes the build directory and cleans up containers.

            Raises WorkerUnhealthyException if the Docker API reports an error.
        """
        shutil.rmtree(self._build_dir)

        try:
            self.__cleanup_containers()
        except APIError:
            sentry.client.captureException()
            message = 'Docker installation is no longer healthy.'
            logger.exception(message)
            raise WorkerUnhealthyException(message)

    @staticmethod
    def __inject_quay_repo_env(parsed_dockerfile, quay_reponame):
        """ Inserts an `ENV QUAY_REPOSITORY <quay_reponame>` command directly after the
            last FROM command of the parsed Dockerfile. Nothing is inserted if there is
            no FROM command.
        """
        env_command = {
            'command': 'ENV',
            'parameters': 'QUAY_REPOSITORY %s' % quay_reponame
        }

        # Walk backwards so the first FROM we meet is the last one in the file.
        for index, command in reversed(list(enumerate(parsed_dockerfile.commands))):
            if command['command'] == 'FROM':
                new_command_index = index + 1
                logger.debug('Injecting env command at dockerfile index: %s', new_command_index)
                parsed_dockerfile.commands.insert(new_command_index, env_command)
                break

    @staticmethod
    def __total_completion(statuses, total_images):
        """ Returns the overall completion as a float.

            `statuses` maps image ids to dicts holding 'current' and 'total' counts. The
            ratio of summed 'current' to summed 'total' is scaled by the fraction of
            `total_images` that have reported sizes so far. Returns 0.0 when there is
            nothing to measure (no statuses, no images, or totals summing to zero).
        """
        if not statuses or not total_images:
            return 0.0

        percentage_with_sizes = float(len(statuses)) / total_images
        sent_bytes = sum(status['current'] for status in statuses.values())
        total_bytes = sum(status['total'] for status in statuses.values())
        if not total_bytes:
            # Totals of zero would otherwise raise ZeroDivisionError below.
            return 0.0

        return float(sent_bytes) / total_bytes * percentage_with_sizes

    @staticmethod
    def __monitor_completion(status_stream, required_message, status_updater, status_completion_key,
                             num_images=0):
        """ Consumes `status_stream` and keeps `status_completion_key` up to date.

            Only entries whose 'status' equals `required_message` and which carry an 'id'
            and a 'progressDetail' with 'current' and 'total' contribute to the completion
            value written through `status_updater`.

            Raises RuntimeError for an entry that has no 'status' but has an 'errorDetail'.
        """
        images = {}
        for status in status_stream:
            logger.debug('%s: %s', status_completion_key, status)
            if 'status' in status:
                status_msg = status['status']

                if status_msg == required_message:
                    if 'progressDetail' in status and 'id' in status:
                        image_id = status['id']
                        detail = status['progressDetail']

                        if 'current' in detail and 'total' in detail:
                            images[image_id] = detail
                            with status_updater as status_update:
                                status_update[status_completion_key] = \
                                    DockerfileBuildContext.__total_completion(
                                        images, max(len(images), num_images))

            elif 'errorDetail' in status:
                message = 'Error pushing image.'
                if 'message' in status['errorDetail']:
                    message = str(status['errorDetail']['message'])

                raise RuntimeError(message)

    def pull(self):
        """ Pulls the base image named by the Dockerfile, logging in first when pull
            credentials were supplied.

            Raises JobException if the Dockerfile yields no base image.
        """
        image_and_tag_tuple = self._parsed_dockerfile.get_image_and_tag()
        if image_and_tag_tuple is None or image_and_tag_tuple[0] is None:
            self._build_logger('Missing FROM command in Dockerfile', build_logs.ERROR)
            raise JobException('Missing FROM command in Dockerfile')

        image_and_tag = ':'.join(image_and_tag_tuple)

        # Login with the specified credentials (if any).
        if self._pull_credentials:
            logger.debug('Logging in with pull credentials: %s@%s',
                         self._pull_credentials['username'], self._pull_credentials['registry'])

            self._build_logger('Pulling base image: %s' % image_and_tag, log_data={
                'phasestep': 'login',
                'username': self._pull_credentials['username'],
                'registry': self._pull_credentials['registry']
            })

            self._build_cl.login(self._pull_credentials['username'],
                                 self._pull_credentials['password'],
                                 registry=self._pull_credentials['registry'], reauth=True)
        else:
            self._build_logger('Pulling base image: %s' % image_and_tag, log_data={
                'phasestep': 'pull',
                'repo_url': image_and_tag
            })

        pull_status = self._build_cl.pull(image_and_tag, stream=True)

        self.__monitor_completion(pull_status, 'Downloading', self._status, 'pull_completion')

    def build(self, reservation_extension_method):
        """ Runs the Docker build and returns the id of the built image.

            `reservation_extension_method` is called with RESERVATION_TIME every time the
            build output announces a new step. Returns None if the output never reported
            a successfully built image.

            Raises WorkerUnhealthyException if the output matches a known system error.
        """
        # Start the build itself.
        logger.debug('Starting build.')

        with self._status as status:
            status['total_commands'] = self._num_steps

        logger.debug('Building to tags named: %s', self._tag_names)
        context_path = os.path.join(self._build_dir, self._dockerfile_subdir)

        logger.debug('Final context path: %s exists: %s', context_path,
                     os.path.exists(context_path))

        build_status = self._build_cl.build(path=context_path, stream=True)

        current_step = 0
        built_image = None
        for status in build_status:
            # Each entry is either a dict wrapping the text or the text itself.
            fully_unwrapped = ""
            if isinstance(status, dict):
                keys_to_extract = ['error', 'status', 'stream']
                for key in keys_to_extract:
                    if key in status:
                        fully_unwrapped = status[key]
                        break

                if not fully_unwrapped:
                    logger.debug('Status dict did not have any extractable keys and was: %s', status)
            elif isinstance(status, basestring):
                fully_unwrapped = status

            status_str = str(fully_unwrapped.encode('utf-8'))

            # Check for system errors when building.
            if matches_system_error(status_str):
                raise WorkerUnhealthyException(status_str)

            logger.debug('Status: %s', status_str)
            step_increment = re.search(r'Step ([0-9]+) :', status_str)
            if step_increment:
                self._build_logger(status_str, build_logs.COMMAND)
                current_step = int(step_increment.group(1))
                logger.debug('Step now: %s/%s', current_step, self._num_steps)
                with self._status as status_update:
                    status_update['current_command'] = current_step

                # Tell the queue that we're making progress every time we advance a step
                reservation_extension_method(RESERVATION_TIME)
                continue
            else:
                self._build_logger(status_str)

            complete = re.match(r'Successfully built ([a-z0-9]+)$', status_str)
            if complete:
                built_image = complete.group(1)
                logger.debug('Final image ID is: %s', built_image)
                continue

        # built_image is still None when no success line was seen.
        return built_image

    def push(self, built_image):
        """ Tags `built_image` with every configured tag name and pushes the repository.

            Raises RuntimeError if the repository name has no recognizable registry host.
        """
        # Login to the registry
        host = re.match(r'([a-z0-9.:]+)/.+/.+$', self._repo)
        if not host:
            raise RuntimeError('Invalid repo name: %s' % self._repo)

        for protocol in ['https', 'http']:
            registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1))
            logger.debug('Attempting login to registry: %s', registry_endpoint)

            try:
                self._push_cl.login('$token', self._push_token, registry=registry_endpoint)
                break
            except APIError:
                pass  # Probably the wrong protocol

        for tag in self._tag_names:
            logger.debug('Tagging image %s as %s:%s', built_image, self._repo, tag)
            self._push_cl.tag(built_image, self._repo, tag)

        # The history length is used as the number of images the push is expected to send.
        history = self._push_cl.history(built_image)
        num_images = len(history)

        logger.debug('Pushing to repo %s', self._repo)
        resp = self._push_cl.push(self._repo, stream=True)
        self.__monitor_completion(resp, 'Pushing', self._status, 'push_completion', num_images)

    def __cleanup_containers(self):
        """ Kills every running container and then removes all containers. """
        # First clean up any containers that might be holding the images
        for running in self._build_cl.containers(quiet=True):
            logger.debug('Killing container: %s', running['Id'])
            self._build_cl.kill(running['Id'])

        # Next, remove all of the containers (which should all now be killed)
        for container in self._build_cl.containers(all=True, quiet=True):
            logger.debug('Removing container: %s', container['Id'])
            self._build_cl.remove_container(container['Id'])

    def __cleanup_images(self):
        """ Remove tags on internal nodes, and remove images older than the expiration time. """
        ids_to_images, ids_to_children = self.__compute_image_graph()

        # Untag all internal nodes, which are usually the base images
        for internal_id in ids_to_children.keys():
            internal = ids_to_images[internal_id]
            if internal['RepoTags'] != NO_TAGS:
                for tag_name in internal['RepoTags']:
                    self._build_cl.remove_image(tag_name)

        # Make sure all of the leaves have gibberish tags, and remove those older than our expiration
        leaves = set(ids_to_images.keys()) - set(ids_to_children.keys())
        now = datetime.now()
        for leaf_id in leaves:
            leaf = ids_to_images[leaf_id]

            created = datetime.fromtimestamp(leaf['Created'])
            expiration = created + timedelta(hours=CACHE_EXPIRATION_PERIOD_HOURS)
            if expiration > now:
                # Assign a new tag as a uuid to preserve this image
                new_tag = str(uuid4())
                self._build_cl.tag(leaf['Id'], new_tag)

            # Remove all of the existing tags
            if leaf['RepoTags'] != NO_TAGS:
                for tag_name in leaf['RepoTags']:
                    self._build_cl.remove_image(tag_name)

    def __prune_cache(self):
        """ Remove the oldest leaf image until the cache size is the desired size. """
        logger.debug('Pruning cache to size(gb): %s', self._cache_size_gb)
        while self.__compute_cache_size_gb() > self._cache_size_gb:
            logger.debug('Locating the oldest image in the cache to prune.')

            # Find the oldest tagged image and remove it
            oldest_creation_time = datetime.max
            oldest_image = None
            for image in self._build_cl.images():
                created = datetime.fromtimestamp(image['Created'])
                if created < oldest_creation_time:
                    oldest_creation_time = created
                    oldest_image = image

            if oldest_image is None:
                # The size is computed from images(all=True) while candidates come from
                # images(); with no candidate left there is nothing more to remove.
                logger.warning('Cache is over the size limit but no image is available to prune.')
                break

            logger.debug('Removing oldest image from cache: %s', oldest_image['Id'])

            # Remove all tags on the oldest image
            if oldest_image['RepoTags'] == NO_TAGS:
                # Remove the image id directly since there are no tags
                self._build_cl.remove_image(oldest_image['Id'])
            else:
                # Remove all tags
                for tag_name in oldest_image['RepoTags']:
                    self._build_cl.remove_image(tag_name)

    def __compute_cache_size_gb(self):
        """ Returns the summed 'Size' of all images, converted from bytes to gigabytes. """
        all_images = self._build_cl.images(all=True)
        size_in_bytes = sum(img['Size'] for img in all_images)
        size_in_gb = float(size_in_bytes) / 1024 / 1024 / 1024
        logger.debug('Computed cache size(gb) of: %s', size_in_gb)
        return size_in_gb

    def __compute_image_graph(self):
        """ Returns a tuple `(ids_to_images, ids_to_children)` covering all images.

            `ids_to_images` maps each image id to its image dict; `ids_to_children` maps a
            parent image id to the list of image dicts naming it as 'ParentId'.
        """
        all_images = self._build_cl.images(all=True)

        ids_to_images = {}
        ids_to_children = defaultdict(list)
        for image in all_images:
            if image['ParentId'] != '':
                ids_to_children[image['ParentId']].append(image)
            ids_to_images[image['Id']] = image

        return (ids_to_images, ids_to_children)
class DockerfileBuildWorker(Worker):
    """ Queue worker that unpacks a build package and builds and pushes it with Docker.

        The package is fetched from user files, unpacked according to its content type
        and then pulled, built and pushed through a DockerfileBuildContext. Progress and
        failures are written to the build logs and sent out as notifications.
    """

    def __init__(self, cache_size_gb, *vargs, **kwargs):
        """ `cache_size_gb` is handed to every DockerfileBuildContext this worker creates;
            all remaining arguments go to the Worker base class.
        """
        super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)

        # Maps the content type of a build package to the function that unpacks it.
        self._mime_processors = {
            'application/zip': DockerfileBuildWorker.__prepare_zip,
            'application/x-zip-compressed': DockerfileBuildWorker.__prepare_zip,
            'text/plain': DockerfileBuildWorker.__prepare_dockerfile,
            'application/octet-stream': DockerfileBuildWorker.__prepare_dockerfile,
            'application/x-tar': DockerfileBuildWorker.__prepare_tarball,
            'application/gzip': DockerfileBuildWorker.__prepare_tarball,
            'application/x-gzip': DockerfileBuildWorker.__prepare_tarball,
        }

        # Set by the watchdog when it kills a container that ran for too long.
        self._timeout = Event()
        self._cache_size_gb = cache_size_gb

    @staticmethod
    def __prepare_zip(request_file):
        """ Extracts a zip build package into a new temporary directory and returns the
            directory path. The directory is removed again if unpacking fails.
        """
        build_dir = mkdtemp(prefix='docker-build-')

        try:
            # Save the zip file to temp somewhere
            with TemporaryFile() as zip_file:
                zip_file.write(request_file.content)
                to_extract = ZipFile(zip_file)
                to_extract.extractall(build_dir)
        except Exception:
            # Do not leave a half-populated temp directory behind.
            shutil.rmtree(build_dir, ignore_errors=True)
            raise

        return build_dir

    @staticmethod
    def __prepare_dockerfile(request_file):
        """ Writes the response content as 'Dockerfile' into a new temporary directory and
            returns the directory path. The directory is removed again if writing fails.
        """
        build_dir = mkdtemp(prefix='docker-build-')

        try:
            dockerfile_path = os.path.join(build_dir, "Dockerfile")
            with open(dockerfile_path, 'w') as dockerfile:
                dockerfile.write(request_file.content)
        except Exception:
            # Do not leave a half-populated temp directory behind.
            shutil.rmtree(build_dir, ignore_errors=True)
            raise

        return build_dir

    @staticmethod
    def __prepare_tarball(request_file):
        """ Extracts a tar build package into a new temporary directory and returns the
            directory path. The directory is removed again if unpacking fails.
        """
        build_dir = mkdtemp(prefix='docker-build-')

        try:
            # Extract directly from the raw response stream; 'r|*' reads the archive as
            # a stream of blocks.
            with tarfile.open(mode='r|*', fileobj=request_file.raw) as tar_stream:
                safe_extractall(tar_stream, build_dir)
        except Exception:
            # Do not leave a half-populated temp directory behind.
            shutil.rmtree(build_dir, ignore_errors=True)
            raise

        return build_dir

    def watchdog(self):
        """ Kills containers that have been running longer than TIMEOUT_PERIOD_MINUTES and
            flags the timeout event for each one killed.

            Raises WorkerUnhealthyException if talking to Docker fails with a connection
            error.
        """
        logger.debug('Running build watchdog code.')
        try:
            docker_cl = Client(**build_docker_args())

            # Iterate the running containers and kill ones that have been running more than 20 minutes
            for container in docker_cl.containers():
                start_time = datetime.fromtimestamp(container['Created'])
                running_time = datetime.now() - start_time
                if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES):
                    logger.warning('Container has been running too long: %s with command: %s',
                                   container['Id'], container['Command'])
                    docker_cl.kill(container['Id'])
                    self._timeout.set()

        except ConnectionError as exc:
            logger.exception('Watchdog exception')
            raise WorkerUnhealthyException(exc.message)

    def process_queue_item(self, job_details):
        """ Runs the build described by `job_details`.

            `job_details` must hold 'build_uuid' and may hold 'pull_credentials'.

            Raises JobException when the build itself fails and WorkerUnhealthyException
            when Docker cannot be reached or reports a system error.
        """
        self._timeout.clear()

        # Make sure we have more information for debugging problems
        sentry.client.user_context(job_details)

        repository_build = model.get_repository_build(job_details['build_uuid'])

        pull_credentials = job_details.get('pull_credentials', None)

        job_config = json.loads(repository_build.job_config)

        resource_url = user_files.get_file_url(repository_build.resource_key, requires_cors=False)
        tag_names = job_config['docker_tags']
        build_subdir = job_config['build_subdir']

        # TODO remove the top branch when there are no more jobs with a repository config
        if 'repository' in job_config:
            repo = job_config['repository']
        else:
            repo = '%s/%s/%s' % (job_config['registry'],
                                 repository_build.repository.namespace_user.username,
                                 repository_build.repository.name)

        access_token = repository_build.access_token.code

        log_appender = partial(build_logs.append_log_message, repository_build.uuid)

        # Lookup and save the version of docker being used.
        try:
            docker_cl = Client(**build_docker_args())
            docker_version = docker_cl.version().get('Version', '')
        except ConnectionError as exc:
            logger.exception('Initial connection exception')
            raise WorkerUnhealthyException(exc.message)

        dash = docker_version.find('-')

        # Strip any -tutum or whatever off of the version.
        if dash > 0:
            docker_version = docker_version[:dash]

        log_appender('initializing', build_logs.PHASE, log_data={
            'docker_version': docker_version
        })

        log_appender('Docker version: %s' % docker_version)

        start_msg = ('Starting job with resource url: %s repo: %s' % (resource_url, repo))
        logger.debug(start_msg)

        docker_resource = requests.get(resource_url, stream=True)
        c_type = docker_resource.headers['content-type']

        # Drop any parameters (such as a charset) from the content type.
        if ';' in c_type:
            c_type = c_type.split(';')[0]

        filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' %
                        (c_type, repo, tag_names))
        logger.info(filetype_msg)
        log_appender(filetype_msg)

        # Spawn a notification that the build has started.
        event_data = {
            'build_id': repository_build.uuid,
            'build_name': repository_build.display_name,
            'docker_tags': tag_names,
            'trigger_id': repository_build.trigger.uuid,
            'trigger_kind': repository_build.trigger.service.name
        }

        spawn_notification(repository_build.repository, 'build_start', event_data,
                           subpage='build?current=%s' % repository_build.uuid,
                           pathargs=['build', repository_build.uuid])

        # Setup a handler for spawning failure messages.
        def spawn_failure(message, event_data):
            event_data['error_message'] = message
            spawn_notification(repository_build.repository, 'build_failure', event_data,
                               subpage='build?current=%s' % repository_build.uuid,
                               pathargs=['build', repository_build.uuid])

        if c_type not in self._mime_processors:
            log_appender('error', build_logs.PHASE)
            repository_build.phase = BUILD_PHASE.ERROR
            repository_build.save()
            message = 'Unknown mime-type: %s' % c_type
            log_appender(message, build_logs.ERROR)
            spawn_failure(message, event_data)
            raise JobException(message)

        # Try to build the build directory package from the buildpack.
        log_appender('unpacking', build_logs.PHASE)
        repository_build.phase = BUILD_PHASE.UNPACKING
        repository_build.save()

        build_dir = None
        try:
            build_dir = self._mime_processors[c_type](docker_resource)
        except Exception as ex:
            cur_message = ex.message or 'Error while unpacking build package'
            log_appender(cur_message, build_logs.ERROR)
            spawn_failure(cur_message, event_data)
            raise JobException(cur_message)

        # Start the build process.
        try:
            with DockerfileBuildContext(build_dir, build_subdir, repo, tag_names, access_token,
                                        repository_build.uuid, self._cache_size_gb,
                                        pull_credentials) as build_ctxt:
                log_appender('pulling', build_logs.PHASE)
                repository_build.phase = BUILD_PHASE.PULLING
                repository_build.save()
                build_ctxt.pull()

                self.extend_processing(RESERVATION_TIME)

                log_appender('building', build_logs.PHASE)
                repository_build.phase = BUILD_PHASE.BUILDING
                repository_build.save()
                built_image = build_ctxt.build(self.extend_processing)

                if not built_image:
                    log_appender('error', build_logs.PHASE)
                    repository_build.phase = BUILD_PHASE.ERROR
                    repository_build.save()

                    message = 'Unable to build dockerfile.'
                    if self._timeout.is_set():
                        message = 'Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES

                    log_appender(message, build_logs.ERROR)
                    raise JobException(message)

                self.extend_processing(RESERVATION_TIME)

                log_appender('pushing', build_logs.PHASE)
                repository_build.phase = BUILD_PHASE.PUSHING
                repository_build.save()

                build_ctxt.push(built_image)

                log_appender('complete', build_logs.PHASE)
                repository_build.phase = BUILD_PHASE.COMPLETE
                repository_build.save()

                # Spawn a notification that the build has completed.
                spawn_notification(repository_build.repository, 'build_success', event_data,
                                   subpage='build?current=%s' % repository_build.uuid,
                                   pathargs=['build', repository_build.uuid])

        except WorkerUnhealthyException as exc:
            # Spawn a notification that the build has failed.
            log_appender('Worker has become unhealthy. Will retry shortly.', build_logs.ERROR)
            spawn_failure(exc.message, event_data)

            # Raise the exception to the queue. A bare raise keeps the original traceback.
            raise

        except JobException as exc:
            # Spawn a notification that the build has failed.
            spawn_failure(exc.message, event_data)

            # Raise the exception to the queue. A bare raise keeps the original traceback.
            raise

        except ConnectionError as exc:
            # A connection exception means the worker has become unhealthy (Docker is down)
            # so we re-raise as that exception.
            logger.exception('Build connection exception')
            log_appender('Docker daemon has gone away. Will retry shortly.', build_logs.ERROR)
            raise WorkerUnhealthyException(exc.message)

        except Exception as exc:
            # Spawn a notification that the build has failed.
            spawn_failure(exc.message, event_data)

            # Write the error to the logs.
            sentry.client.captureException()
            log_appender('error', build_logs.PHASE)
            logger.exception('Exception when processing request.')
            repository_build.phase = BUILD_PHASE.ERROR
            repository_build.save()
            log_appender(str(exc), build_logs.ERROR)

            # Raise the exception to the queue.
            raise JobException(str(exc))
if __name__ == "__main__":
    # Script entry point: read the cache size from the command line and run the worker.
    arg_parser = argparse.ArgumentParser(description='Worker daemon to monitor dockerfile build')
    arg_parser.add_argument('--cachegb', default=20, type=float,
                            help='Maximum cache size in gigabytes.')
    cli_args = arg_parser.parse_args()

    build_worker = DockerfileBuildWorker(cli_args.cachegb, dockerfile_build_queue,
                                         reservation_seconds=RESERVATION_TIME)
    build_worker.start(start_status_server_port=8000)