Use a new caching algorithm which can limit the size for the build nodes. Stop treating public images as special. Add a new phase to the builder for pulling.

This commit is contained in:
Jake Moshenko 2014-04-30 18:48:36 -04:00
parent 4e36be1a88
commit 450928674b
5 changed files with 188 additions and 180 deletions

View file

@ -676,6 +676,10 @@ i.toggle-icon:hover {
background-color: #ddd;
}
.phase-icon.pulling {
background-color: #cab442;
}
.phase-icon.building {
background-color: #f0ad4e;
}

View file

@ -4215,6 +4215,9 @@ quayApp.directive('buildMessage', function () {
case 'waiting':
return 'Waiting for available build worker';
case 'pulling':
return 'Pulling base image';
case 'building':
return 'Building image from Dockerfile';
@ -4247,6 +4250,10 @@ quayApp.directive('buildProgress', function () {
controller: function($scope, $element) {
$scope.getPercentage = function(buildInfo) {
switch (buildInfo.phase) {
case 'pulling':
return buildInfo.status.pull_completion * 100;
break;
case 'building':
return (buildInfo.status.current_command / buildInfo.status.total_commands) * 100;
break;

View file

@ -39,7 +39,7 @@ class TestBuildLogs(BuildLogs):
'total_commands': None,
'current_command': None,
'push_completion': 0.0,
'image_completion': {},
'pull_completion': 0.0,
}
def __init__(self, redis_host, namespace, repository, test_build_id):
@ -156,7 +156,6 @@ class TestBuildLogs(BuildLogs):
num_images = random.randint(2, 7)
sizes = [random.randint(one_mb, one_mb * 5) for _ in range(num_images)]
image_completion = {}
for image_num, image_size in enumerate(sizes):
image_id = 'image_id_%s' % image_num
@ -168,8 +167,6 @@ class TestBuildLogs(BuildLogs):
for i in range(one_mb, image_size, one_mb):
image_completion[image_id]['current'] = i
new_status = deepcopy(push_status_template)
new_status['image_completion'] = deepcopy(image_completion)
completion = TestBuildLogs._compute_total_completion(image_completion,
num_images)
new_status['push_completion'] = completion

View file

@ -4,23 +4,27 @@ LINE_CONTINUATION_REGEX = re.compile('\s*\\\s*\n')
COMMAND_REGEX = re.compile('([A-Za-z]+)\s(.*)')
COMMENT_CHARACTER = '#'
LATEST_TAG = 'latest'
class ParsedDockerfile(object):
def __init__(self, commands):
self.commands = commands
def get_commands_of_kind(self, kind):
def _get_commands_of_kind(self, kind):
return [command for command in self.commands if command['command'] == kind]
def get_base_image(self):
image_and_tag = self.get_base_image_and_tag()
if not image_and_tag:
def _get_from_image_identifier(self):
from_commands = self._get_commands_of_kind('FROM')
if not from_commands:
return None
return self.base_image_from_repo_identifier(image_and_tag)
return from_commands[-1]['parameters']
@staticmethod
def base_image_from_repo_identifier(image_and_tag):
def parse_image_identifier(image_identifier):
""" Parses a docker image identifier, and returns a tuple of image name and tag, where the tag
is filled in with "latest" if left unspecified.
"""
# Note:
# Dockerfile images references can be of multiple forms:
# server:port/some/path
@ -28,29 +32,34 @@ class ParsedDockerfile(object):
# server/some/path
# server/some/path:tag
# server:port/some/path:tag
parts = image_and_tag.strip().split(':')
parts = image_identifier.strip().split(':')
if len(parts) == 1:
# somepath
return parts[0]
return (parts[0], LATEST_TAG)
# Otherwise, determine if the last part is a port
# or a tag.
if parts[-1].find('/') >= 0:
# Last part is part of the hostname.
return image_and_tag
return (image_identifier, LATEST_TAG)
# Remaining cases:
# server/some/path:tag
# server:port/some/path:tag
return ':'.join(parts[0:-1])
return (':'.join(parts[0:-1]), parts[-1])
def get_base_image_and_tag(self):
from_commands = self.get_commands_of_kind('FROM')
if not from_commands:
return None
def get_base_image(self):
""" Return the base image without the tag name. """
return self.get_image_and_tag()[0]
return from_commands[-1]['parameters']
def get_image_and_tag(self):
""" Returns the image and tag from the FROM line of the dockerfile. """
image_identifier = self._get_from_image_identifier()
if image_identifier is None:
return (None, None)
return self.parse_image_identifier(image_identifier)
def strip_comments(contents):

View file

@ -8,13 +8,15 @@ import json
import shutil
import tarfile
from docker import Client, APIError
from docker import Client
from docker.errors import APIError
from tempfile import TemporaryFile, mkdtemp
from zipfile import ZipFile
from functools import partial
from datetime import datetime, timedelta
from threading import Event
from uuid import uuid4
from collections import defaultdict
from data.queue import dockerfile_build_queue
from data import model
@ -36,6 +38,7 @@ build_logs = app.config['BUILDLOGS']
TIMEOUT_PERIOD_MINUTES = 20
CACHE_EXPIRATION_PERIOD_HOURS = 24
NO_TAGS = ['<none>:<none>']
class StatusWrapper(object):
@ -45,7 +48,7 @@ class StatusWrapper(object):
'total_commands': None,
'current_command': None,
'push_completion': 0.0,
'image_completion': {},
'pull_completion': 0.0,
}
self.__exit__(None, None, None)
@ -97,11 +100,8 @@ class StreamingDockerClient(Client):
class DockerfileBuildContext(object):
image_id_to_cache_time = {}
private_repo_tags = set()
def __init__(self, build_context_dir, dockerfile_subdir, repo, tag_names,
push_token, build_uuid, pull_credentials=None):
push_token, build_uuid, cache_size_gb, pull_credentials=None):
self._build_dir = build_context_dir
self._dockerfile_subdir = dockerfile_subdir
self._repo = repo
@ -110,7 +110,7 @@ class DockerfileBuildContext(object):
self._status = StatusWrapper(build_uuid)
self._build_logger = partial(build_logs.append_log_message, build_uuid)
self._pull_credentials = pull_credentials
self._public_repos = set()
self._cache_size_gb = cache_size_gb
# Note: We have two different clients here because we (potentially) login
# with both, but with different credentials that we do not want shared between
@ -120,6 +120,8 @@ class DockerfileBuildContext(object):
dockerfile_path = os.path.join(self._build_dir, dockerfile_subdir,
'Dockerfile')
if not os.path.exists(dockerfile_path):
raise RuntimeError('Build job did not contain a Dockerfile.')
# Compute the number of steps
with open(dockerfile_path, 'r') as dockerfileobj:
@ -131,18 +133,17 @@ class DockerfileBuildContext(object):
with open(dockerfile_path, 'w') as dockerfileobj:
dockerfileobj.write(serialize_dockerfile(self._parsed_dockerfile))
logger.debug('Will build and push to repo %s with tags named: %s' %
(self._repo, self._tag_names))
logger.debug('Will build and push to repo %s with tags named: %s', self._repo,
self._tag_names)
def __enter__(self):
self.__cleanup_containers()
self.__evict_expired_images()
self.__cleanup()
self.__cleanup_images()
self.__prune_cache()
return self
def __exit__(self, exc_type, value, traceback):
self.__cleanup_containers()
self.__cleanup()
shutil.rmtree(self._build_dir)
@ -159,15 +160,41 @@ class DockerfileBuildContext(object):
parsed_dockerfile.commands.insert(new_command_index, env_command)
break
@staticmethod
def __total_completion(statuses, total_images):
percentage_with_sizes = float(len(statuses.values()))/total_images
sent_bytes = sum([status[u'current'] for status in statuses.values()])
total_bytes = sum([status[u'total'] for status in statuses.values()])
sent_bytes = sum([status['current'] for status in statuses.values()])
total_bytes = sum([status['total'] for status in statuses.values()])
return float(sent_bytes)/total_bytes*percentage_with_sizes
def build(self):
@staticmethod
def __monitor_completion(status_stream, required_message, status_updater, status_completion_key,
num_images=0):
images = {}
for status in status_stream:
logger.debug('%s: %s', status_completion_key, status)
if 'status' in status:
status_msg = status['status']
if status_msg == required_message:
if 'progressDetail' in status and 'id' in status:
image_id = status['id']
detail = status['progressDetail']
if 'current' in detail and 'total' in detail:
images[image_id] = detail
with status_updater as status_update:
status_update[status_completion_key] = \
DockerfileBuildContext.__total_completion(images, max(len(images), num_images))
elif 'errorDetail' in status:
message = 'Error pushing image.'
if 'message' in status['errorDetail']:
message = str(status['errorDetail']['message'])
raise RuntimeError(message)
def pull(self):
# Login with the specified credentials (if any).
if self._pull_credentials:
logger.debug('Logging in with pull credentials: %s@%s',
@ -176,21 +203,24 @@ class DockerfileBuildContext(object):
registry=self._pull_credentials['registry'], reauth=True)
# Pull the image, in case it was updated since the last build
base_image = self._parsed_dockerfile.get_base_image()
self._build_logger('Pulling base image: %s' % base_image)
self._build_cl.pull(base_image)
image_and_tag = ':'.join(self._parsed_dockerfile.get_image_and_tag())
self._build_logger('Pulling base image: %s' % image_and_tag)
pull_status = self._build_cl.pull(image_and_tag, stream=True)
self.__monitor_completion(pull_status, 'Downloading', self._status, 'pull_completion')
def build(self):
# Start the build itself.
logger.debug('Starting build.')
with self._status as status:
status['total_commands'] = self._num_steps
logger.debug('Building to tags named: %s' % self._tag_names)
logger.debug('Building to tags named: %s', self._tag_names)
context_path = os.path.join(self._build_dir, self._dockerfile_subdir)
logger.debug('Final context path: %s exists: %s' %
(context_path, os.path.exists(context_path)))
logger.debug('Final context path: %s exists: %s', context_path,
os.path.exists(context_path))
build_status = self._build_cl.build(path=context_path, stream=True)
@ -216,7 +246,7 @@ class DockerfileBuildContext(object):
if step_increment:
self._build_logger(status_str, build_logs.COMMAND)
current_step = int(step_increment.group(1))
logger.debug('Step now: %s/%s' % (current_step, self._num_steps))
logger.debug('Step now: %s/%s', current_step, self._num_steps)
with self._status as status_update:
status_update['current_command'] = current_step
continue
@ -226,7 +256,7 @@ class DockerfileBuildContext(object):
complete = re.match(r'Successfully built ([a-z0-9]+)$', status_str)
if complete:
built_image = complete.group(1)
logger.debug('Final image ID is: %s' % built_image)
logger.debug('Final image ID is: %s', built_image)
continue
# Get the image count
@ -243,7 +273,7 @@ class DockerfileBuildContext(object):
for protocol in ['https', 'http']:
registry_endpoint = '%s://%s/v1/' % (protocol, host.group(1))
logger.debug('Attempting login to registry: %s' % registry_endpoint)
logger.debug('Attempting login to registry: %s', registry_endpoint)
try:
self._push_cl.login('$token', self._push_token, registry=registry_endpoint)
@ -252,151 +282,103 @@ class DockerfileBuildContext(object):
pass # Probably the wrong protocol
for tag in self._tag_names:
logger.debug('Tagging image %s as %s:%s' %
(built_image, self._repo, tag))
logger.debug('Tagging image %s as %s:%s', built_image, self._repo, tag)
self._push_cl.tag(built_image, self._repo, tag)
history = json.loads(self._push_cl.history(built_image))
num_images = len(history)
with self._status as status:
status['total_images'] = num_images
logger.debug('Pushing to repo %s' % self._repo)
logger.debug('Pushing to repo %s', self._repo)
resp = self._push_cl.push(self._repo, stream=True)
for status in resp:
logger.debug('Status: %s', status)
if u'status' in status:
status_msg = status[u'status']
if status_msg == 'Pushing':
if u'progressDetail' in status and u'id' in status:
image_id = status[u'id']
detail = status[u'progressDetail']
if u'current' in detail and 'total' in detail:
with self._status as status:
images = status['image_completion']
images[image_id] = detail
status['push_completion'] = \
DockerfileBuildContext.__total_completion(images, num_images)
elif u'errorDetail' in status:
message = 'Error pushing image.'
if u'message' in status[u'errorDetail']:
message = str(status[u'errorDetail'][u'message'])
raise RuntimeError(message)
def __is_repo_public(self, repo_name):
if repo_name in self._public_repos:
return True
repo_portions = repo_name.split('/')
registry_hostname = 'index.docker.io'
local_repo_name = repo_name
if len(repo_portions) > 2:
registry_hostname = repo_portions[0]
local_repo_name = '/'.join(repo_portions[1:])
repo_url_template = '%s://%s/v1/repositories/%s/images'
protocols = ['https', 'http']
secure_repo_url, repo_url = [repo_url_template % (protocol, registry_hostname, local_repo_name)
for protocol in protocols]
try:
try:
repo_info = requests.get(secure_repo_url)
except requests.exceptions.SSLError:
repo_info = requests.get(repo_url)
except requests.exceptions.ConnectionError:
return False
if repo_info.status_code / 100 == 2:
self._public_repos.add(repo_name)
return True
else:
return False
self.__monitor_completion(resp, 'Pushing', self._status, 'push_completion', num_images)
def __cleanup_containers(self):
# First clean up any containers that might be holding the images
for running in self._build_cl.containers(quiet=True):
logger.debug('Killing container: %s' % running['Id'])
logger.debug('Killing container: %s', running['Id'])
self._build_cl.kill(running['Id'])
# Next, remove all of the containers (which should all now be killed)
for container in self._build_cl.containers(all=True, quiet=True):
logger.debug('Removing container: %s' % container['Id'])
logger.debug('Removing container: %s', container['Id'])
self._build_cl.remove_container(container['Id'])
def __evict_expired_images(self):
logger.debug('Cleaning images older than %s hours.', CACHE_EXPIRATION_PERIOD_HOURS)
def __cleanup_images(self):
""" Remove tags on internal nodes, and remove images older than the expiratino time. """
ids_to_images, ids_to_children = self.__compute_image_graph()
# Untag all internal nodes, which are usually the base images
for internal_id in ids_to_children.keys():
internal = ids_to_images[internal_id]
if internal['RepoTags'] != NO_TAGS:
for tag_name in internal['RepoTags']:
self._build_cl.remove_image(tag_name)
# Make sure all of the leaves have gibberish tags, and remove those older than our expiration
leaves = set(ids_to_images.keys()) - set(ids_to_children.keys())
now = datetime.now()
verify_removed = set()
for leaf_id in leaves:
leaf = ids_to_images[leaf_id]
created = datetime.fromtimestamp(leaf['Created'])
expiration = created + timedelta(hours=CACHE_EXPIRATION_PERIOD_HOURS)
if expiration > now:
# Assign a new tag as a uuid to preserve this image
new_tag = str(uuid4())
self._build_cl.tag(leaf['Id'], new_tag)
# Remove all of the existing tags
if leaf['RepoTags'] != NO_TAGS:
for tag_name in leaf['RepoTags']:
self._build_cl.remove_image(tag_name)
def __prune_cache(self):
""" Remove the oldest leaf image until the cache size is the desired size. """
logger.debug('Pruning cache to size(gb): %s', self._cache_size_gb)
while self.__compute_cache_size_gb() > self._cache_size_gb:
logger.debug('Locating the oldest image in the cache to prune.')
# Find the oldest tagged image and remove it
oldest_creation_time = datetime.max
oldest_image = None
for image in self._build_cl.images():
image_id = image[u'Id']
created = datetime.fromtimestamp(image[u'Created'])
created = datetime.fromtimestamp(image['Created'])
if created < oldest_creation_time:
oldest_creation_time = created
oldest_image = image
# If we don't have a cache time, use the created time (e.g. worker reboot)
cache_time = self.image_id_to_cache_time.get(image_id, created)
expiration = cache_time + timedelta(hours=CACHE_EXPIRATION_PERIOD_HOURS)
if expiration < now:
logger.debug('Removing expired image: %s' % image_id)
for tag in image['RepoTags']:
# We can forget about this particular tag if it was indeed one of our renamed tags
self.private_repo_tags.discard(tag)
try:
self._build_cl.remove_image(tag)
except APIError:
# Sometimes an upstream image removed this one
pass
try:
self._build_cl.remove_image(image_id)
except APIError:
# Sometimes an upstream image removed this one
pass
verify_removed.add(image_id)
# Verify that our images were actually removed
for image in self._build_cl.images():
if image['Id'] in verify_removed:
logger.warning('Image was not removed: %s' % image['Id'])
def __cleanup(self):
# Iterate all of the images and rename the ones that aren't public. This should preserve
# base images and also allow the cache to function.
now = datetime.now()
for image in self._build_cl.images():
image_id = image[u'Id']
if image_id not in self.image_id_to_cache_time:
logger.debug('Setting image %s cache time to %s', image_id, now)
self.image_id_to_cache_time[image_id] = now
for tag in image['RepoTags']:
tag_repo = ParsedDockerfile.base_image_from_repo_identifier(tag)
if tag_repo != '<none>':
if tag_repo in self.private_repo_tags:
logger.debug('Repo is private and has already been renamed: %s' % tag_repo)
elif self.__is_repo_public(tag_repo):
logger.debug('Repo was deemed public: %s', tag_repo)
logger.debug('Removing oldest image from cache: %s', oldest_image['Id'])
# Remove all tags on the oldest image
if oldest_image['RepoTags'] == NO_TAGS:
# Remove the image id directly since there are no tags
self._build_cl.remove_image(oldest_image['Id'])
else:
new_name = str(uuid4())
logger.debug('Private repo tag being renamed %s -> %s', tag, new_name)
self._build_cl.tag(image_id, new_name)
self._build_cl.remove_image(tag)
self.private_repo_tags.add(new_name)
# Remove all tags
for tag_name in oldest_image['RepoTags']:
self._build_cl.remove_image(tag_name)
def __compute_cache_size_gb(self):
all_images = self._build_cl.images(all=True)
size_in_bytes = sum([img['Size'] for img in all_images])
size_in_gb = float(size_in_bytes)/1024/1024/1024
logger.debug('Computed cache size(gb) of: %s', size_in_gb)
return size_in_gb
def __compute_image_graph(self):
all_images = self._build_cl.images(all=True)
ids_to_images = {}
ids_to_children = defaultdict(list)
for image in all_images:
if image['ParentId'] != '':
ids_to_children[image['ParentId']].append(image)
ids_to_images[image['Id']] = image
return (ids_to_images, ids_to_children)
class DockerfileBuildWorker(Worker):
def __init__(self, *vargs, **kwargs):
def __init__(self, cache_size_gb, *vargs, **kwargs):
super(DockerfileBuildWorker, self).__init__(*vargs, **kwargs)
self._mime_processors = {
@ -410,6 +392,7 @@ class DockerfileBuildWorker(Worker):
}
self._timeout = Event()
self._cache_size_gb = cache_size_gb
@staticmethod
def __prepare_zip(request_file):
@ -449,12 +432,12 @@ class DockerfileBuildWorker(Worker):
# Iterate the running containers and kill ones that have been running more than 20 minutes
for container in docker_cl.containers():
start_time = datetime.fromtimestamp(container[u'Created'])
start_time = datetime.fromtimestamp(container['Created'])
running_time = datetime.now() - start_time
if running_time > timedelta(minutes=TIMEOUT_PERIOD_MINUTES):
logger.warning('Container has been running too long: %s with command: %s',
container[u'Id'], container[u'Command'])
docker_cl.kill(container[u'Id'])
container['Id'], container['Command'])
docker_cl.kill(container['Id'])
self._timeout.set()
def process_queue_item(self, job_details):
@ -503,14 +486,19 @@ class DockerfileBuildWorker(Worker):
return True
build_dir = self._mime_processors[c_type](docker_resource)
try:
with DockerfileBuildContext(build_dir, build_subdir, repo, tag_names, access_token,
repository_build.uuid, self._cache_size_gb,
pull_credentials) as build_ctxt:
log_appender('pulling', build_logs.PHASE)
repository_build.phase = 'pulling'
repository_build.save()
build_ctxt.pull()
log_appender('building', build_logs.PHASE)
repository_build.phase = 'building'
repository_build.save()
with DockerfileBuildContext(build_dir, build_subdir, repo, tag_names,
access_token,
repository_build.uuid, pull_credentials) as build_ctxt:
try:
built_image = build_ctxt.build()
if not built_image:
@ -551,10 +539,13 @@ parser.add_argument('-D', action='store_true', default=False,
help='Run the worker in daemon mode.')
parser.add_argument('--log', default='dockerfilebuild.log',
help='Specify the log file for the worker as a daemon.')
parser.add_argument('--cachegb', default=20, type=float,
help='Maximum cache size in gigabytes.')
args = parser.parse_args()
worker = DockerfileBuildWorker(dockerfile_build_queue, reservation_seconds=60*60) # 1 hour
ONE_HOUR = 60*60
worker = DockerfileBuildWorker(args.cachegb, dockerfile_build_queue, reservation_seconds=ONE_HOUR)
if args.D:
handler = logging.FileHandler(args.log)