This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/workers/dockerfilebuild.py

570 lines
19 KiB
Python

import logging
import daemon
import argparse
import os
import requests
import re
import json
import shutil
import tarfile
from docker import Client, APIError
from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile
from functools import partial
from datetime import datetime, timedelta
from threading import Event
from uuid import uuid4
from data.queue import dockerfile_build_queue
from data import model
from workers.worker import Worker
from app import app, userfiles as user_files
from util.safetar import safe_extractall
from util.dockerfileparse import parse_dockerfile, ParsedDockerfile, serialize_dockerfile
# Containers running longer than this are killed by the worker watchdog.
TIMEOUT_PERIOD_MINUTES = 20

# Images whose cache time is older than this are evicted before a build.
CACHE_EXPIRATION_PERIOD_HOURS = 24

# Log line layout shared by the handlers installed at the bottom of the file.
FORMAT = '%(asctime)-15s - %(levelname)s - %(pathname)s - %(funcName)s - %(message)s'
formatter = logging.Formatter(FORMAT)

# The root logger is opened up to DEBUG; handlers are attached at start-up.
root_logger = logging.getLogger('')
root_logger.setLevel(logging.DEBUG)

logger = logging.getLogger(__name__)

# Build log/status sink taken from the application configuration.
build_logs = app.config['BUILDLOGS']
class StatusWrapper(object):
    """Context manager around the mutable status dict of a single build.

    Entering yields the status dict for in-place modification; leaving the
    ``with`` block writes the dict to ``build_logs`` under the build's uuid.
    """

    def __init__(self, build_uuid):
        self._uuid = build_uuid
        self._status = dict(
            total_commands=None,
            current_command=None,
            push_completion=0.0,
            image_completion={},
        )

        # Publish the initial (empty) status right away.
        self.__exit__(None, None, None)

    def __enter__(self):
        return self._status

    def __exit__(self, exc_type, value, traceback):
        # The status is stored whether or not the block raised.
        build_logs.set_status(self._uuid, self._status)
class _IncompleteJsonError(Exception):
    """Raised when a buffer ends before its next JSON document is complete.

    ``start_from`` is the offset in the buffer at which the undecoded
    remainder begins.
    """

    def __init__(self, start_from):
        self.start_from = start_from
class _StreamingJSONDecoder(json.JSONDecoder):
    """JSON decoder that yields every document found in a concatenated string."""

    FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
    WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)

    def decode(self, s, _w=WHITESPACE.match):
        """Generate the Python representation of each JSON document in ``s``.

        ``s`` is a ``str`` or ``unicode`` instance holding zero or more JSON
        documents, optionally separated by whitespace.

        Raises ``_IncompleteJsonError`` (on iteration) when the remainder of
        ``s`` cannot be decoded; its ``start_from`` is the offset in ``s``
        where that remainder begins, so the caller can keep it and retry once
        more data has arrived.
        """
        start_from = 0
        length = len(s)
        while start_from < length:
            try:
                # Decode in place using an absolute index instead of slicing
                # a fresh copy of the remaining buffer for every document.
                obj, end = self.raw_decode(s, idx=_w(s, start_from).end())
            except ValueError:
                raise _IncompleteJsonError(start_from)

            # Skip trailing whitespace so a buffer that ends in whitespace
            # terminates the loop instead of failing to decode.
            start_from = _w(s, end).end()
            yield obj
class StreamingDockerClient(Client):
    """Docker client whose streamed responses are yielded as decoded JSON."""

    def _stream_helper(self, response):
        """Generator for data coming from a chunked-encoded HTTP response."""
        pending = ''
        for chunk in response.iter_content(chunk_size=256):
            pending += chunk
            try:
                for decoded in json.loads(pending, cls=_StreamingJSONDecoder):
                    yield decoded
            except _IncompleteJsonError as incomplete:
                # Keep the undecoded tail; the next chunk may complete it.
                pending = pending[incomplete.start_from:]
            else:
                pending = ''
class DockerfileBuildContext(object):
    """Context manager that builds a Dockerfile and pushes the resulting image.

    Entering the context removes all containers, evicts expired images and
    renames tags of non-public repositories. Leaving it removes containers,
    renames tags again and deletes the build directory.
    """

    # Class-level state, shared by every context created in this process:
    # image id -> time the image was first seen by __cleanup.
    image_id_to_cache_time = {}
    # Random names given to tags of repositories that were not deemed public.
    private_repo_tags = set()

    def __init__(self, build_context_dir, dockerfile_subdir, repo, tag_names,
                 push_token, build_uuid, pull_credentials=None):
        """Parse and rewrite the Dockerfile and set up the docker clients.

        build_context_dir: directory holding the unpacked build package; it is
            deleted when the context exits.
        dockerfile_subdir: path below build_context_dir containing Dockerfile.
        repo: repository name to tag and push to.
        tag_names: iterable of tag names applied to the built image.
        push_token: password used with the '$token' user when pushing.
        build_uuid: id under which status and log lines are recorded.
        pull_credentials: optional dict with 'username', 'password' and
            'registry' keys used to log in before pulling the base image.
        """
        self._build_dir = build_context_dir
        self._dockerfile_subdir = dockerfile_subdir
        self._repo = repo
        self._tag_names = tag_names
        self._push_token = push_token
        self._status = StatusWrapper(build_uuid)
        self._build_logger = partial(build_logs.append_log_message, build_uuid)
        self._pull_credentials = pull_credentials
        self._public_repos = set()

        # Note: We have two different clients here because we (potentially) login
        # with both, but with different credentials that we do not want shared between
        # the build and push operations.
        self._push_cl = StreamingDockerClient(timeout=1200)
        self._build_cl = StreamingDockerClient(timeout=1200)

        dockerfile_path = os.path.join(self._build_dir, dockerfile_subdir,
                                       'Dockerfile')

        # Compute the number of steps
        with open(dockerfile_path, 'r') as dockerfileobj:
            self._parsed_dockerfile = parse_dockerfile(dockerfileobj.read())

        self.__inject_quay_repo_env(self._parsed_dockerfile, repo)
        self._num_steps = len(self._parsed_dockerfile.commands)

        # Write the Dockerfile back so the injected ENV command is built.
        with open(dockerfile_path, 'w') as dockerfileobj:
            dockerfileobj.write(serialize_dockerfile(self._parsed_dockerfile))

        logger.debug('Will build and push to repo %s with tags named: %s' %
                     (self._repo, self._tag_names))

    def __enter__(self):
        self.__cleanup_containers()
        self.__evict_expired_images()
        self.__cleanup()
        return self

    def __exit__(self, exc_type, value, traceback):
        # The build directory must be removed even when docker cleanup fails,
        # otherwise every failed cleanup leaks an unpacked build package.
        try:
            self.__cleanup_containers()
            self.__cleanup()
        finally:
            shutil.rmtree(self._build_dir)

    @staticmethod
    def __inject_quay_repo_env(parsed_dockerfile, quay_reponame):
        """Insert an ENV QUAY_REPOSITORY command after the last FROM command.

        The command list is left untouched when it contains no FROM.
        """
        env_command = {
            'command': 'ENV',
            'parameters': 'QUAY_REPOSITORY %s' % quay_reponame
        }
        for index, command in reversed(list(enumerate(parsed_dockerfile.commands))):
            if command['command'] == 'FROM':
                new_command_index = index + 1
                logger.debug('Injecting env command at dockerfile index: %s', new_command_index)
                parsed_dockerfile.commands.insert(new_command_index, env_command)
                break

    @staticmethod
    def __total_completion(statuses, total_images):
        """Return the overall push completion as a float.

        statuses: dict of image id -> progress detail carrying 'current' and
            'total' values.
        total_images: number of images in the history of the pushed image.

        The byte ratio over the images that reported progress is scaled by
        the share of images that have reported at all. Returns 0.0 when there
        are no images or no bytes have been reported, rather than dividing by
        zero.
        """
        if not total_images:
            return 0.0

        details = list(statuses.values())
        sent_bytes = sum(detail[u'current'] for detail in details)
        total_bytes = sum(detail[u'total'] for detail in details)
        if not total_bytes:
            return 0.0

        percentage_with_sizes = float(len(details)) / total_images
        return float(sent_bytes) / total_bytes * percentage_with_sizes

    def build(self):
        """Pull the base image and run the docker build.

        Returns the id of the built image, or None when the build output never
        contained a 'Successfully built <id>' line.
        """
        # Login with the specified credentials (if any).
        if self._pull_credentials:
            logger.debug('Logging in with pull credentials: %s@%s',
                         self._pull_credentials['username'],
                         self._pull_credentials['registry'])
            self._build_cl.login(self._pull_credentials['username'],
                                 self._pull_credentials['password'],
                                 registry=self._pull_credentials['registry'],
                                 reauth=True)

        # Pull the image, in case it was updated since the last build
        base_image = self._parsed_dockerfile.get_base_image()
        self._build_logger('Pulling base image: %s' % base_image)
        self._build_cl.pull(base_image)

        # Start the build itself.
        logger.debug('Starting build.')

        with self._status as status:
            status['total_commands'] = self._num_steps

        logger.debug('Building to tags named: %s' % self._tag_names)
        context_path = os.path.join(self._build_dir, self._dockerfile_subdir)

        logger.debug('Final context path: %s exists: %s' %
                     (context_path, os.path.exists(context_path)))

        build_status = self._build_cl.build(path=context_path, stream=True)

        current_step = 0
        built_image = None
        for status in build_status:
            # Reduce the streamed item (dict or string) to a single message.
            fully_unwrapped = ""
            if isinstance(status, dict):
                keys_to_extract = ['error', 'status', 'stream']
                for key in keys_to_extract:
                    if key in status:
                        fully_unwrapped = status[key]
                        break

                if not fully_unwrapped:
                    logger.debug('Status dict did not have any extractable keys and was: %s',
                                 status)
            elif isinstance(status, basestring):
                fully_unwrapped = status

            status_str = str(fully_unwrapped.encode('utf-8'))
            logger.debug('Status: %s', status_str)

            step_increment = re.search(r'Step ([0-9]+) :', status_str)
            if step_increment:
                # A new Dockerfile command started: log it and publish progress.
                self._build_logger(status_str, build_logs.COMMAND)
                current_step = int(step_increment.group(1))
                logger.debug('Step now: %s/%s' % (current_step, self._num_steps))
                with self._status as status_update:
                    status_update['current_command'] = current_step
                continue

            self._build_logger(status_str)

            complete = re.match(r'Successfully built ([a-z0-9]+)$', status_str)
            if complete:
                built_image = complete.group(1)
                logger.debug('Final image ID is: %s' % built_image)

        # None when the build never reported success.
        return built_image

    def push(self, built_image):
        """Tag built_image with every configured tag and push the repository.

        Raises RuntimeError when the repository name has no registry host
        portion or when the push stream reports an error.
        """
        # Login to the registry
        host = re.match(r'([a-z0-9.:]+)/.+/.+$', self._repo)
        if not host:
            raise RuntimeError('Invalid repo name: %s' % self._repo)

        for protocol in ['https', 'http']:
            registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1))
            logger.debug('Attempting login to registry: %s' % registry_endpoint)

            try:
                self._push_cl.login('$token', self._push_token, registry=registry_endpoint)
                break
            except APIError:
                # Probably the wrong protocol; try the next one.
                logger.debug('Login to registry failed: %s', registry_endpoint)

        for tag in self._tag_names:
            logger.debug('Tagging image %s as %s:%s' %
                         (built_image, self._repo, tag))
            self._push_cl.tag(built_image, self._repo, tag)

        history = json.loads(self._push_cl.history(built_image))
        num_images = len(history)
        with self._status as status:
            status['total_images'] = num_images

        logger.debug('Pushing to repo %s' % self._repo)
        resp = self._push_cl.push(self._repo, stream=True)

        for update in resp:
            logger.debug('Status: %s', update)

            if u'status' in update:
                if update[u'status'] != 'Pushing':
                    continue
                if u'progressDetail' not in update or u'id' not in update:
                    continue

                image_id = update[u'id']
                detail = update[u'progressDetail']

                if u'current' in detail and 'total' in detail:
                    with self._status as push_status:
                        images = push_status['image_completion']
                        images[image_id] = detail
                        push_status['push_completion'] = \
                            DockerfileBuildContext.__total_completion(images, num_images)

            elif u'errorDetail' in update:
                message = 'Error pushing image.'
                if u'message' in update[u'errorDetail']:
                    message = str(update[u'errorDetail'][u'message'])

                raise RuntimeError(message)

    def __is_repo_public(self, repo_name):
        """Return True when the repository's image list is readable anonymously.

        Positive answers are remembered for the lifetime of this context.
        Connection failures are treated as "not public".
        """
        if repo_name in self._public_repos:
            return True

        repo_portions = repo_name.split('/')
        registry_hostname = 'index.docker.io'
        local_repo_name = repo_name
        if len(repo_portions) > 2:
            registry_hostname = repo_portions[0]
            local_repo_name = '/'.join(repo_portions[1:])

        repo_url_template = '%s://%s/v1/repositories/%s/images'
        protocols = ['https', 'http']
        secure_repo_url, repo_url = [
            repo_url_template % (protocol, registry_hostname, local_repo_name)
            for protocol in protocols
        ]

        try:
            try:
                repo_info = requests.get(secure_repo_url)
            except requests.exceptions.SSLError:
                # Fall back to plain http when the TLS handshake fails.
                repo_info = requests.get(repo_url)
        except requests.exceptions.ConnectionError:
            return False

        # Floor division keeps the 2xx check correct under true division too.
        if repo_info.status_code // 100 == 2:
            self._public_repos.add(repo_name)
            return True

        return False

    def __cleanup_containers(self):
        """Kill every running container, then remove all containers."""
        # First clean up any containers that might be holding the images
        for running in self._build_cl.containers(quiet=True):
            logger.debug('Killing container: %s' % running['Id'])
            self._build_cl.kill(running['Id'])

        # Next, remove all of the containers (which should all now be killed)
        for container in self._build_cl.containers(all=True, quiet=True):
            logger.debug('Removing container: %s' % container['Id'])
            self._build_cl.remove_container(container['Id'])

    def __evict_expired_images(self):
        """Remove images cached longer than CACHE_EXPIRATION_PERIOD_HOURS."""
        logger.debug('Cleaning images older than %s hours.', CACHE_EXPIRATION_PERIOD_HOURS)
        now = datetime.now()
        verify_removed = set()

        for image in self._build_cl.images():
            image_id = image[u'Id']
            created = datetime.fromtimestamp(image[u'Created'])

            # If we don't have a cache time, use the created time (e.g. worker reboot)
            cache_time = self.image_id_to_cache_time.get(image_id, created)
            expiration = cache_time + timedelta(hours=CACHE_EXPIRATION_PERIOD_HOURS)

            if expiration < now:
                logger.debug('Removing expired image: %s' % image_id)

                for tag in image['RepoTags']:
                    # We can forget about this particular tag if it was indeed one of our
                    # renamed tags.
                    # NOTE(review): __cleanup stores bare uuid names in private_repo_tags
                    # while `tag` here comes straight from RepoTags - confirm the two
                    # formats actually match, otherwise this discard never fires.
                    self.private_repo_tags.discard(tag)

                    try:
                        self._build_cl.remove_image(tag)
                    except APIError:
                        # Sometimes an upstream image removed this one
                        pass

                try:
                    self._build_cl.remove_image(image_id)
                except APIError:
                    # Sometimes an upstream image removed this one
                    pass
                verify_removed.add(image_id)

        # Verify that our images were actually removed
        for image in self._build_cl.images():
            if image['Id'] in verify_removed:
                logger.warning('Image was not removed: %s' % image['Id'])

    def __cleanup(self):
        """Record cache times for new images and rename non-public tags."""
        # Iterate all of the images and rename the ones that aren't public. This should preserve
        # base images and also allow the cache to function.
        now = datetime.now()
        for image in self._build_cl.images():
            image_id = image[u'Id']

            if image_id not in self.image_id_to_cache_time:
                logger.debug('Setting image %s cache time to %s', image_id, now)
                self.image_id_to_cache_time[image_id] = now

            for tag in image['RepoTags']:
                tag_repo = ParsedDockerfile.base_image_from_repo_identifier(tag)
                if tag_repo == '<none>':
                    continue

                if tag_repo in self.private_repo_tags:
                    logger.debug('Repo is private and has already been renamed: %s' % tag_repo)
                elif self.__is_repo_public(tag_repo):
                    logger.debug('Repo was deemed public: %s', tag_repo)
                else:
                    # Re-tag under a random name and drop the original tag.
                    new_name = str(uuid4())
                    logger.debug('Private repo tag being renamed %s -> %s', tag, new_name)
                    self._build_cl.tag(image_id, new_name)
                    self._build_cl.remove_image(tag)
                    self.private_repo_tags.add(new_name)
class DockerfileBuildWorker(Worker):
    """Queue worker that unpacks a build package, builds it and pushes it."""

    def __init__(self, *vargs, **kwargs):
        super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)

        prepare_zip = DockerfileBuildWorker.__prepare_zip
        prepare_dockerfile = DockerfileBuildWorker.__prepare_dockerfile
        prepare_tarball = DockerfileBuildWorker.__prepare_tarball

        # Content type of the downloaded resource -> function that unpacks it
        # into a fresh build directory.
        self._mime_processors = {
            'application/zip': prepare_zip,
            'application/x-zip-compressed': prepare_zip,
            'text/plain': prepare_dockerfile,
            'application/octet-stream': prepare_dockerfile,
            'application/x-tar': prepare_tarball,
            'application/gzip': prepare_tarball,
            'application/x-gzip': prepare_tarball,
        }

        # Set by the watchdog when it kills a container for running too long.
        self._timeout = Event()

    @staticmethod
    def __prepare_zip(request_file):
        """Extract a zip response into a new temp directory and return its path."""
        build_dir = mkdtemp(prefix='docker-build-')

        # Save the zip file to temp somewhere
        with TemporaryFile() as zip_file:
            zip_file.write(request_file.content)
            # Close the archive handle explicitly once extraction is done.
            with ZipFile(zip_file) as to_extract:
                to_extract.extractall(build_dir)

        return build_dir

    @staticmethod
    def __prepare_dockerfile(request_file):
        """Write the response body as 'Dockerfile' in a new temp directory."""
        build_dir = mkdtemp(prefix='docker-build-')
        dockerfile_path = os.path.join(build_dir, "Dockerfile")
        with open(dockerfile_path, 'w') as dockerfile:
            dockerfile.write(request_file.content)

        return build_dir

    @staticmethod
    def __prepare_tarball(request_file):
        """Stream-extract a (possibly compressed) tarball into a new temp directory."""
        build_dir = mkdtemp(prefix='docker-build-')

        # Extract directly from the raw response stream; nothing is saved first.
        with tarfile.open(mode='r|*', fileobj=request_file.raw) as tar_stream:
            safe_extractall(tar_stream, build_dir)

        return build_dir

    @staticmethod
    def __set_phase(repository_build, log_appender, phase):
        """Log a phase change and persist it on the build record."""
        log_appender(phase, build_logs.PHASE)
        repository_build.phase = phase
        repository_build.save()

    @staticmethod
    def __fail(repository_build, log_appender, message):
        """Move the build to the 'error' phase and log the reason."""
        DockerfileBuildWorker.__set_phase(repository_build, log_appender, 'error')
        log_appender(message, build_logs.ERROR)

    def watchdog(self):
        """Kill containers that have run longer than TIMEOUT_PERIOD_MINUTES."""
        logger.debug('Running build watchdog code.')

        docker_cl = Client()

        # Iterate the running containers and kill ones that have been running more than 20 minutes
        for container in docker_cl.containers():
            start_time = datetime.fromtimestamp(container[u'Created'])
            running_time = datetime.now() - start_time
            if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES):
                logger.warning('Container has been running too long: %s with command: %s',
                               container[u'Id'], container[u'Command'])
                docker_cl.kill(container[u'Id'])
                # Lets process_queue_item report the failure as a timeout.
                self._timeout.set()

    def process_queue_item(self, job_details):
        """Download, build and push the build described by job_details.

        job_details: dict with 'namespace', 'repository' and 'build_uuid' keys
            and an optional 'pull_credentials' entry.

        Returns True on every path that returns; build and push failures are
        recorded on the build (phase 'error') instead of being raised.
        """
        self._timeout.clear()

        repository_build = model.get_repository_build(job_details['namespace'],
                                                      job_details['repository'],
                                                      job_details['build_uuid'])

        pull_credentials = job_details.get('pull_credentials', None)

        job_config = json.loads(repository_build.job_config)

        resource_url = user_files.get_file_url(repository_build.resource_key)
        tag_names = job_config['docker_tags']
        build_subdir = job_config['build_subdir']
        repo = job_config['repository']

        access_token = repository_build.access_token.code

        log_appender = partial(build_logs.append_log_message,
                               repository_build.uuid)

        log_appender('initializing', build_logs.PHASE)

        start_msg = ('Starting job with resource url: %s repo: %s' % (resource_url,
                                                                      repo))
        logger.debug(start_msg)

        docker_resource = requests.get(resource_url, stream=True)

        # Drop parameters such as "; charset=..." and normalise the type. A
        # missing header yields '' and is reported as an unknown mime-type
        # below instead of raising KeyError.
        raw_type = docker_resource.headers.get('content-type') or ''
        c_type = raw_type.split(';')[0].strip().lower()

        filetype_msg = ('Request to build type: %s with repo: %s and tags: %s' %
                        (c_type, repo, tag_names))
        logger.info(filetype_msg)
        log_appender(filetype_msg)

        if c_type not in self._mime_processors:
            self.__fail(repository_build, log_appender, 'Unknown mime-type: %s' % c_type)
            return True

        # NOTE(review): failures while unpacking, or while constructing the
        # build context below, propagate to the caller and leave the build in
        # its current phase - confirm that is the intended handling.
        build_dir = self._mime_processors[c_type](docker_resource)

        self.__set_phase(repository_build, log_appender, 'building')

        with DockerfileBuildContext(build_dir, build_subdir, repo, tag_names,
                                    access_token,
                                    repository_build.uuid, pull_credentials) as build_ctxt:
            try:
                built_image = build_ctxt.build()

                if not built_image:
                    if self._timeout.is_set():
                        reason = ('Build step was terminated after %s minutes.' %
                                  TIMEOUT_PERIOD_MINUTES)
                    else:
                        reason = 'Unable to build dockerfile.'

                    self.__fail(repository_build, log_appender, reason)
                    return True

                self.__set_phase(repository_build, log_appender, 'pushing')

                build_ctxt.push(built_image)

                self.__set_phase(repository_build, log_appender, 'complete')

            except Exception as exc:
                logger.exception('Exception when processing request.')
                self.__fail(repository_build, log_appender, str(exc))
                return True

        return True
# Command line: -D selects daemon mode, --log names the daemon's log file.
desc = 'Worker daemon to monitor dockerfile build'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-D', action='store_true', default=False,
                    help='Run the worker in daemon mode.')
parser.add_argument('--log', default='dockerfilebuild.log',
                    help='Specify the log file for the worker as a daemon.')
args = parser.parse_args()

worker = DockerfileBuildWorker(dockerfile_build_queue, reservation_seconds=60 * 60)  # 1 hour

# Daemon mode logs to a file, foreground mode to the default stream.
handler = logging.FileHandler(args.log) if args.D else logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)

if args.D:
    # Keep the log file descriptor open across daemonization.
    with daemon.DaemonContext(files_preserve=[handler.stream]):
        worker.start()
else:
    worker.start()