From 450928674bd8cc63138b4086fb5cfa4b256dcf62 Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Wed, 30 Apr 2014 18:48:36 -0400
Subject: [PATCH] Use a new caching algorithm which can limit the image cache
 size on the build nodes. Stop treating public images as special. Add a new
 phase to the builder for pulling.

---
 static/css/quay.css        |   4 +
 static/js/app.js           |   9 +-
 test/testlogs.py           |   5 +-
 util/dockerfileparse.py    |  39 +++--
 workers/dockerfilebuild.py | 311 ++++++++++++++++++-------------------
 5 files changed, 188 insertions(+), 180 deletions(-)

diff --git a/static/css/quay.css b/static/css/quay.css
index 2761b8e28..e6cf04f1e 100644
--- a/static/css/quay.css
+++ b/static/css/quay.css
@@ -676,6 +676,10 @@ i.toggle-icon:hover {
   background-color: #ddd;
 }
 
+.phase-icon.pulling {
+  background-color: #cab442;
+}
+
 .phase-icon.building {
   background-color: #f0ad4e;
 }
diff --git a/static/js/app.js b/static/js/app.js
index 8aed08782..a6162a2b3 100644
--- a/static/js/app.js
+++ b/static/js/app.js
@@ -4214,6 +4214,9 @@ quayApp.directive('buildMessage', function () {
 
         case 'waiting':
           return 'Waiting for available build worker';
+
+        case 'pulling':
+          return 'Pulling base image';
 
         case 'building':
           return 'Building image from Dockerfile';
@@ -4247,10 +4250,14 @@ quayApp.directive('buildProgress', function () {
     controller: function($scope, $element) {
       $scope.getPercentage = function(buildInfo) {
         switch (buildInfo.phase) {
+          case 'pulling':
+            return buildInfo.status.pull_completion * 100;
+            break;
+
           case 'building':
             return (buildInfo.status.current_command / buildInfo.status.total_commands) * 100;
             break;
-
+
           case 'pushing':
             return buildInfo.status.push_completion * 100;
             break;
diff --git a/test/testlogs.py b/test/testlogs.py
index b30cee6ed..2133db2be 100644
--- a/test/testlogs.py
+++ b/test/testlogs.py
@@ -39,7 +39,7 @@ class TestBuildLogs(BuildLogs):
     'total_commands': None,
     'current_command': None,
     'push_completion': 0.0,
-    'image_completion': {},
+    'pull_completion': 0.0,
   }
 
   def __init__(self, redis_host, namespace, repository, test_build_id):
@@ -156,7 +156,6 @@ class TestBuildLogs(BuildLogs):
     num_images = random.randint(2, 7)
     sizes = [random.randint(one_mb, one_mb * 5) for _ in range(num_images)]
 
-    image_completion = {}
     for image_num, image_size in enumerate(sizes):
       image_id = 'image_id_%s' % image_num
 
@@ -168,8 +167,6 @@ class TestBuildLogs(BuildLogs):
       for i in range(one_mb, image_size, one_mb):
         image_completion[image_id]['current'] = i
         new_status = deepcopy(push_status_template)
-        new_status['image_completion'] = deepcopy(image_completion)
-
         completion = TestBuildLogs._compute_total_completion(image_completion, num_images)
         new_status['push_completion'] = completion
diff --git a/util/dockerfileparse.py b/util/dockerfileparse.py
index 4a08e5dd8..cabaf6b16 100644
--- a/util/dockerfileparse.py
+++ b/util/dockerfileparse.py
@@ -4,23 +4,27 @@
 LINE_CONTINUATION_REGEX = re.compile('\s*\\\s*\n')
 COMMAND_REGEX = re.compile('([A-Za-z]+)\s(.*)')
 
 COMMENT_CHARACTER = '#'
+LATEST_TAG = 'latest'
 
 class ParsedDockerfile(object):
   def __init__(self, commands):
     self.commands = commands
 
-  def get_commands_of_kind(self, kind):
+  def _get_commands_of_kind(self, kind):
     return [command for command in self.commands if command['command'] == kind]
 
-  def get_base_image(self):
-    image_and_tag = self.get_base_image_and_tag()
-    if not image_and_tag:
+  def _get_from_image_identifier(self):
+    from_commands = self._get_commands_of_kind('FROM')
+    if not from_commands:
       return None
 
-    return self.base_image_from_repo_identifier(image_and_tag)
+    return from_commands[-1]['parameters']
 
   @staticmethod
-  def base_image_from_repo_identifier(image_and_tag):
+  def parse_image_identifier(image_identifier):
+    """ Parses a docker image identifier, and returns a tuple of image name and tag, where the tag
+        is filled in with "latest" if left unspecified.
+    """
     # Note:
     # Dockerfile image references can be of multiple forms:
     #   server:port/some/path
     #   somepath
@@ -28,29 +32,34 @@ class ParsedDockerfile(object):
     #   server/some/path
     #   server/some/path:tag
     #   server:port/some/path:tag
-    parts = image_and_tag.strip().split(':')
+    parts = image_identifier.strip().split(':')
 
     if len(parts) == 1:
       # somepath
-      return parts[0]
+      return (parts[0], LATEST_TAG)
 
     # Otherwise, determine if the last part is a port
     # or a tag.
     if parts[-1].find('/') >= 0:
       # Last part is part of the hostname.
-      return image_and_tag
+      return (image_identifier, LATEST_TAG)
 
     # Remaining cases:
     #   server/some/path:tag
     #   server:port/some/path:tag
-    return ':'.join(parts[0:-1])
+    return (':'.join(parts[0:-1]), parts[-1])
 
-  def get_base_image_and_tag(self):
-    from_commands = self.get_commands_of_kind('FROM')
-    if not from_commands:
-      return None
+  def get_base_image(self):
+    """ Return the base image without the tag name. """
+    return self.get_image_and_tag()[0]
 
-    return from_commands[-1]['parameters']
+  def get_image_and_tag(self):
+    """ Returns the image and tag from the FROM line of the dockerfile. """
+    image_identifier = self._get_from_image_identifier()
+    if image_identifier is None:
+      return (None, None)
+
+    return self.parse_image_identifier(image_identifier)
 
 
 def strip_comments(contents):
diff --git a/workers/dockerfilebuild.py b/workers/dockerfilebuild.py
index 66910fc92..0bc7d7467 100644
--- a/workers/dockerfilebuild.py
+++ b/workers/dockerfilebuild.py
@@ -8,13 +8,15 @@ import json
 import shutil
 import tarfile
 
-from docker import Client, APIError
+from docker import Client
+from docker.errors import APIError
 from tempfile import TemporaryFile, mkdtemp
 from zipfile import ZipFile
 from functools import partial
 from datetime import datetime, timedelta
 from threading import Event
 from uuid import uuid4
+from collections import defaultdict
 
 from data.queue import dockerfile_build_queue
 from data import model
@@ -36,6 +38,7 @@ build_logs = app.config['BUILDLOGS']
 
 TIMEOUT_PERIOD_MINUTES = 20
 CACHE_EXPIRATION_PERIOD_HOURS = 24
+NO_TAGS = [':']
 
 
 class StatusWrapper(object):
@@ -45,7 +48,7 @@ class StatusWrapper(object):
       'total_commands': None,
       'current_command': None,
       'push_completion': 0.0,
-      'image_completion': {},
+      'pull_completion': 0.0,
     }
 
     self.__exit__(None, None, None)
@@ -97,11 +100,8 @@ class StreamingDockerClient(Client):
 
 
 class DockerfileBuildContext(object):
-  image_id_to_cache_time = {}
-  private_repo_tags = set()
-
   def __init__(self, build_context_dir, dockerfile_subdir, repo, tag_names,
-               push_token, build_uuid, pull_credentials=None):
+               push_token, build_uuid, cache_size_gb, pull_credentials=None):
     self._build_dir = build_context_dir
     self._dockerfile_subdir = dockerfile_subdir
     self._repo = repo
@@ -110,7 +110,7 @@ class DockerfileBuildContext(object):
     self._status = StatusWrapper(build_uuid)
     self._build_logger = partial(build_logs.append_log_message, build_uuid)
     self._pull_credentials = pull_credentials
-    self._public_repos = set()
+    self._cache_size_gb = cache_size_gb
 
     # Note: We have two different clients here because we (potentially) login
     # with both, but with different credentials that we do not want shared between
@@ -120,6 +120,8 @@ class DockerfileBuildContext(object):
 
     dockerfile_path = os.path.join(self._build_dir, dockerfile_subdir, 'Dockerfile')
+    if not os.path.exists(dockerfile_path):
+      raise RuntimeError('Build job did not contain a Dockerfile.')
 
     # Compute the number of steps
     with open(dockerfile_path, 'r') as dockerfileobj:
@@ -131,18 +133,17 @@ class DockerfileBuildContext(object):
     with open(dockerfile_path, 'w') as dockerfileobj:
       dockerfileobj.write(serialize_dockerfile(self._parsed_dockerfile))
 
-    logger.debug('Will build and push to repo %s with tags named: %s' %
-                 (self._repo, self._tag_names))
+    logger.debug('Will build and push to repo %s with tags named: %s', self._repo,
+                 self._tag_names)
 
   def __enter__(self):
     self.__cleanup_containers()
-    self.__evict_expired_images()
-    self.__cleanup()
+    self.__cleanup_images()
+    self.__prune_cache()
     return self
 
   def __exit__(self, exc_type, value, traceback):
     self.__cleanup_containers()
-    self.__cleanup()
 
     shutil.rmtree(self._build_dir)
 
@@ -159,15 +160,41 @@ class DockerfileBuildContext(object):
       parsed_dockerfile.commands.insert(new_command_index, env_command)
       break
 
   @staticmethod
   def __total_completion(statuses, total_images):
     percentage_with_sizes = float(len(statuses.values()))/total_images
-    sent_bytes = sum([status[u'current'] for status in statuses.values()])
-    total_bytes = sum([status[u'total'] for status in statuses.values()])
+    sent_bytes = sum([status['current'] for status in statuses.values()])
+    total_bytes = sum([status['total'] for status in statuses.values()])
     return float(sent_bytes)/total_bytes*percentage_with_sizes
 
-  def build(self):
+  @staticmethod
+  def __monitor_completion(status_stream, required_message, status_updater, status_completion_key,
+                           num_images=0):
+    images = {}
+    for status in status_stream:
+      logger.debug('%s: %s', status_completion_key, status)
+      if 'status' in status:
+        status_msg = status['status']
+
+        if status_msg == required_message:
+          if 'progressDetail' in status and 'id' in status:
+            image_id = status['id']
+            detail = status['progressDetail']
+
+            if 'current' in detail and 'total' in detail:
+              images[image_id] = detail
+              with status_updater as status_update:
+                status_update[status_completion_key] = \
+                  DockerfileBuildContext.__total_completion(images, max(len(images), num_images))
+
+      elif 'errorDetail' in status:
+        message = 'Error pulling or pushing image.'
+        if 'message' in status['errorDetail']:
+          message = str(status['errorDetail']['message'])
+
+        raise RuntimeError(message)
+
+  def pull(self):
     # Login with the specified credentials (if any).
     if self._pull_credentials:
       logger.debug('Logging in with pull credentials: %s@%s',
@@ -176,21 +203,24 @@
                    self._pull_credentials['username'], self._pull_credentials['registry'])
       self._push_cl.login(self._pull_credentials['username'], self._pull_credentials['password'],
                           registry=self._pull_credentials['registry'], reauth=True)
 
     # Pull the image, in case it was updated since the last build
-    base_image = self._parsed_dockerfile.get_base_image()
-    self._build_logger('Pulling base image: %s' % base_image)
-    self._build_cl.pull(base_image)
+    image_and_tag = ':'.join(self._parsed_dockerfile.get_image_and_tag())
+    self._build_logger('Pulling base image: %s' % image_and_tag)
+    pull_status = self._build_cl.pull(image_and_tag, stream=True)
+
+    self.__monitor_completion(pull_status, 'Downloading', self._status, 'pull_completion')
+
+  def build(self):
     # Start the build itself.
     logger.debug('Starting build.')
 
     with self._status as status:
       status['total_commands'] = self._num_steps
 
-    logger.debug('Building to tags named: %s' % self._tag_names)
+    logger.debug('Building to tags named: %s', self._tag_names)
     context_path = os.path.join(self._build_dir, self._dockerfile_subdir)
 
-    logger.debug('Final context path: %s exists: %s' %
-                 (context_path, os.path.exists(context_path)))
+    logger.debug('Final context path: %s exists: %s', context_path,
+                 os.path.exists(context_path))
 
     build_status = self._build_cl.build(path=context_path, stream=True)
 
@@ -216,7 +246,7 @@ class DockerfileBuildContext(object):
         if step_increment:
           self._build_logger(status_str, build_logs.COMMAND)
           current_step = int(step_increment.group(1))
-          logger.debug('Step now: %s/%s' % (current_step, self._num_steps))
+          logger.debug('Step now: %s/%s', current_step, self._num_steps)
           with self._status as status_update:
             status_update['current_command'] = current_step
           continue
@@ -226,7 +256,7 @@ class DockerfileBuildContext(object):
         complete = re.match(r'Successfully built ([a-z0-9]+)$', status_str)
         if complete:
           built_image = complete.group(1)
-          logger.debug('Final image ID is: %s' % built_image)
+          logger.debug('Final image ID is: %s', built_image)
           continue
 
     # Get the image count
@@ -243,7 +273,7 @@ class DockerfileBuildContext(object):
 
     for protocol in ['https', 'http']:
       registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1))
-      logger.debug('Attempting login to registry: %s' % registry_endpoint)
+      logger.debug('Attempting login to registry: %s', registry_endpoint)
 
       try:
         self._push_cl.login('$token', self._push_token, registry=registry_endpoint)
@@ -252,151 +282,103 @@ class DockerfileBuildContext(object):
         pass  # Probably the wrong protocol
 
     for tag in self._tag_names:
-      logger.debug('Tagging image %s as %s:%s' %
-                   (built_image, self._repo, tag))
+      logger.debug('Tagging image %s as %s:%s', built_image, self._repo, tag)
       self._push_cl.tag(built_image, self._repo, tag)
 
     history = json.loads(self._push_cl.history(built_image))
     num_images = len(history)
-    with self._status as status:
-      status['total_images'] = num_images
 
-    logger.debug('Pushing to repo %s' % self._repo)
+    logger.debug('Pushing to repo %s', self._repo)
     resp = self._push_cl.push(self._repo, stream=True)
-
-    for status in resp:
-      logger.debug('Status: %s', status)
-      if u'status' in status:
-        status_msg = status[u'status']
-
-        if status_msg == 'Pushing':
-          if u'progressDetail' in status and u'id' in status:
-            image_id = status[u'id']
-            detail = status[u'progressDetail']
-
-            if u'current' in detail and 'total' in detail:
-              with self._status as status:
-                images = status['image_completion']
-
-                images[image_id] = detail
-                status['push_completion'] = \
-                  DockerfileBuildContext.__total_completion(images, num_images)
-
-        elif u'errorDetail' in status:
-          message = 'Error pushing image.'
-          if u'message' in status[u'errorDetail']:
-            message = str(status[u'errorDetail'][u'message'])
-
-          raise RuntimeError(message)
-
-  def __is_repo_public(self, repo_name):
-    if repo_name in self._public_repos:
-      return True
-
-    repo_portions = repo_name.split('/')
-    registry_hostname = 'index.docker.io'
-    local_repo_name = repo_name
-    if len(repo_portions) > 2:
-      registry_hostname = repo_portions[0]
-      local_repo_name = '/'.join(repo_portions[1:])
-
-    repo_url_template = '%s://%s/v1/repositories/%s/images'
-    protocols = ['https', 'http']
-    secure_repo_url, repo_url = [repo_url_template % (protocol, registry_hostname, local_repo_name)
-                                 for protocol in protocols]
-
-    try:
-
-      try:
-        repo_info = requests.get(secure_repo_url)
-      except requests.exceptions.SSLError:
-        repo_info = requests.get(repo_url)
-
-    except requests.exceptions.ConnectionError:
-      return False
-
-    if repo_info.status_code / 100 == 2:
-      self._public_repos.add(repo_name)
-      return True
-    else:
-      return False
+    self.__monitor_completion(resp, 'Pushing', self._status, 'push_completion', num_images)
 
   def __cleanup_containers(self):
     # First clean up any containers that might be holding the images
     for running in self._build_cl.containers(quiet=True):
-      logger.debug('Killing container: %s' % running['Id'])
+      logger.debug('Killing container: %s', running['Id'])
       self._build_cl.kill(running['Id'])
 
     # Next, remove all of the containers (which should all now be killed)
     for container in self._build_cl.containers(all=True, quiet=True):
-      logger.debug('Removing container: %s' % container['Id'])
+      logger.debug('Removing container: %s', container['Id'])
       self._build_cl.remove_container(container['Id'])
 
-  def __evict_expired_images(self):
-    logger.debug('Cleaning images older than %s hours.', CACHE_EXPIRATION_PERIOD_HOURS)
+  def __cleanup_images(self):
+    """ Remove tags on internal nodes, and remove images older than the expiration time. """
+    ids_to_images, ids_to_children = self.__compute_image_graph()
+
+    # Untag all internal nodes, which are usually the base images
+    for internal_id in ids_to_children.keys():
+      internal = ids_to_images[internal_id]
+      if internal['RepoTags'] != NO_TAGS:
+        for tag_name in internal['RepoTags']:
+          self._build_cl.remove_image(tag_name)
+
+    # Make sure all of the leaves have gibberish tags, and remove those older than our expiration
+    leaves = set(ids_to_images.keys()) - set(ids_to_children.keys())
     now = datetime.now()
-    verify_removed = set()
+    for leaf_id in leaves:
+      leaf = ids_to_images[leaf_id]
 
-    for image in self._build_cl.images():
-      image_id = image[u'Id']
-      created = datetime.fromtimestamp(image[u'Created'])
+      created = datetime.fromtimestamp(leaf['Created'])
+      expiration = created + timedelta(hours=CACHE_EXPIRATION_PERIOD_HOURS)
+      if expiration > now:
+        # Assign a new tag as a uuid to preserve this image
+        new_tag = str(uuid4())
+        self._build_cl.tag(leaf['Id'], new_tag)
 
-      # If we don't have a cache time, use the created time (e.g. worker reboot)
-      cache_time = self.image_id_to_cache_time.get(image_id, created)
-      expiration = cache_time + timedelta(hours=CACHE_EXPIRATION_PERIOD_HOURS)
+      # Remove all of the existing tags
+      if leaf['RepoTags'] != NO_TAGS:
+        for tag_name in leaf['RepoTags']:
+          self._build_cl.remove_image(tag_name)
 
-      if expiration < now:
-        logger.debug('Removing expired image: %s' % image_id)
+  def __prune_cache(self):
+    """ Remove the oldest leaf image until the cache size is the desired size. """
""" - for tag in image['RepoTags']: - # We can forget about this particular tag if it was indeed one of our renamed tags - self.private_repo_tags.discard(tag) + logger.debug('Pruning cache to size(gb): %s', self._cache_size_gb) + while self.__compute_cache_size_gb() > self._cache_size_gb: + logger.debug('Locating the oldest image in the cache to prune.') + # Find the oldest tagged image and remove it + oldest_creation_time = datetime.max + oldest_image = None + for image in self._build_cl.images(): + created = datetime.fromtimestamp(image['Created']) + if created < oldest_creation_time: + oldest_creation_time = created + oldest_image = image - try: - self._build_cl.remove_image(tag) - except APIError: - # Sometimes an upstream image removed this one - pass + logger.debug('Removing oldest image from cache: %s', oldest_image['Id']) + # Remove all tags on the oldest image + if oldest_image['RepoTags'] == NO_TAGS: + # Remove the image id directly since there are no tags + self._build_cl.remove_image(oldest_image['Id']) + else: + # Remove all tags + for tag_name in oldest_image['RepoTags']: + self._build_cl.remove_image(tag_name) - try: - self._build_cl.remove_image(image_id) - except APIError: - # Sometimes an upstream image removed this one - pass - verify_removed.add(image_id) + def __compute_cache_size_gb(self): + all_images = self._build_cl.images(all=True) + size_in_bytes = sum([img['Size'] for img in all_images]) + size_in_gb = float(size_in_bytes)/1024/1024/1024 + logger.debug('Computed cache size(gb) of: %s', size_in_gb) + return size_in_gb - # Verify that our images were actually removed - for image in self._build_cl.images(): - if image['Id'] in verify_removed: - logger.warning('Image was not removed: %s' % image['Id']) + def __compute_image_graph(self): + all_images = self._build_cl.images(all=True) - def __cleanup(self): - # Iterate all of the images and rename the ones that aren't public. This should preserve - # base images and also allow the cache to function. 
-    now = datetime.now()
-    for image in self._build_cl.images():
-      image_id = image[u'Id']
+    ids_to_images = {}
+    ids_to_children = defaultdict(list)
+    for image in all_images:
+      if image['ParentId'] != '':
+        ids_to_children[image['ParentId']].append(image)
+      ids_to_images[image['Id']] = image
 
-      if image_id not in self.image_id_to_cache_time:
-        logger.debug('Setting image %s cache time to %s', image_id, now)
-        self.image_id_to_cache_time[image_id] = now
+    return (ids_to_images, ids_to_children)
 
-      for tag in image['RepoTags']:
-        tag_repo = ParsedDockerfile.base_image_from_repo_identifier(tag)
-        if tag_repo != '':
-          if tag_repo in self.private_repo_tags:
-            logger.debug('Repo is private and has already been renamed: %s' % tag_repo)
-          elif self.__is_repo_public(tag_repo):
-            logger.debug('Repo was deemed public: %s', tag_repo)
-          else:
-            new_name = str(uuid4())
-            logger.debug('Private repo tag being renamed %s -> %s', tag, new_name)
-            self._build_cl.tag(image_id, new_name)
-            self._build_cl.remove_image(tag)
-            self.private_repo_tags.add(new_name)
 
 
 class DockerfileBuildWorker(Worker):
-  def __init__(self, *vargs, **kwargs):
+  def __init__(self, cache_size_gb, *vargs, **kwargs):
     super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)
 
     self._mime_processors = {
@@ -410,6 +392,7 @@ class DockerfileBuildWorker(Worker):
     }
 
     self._timeout = Event()
+    self._cache_size_gb = cache_size_gb
 
   @staticmethod
   def __prepare_zip(request_file):
@@ -449,12 +432,12 @@ class DockerfileBuildWorker(Worker):
 
     # Iterate the running containers and kill ones that have been running more than 20 minutes
    for container in docker_cl.containers():
-      start_time = datetime.fromtimestamp(container[u'Created'])
+      start_time = datetime.fromtimestamp(container['Created'])
       running_time = datetime.now() - start_time
       if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES):
         logger.warning('Container has been running too long: %s with command: %s',
-                       container[u'Id'], container[u'Command'])
-        docker_cl.kill(container[u'Id'])
+                       container['Id'], container['Command'])
+        docker_cl.kill(container['Id'])
         self._timeout.set()
 
   def process_queue_item(self, job_details):
@@ -503,14 +486,19 @@ class DockerfileBuildWorker(Worker):
       return True
 
     build_dir = self._mime_processors[c_type](docker_resource)
-    log_appender('building', build_logs.PHASE)
-    repository_build.phase = 'building'
-    repository_build.save()
 
-    with DockerfileBuildContext(build_dir, build_subdir, repo, tag_names,
-                                access_token,
-                                repository_build.uuid, pull_credentials) as build_ctxt:
-      try:
+    try:
+      with DockerfileBuildContext(build_dir, build_subdir, repo, tag_names, access_token,
+                                  repository_build.uuid, self._cache_size_gb,
+                                  pull_credentials) as build_ctxt:
+        log_appender('pulling', build_logs.PHASE)
+        repository_build.phase = 'pulling'
+        repository_build.save()
+        build_ctxt.pull()
+
+        log_appender('building', build_logs.PHASE)
+        repository_build.phase = 'building'
+        repository_build.save()
         built_image = build_ctxt.build()
 
         if not built_image:
@@ -519,7 +507,7 @@
           repository_build.save()
           if self._timeout.is_set():
            log_appender('Build step was terminated after %s minutes.' % TIMEOUT_PERIOD_MINUTES,
-                         build_logs.ERROR)
+                        build_logs.ERROR)
           else:
             log_appender('Unable to build dockerfile.', build_logs.ERROR)
           return True
@@ -534,13 +522,13 @@
         repository_build.phase = 'complete'
         repository_build.save()
 
-      except Exception as exc:
-        log_appender('error', build_logs.PHASE)
-        logger.exception('Exception when processing request.')
-        repository_build.phase = 'error'
-        repository_build.save()
-        log_appender(str(exc), build_logs.ERROR)
-        return True
+    except Exception as exc:
+      log_appender('error', build_logs.PHASE)
+      logger.exception('Exception when processing request.')
+      repository_build.phase = 'error'
+      repository_build.save()
+      log_appender(str(exc), build_logs.ERROR)
+      return True
 
     return True
 
@@ -551,10 +539,13 @@ parser.add_argument('-D', action='store_true', default=False,
                     help='Run the worker in daemon mode.')
 parser.add_argument('--log', default='dockerfilebuild.log',
                     help='Specify the log file for the worker as a daemon.')
+parser.add_argument('--cachegb', default=20, type=float,
+                    help='Maximum cache size in gigabytes.')
 args = parser.parse_args()
 
-worker = DockerfileBuildWorker(dockerfile_build_queue, reservation_seconds=60*60) # 1 hour
+ONE_HOUR = 60*60
+worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue, reservation_seconds=ONE_HOUR)
 
 if args.D:
   handler = logging.FileHandler(args.log)
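
Note on the progress math above: __monitor_completion reduces a streamed docker-py pull or push to the single fraction the UI consumes, by scaling the byte completion of the layers reported so far by the fraction of expected layers that have reported sizes. A rough standalone sketch of the same idea, not part of the patch (the function name and the raw-JSON handling are assumptions; old docker-py streams either dicts or JSON strings):

import json

def stream_completion(status_stream, wanted_status, num_images=0):
  # Yields an overall 0.0-1.0 completion fraction for a docker pull/push stream.
  layers = {}
  for raw in status_stream:
    event = json.loads(raw) if isinstance(raw, basestring) else raw
    if event.get('status') == wanted_status:
      detail = event.get('progressDetail') or {}
      if 'id' in event and 'current' in detail and 'total' in detail:
        layers[event['id']] = detail
        # Same math as __total_completion: byte progress of the layers seen,
        # scaled by the share of expected layers that have reported sizes.
        seen = float(len(layers)) / max(len(layers), num_images)
        done = sum(d['current'] for d in layers.values())
        total = sum(d['total'] for d in layers.values())
        if total:
          yield (float(done) / total) * seen
    elif 'errorDetail' in event:
      raise RuntimeError(event['errorDetail'].get('message', 'Error processing image.'))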
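
The caching change itself has two halves: __cleanup_images retags still-fresh leaf images with throwaway uuid tags (so no user-visible tags survive on the node) and untags expired ones, and __prune_cache then evicts the oldest image until the total layer size fits the budget given by --cachegb. A minimal sketch of the pruning half against the same docker-py Client API (prune_to_size and cache_size_gb are illustrative names, not from the patch):

from docker import Client  # same docker-py API the worker uses

def cache_size_gb(client):
  # Sum the sizes of every layer the daemon knows about, in gigabytes.
  images = client.images(all=True)
  return sum(img['Size'] for img in images) / float(1024 ** 3)

def prune_to_size(client, max_gb):
  # Evict the oldest top-level image until the cache fits the budget.
  while cache_size_gb(client) > max_gb:
    candidates = client.images()  # tagged, top-level images only
    if not candidates:
      break  # nothing evictable is left
    oldest = min(candidates, key=lambda img: img['Created'])
    tags = [t for t in (oldest.get('RepoTags') or []) if t != ':']  # ':' is NO_TAGS above
    if tags:
      for tag in tags:
        client.remove_image(tag)  # removing the last tag deletes the image
    else:
      client.remove_image(oldest['Id'])

Against a local daemon, prune_to_size(Client(), 20) approximates what the worker now does between builds when started with --cachegb 20.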