Merge pull request #155 from coreos-inc/asyncgc
Garbage Collection Optimizations And Async
This commit is contained in:
commit
dbd9a32c85
7 changed files with 154 additions and 27 deletions
2
conf/init/service/gcworker/log/run
Executable file
2
conf/init/service/gcworker/log/run
Executable file
|
@ -0,0 +1,2 @@
|
||||||
|
#!/bin/sh
|
||||||
|
exec logger -i -t gcworker
|
8
conf/init/service/gcworker/run
Executable file
8
conf/init/service/gcworker/run
Executable file
|
@ -0,0 +1,8 @@
|
||||||
|
#! /bin/bash
|
||||||
|
|
||||||
|
echo 'Starting GC worker'
|
||||||
|
|
||||||
|
cd /
|
||||||
|
venv/bin/python -m workers.gcworker 2>&1
|
||||||
|
|
||||||
|
echo 'Repository GC exited'
|
|
@ -231,3 +231,6 @@ class DefaultConfig(object):
|
||||||
'#7f7f7f', '#c7c7c7', '#bcbd22', '#1f77b4', '#17becf', '#9edae5', '#393b79',
|
'#7f7f7f', '#c7c7c7', '#bcbd22', '#1f77b4', '#17becf', '#9edae5', '#393b79',
|
||||||
'#5254a3', '#6b6ecf', '#9c9ede', '#9ecae1', '#31a354', '#b5cf6b', '#a1d99b',
|
'#5254a3', '#6b6ecf', '#9c9ede', '#9ecae1', '#31a354', '#b5cf6b', '#a1d99b',
|
||||||
'#8c6d31', '#ad494a', '#e7ba52', '#a55194']
|
'#8c6d31', '#ad494a', '#e7ba52', '#a55194']
|
||||||
|
|
||||||
|
# Experiment: Async garbage collection
|
||||||
|
EXP_ASYNC_GARBAGE_COLLECTION = []
|
||||||
|
|
|
@ -4,10 +4,11 @@ from peewee import JOIN_LEFT_OUTER, fn
|
||||||
from datetime import timedelta, datetime
|
from datetime import timedelta, datetime
|
||||||
|
|
||||||
from data.model import (DataModelException, tag, db_transaction, storage, image, permission,
|
from data.model import (DataModelException, tag, db_transaction, storage, image, permission,
|
||||||
_basequery)
|
_basequery, config)
|
||||||
from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User,
|
from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User,
|
||||||
Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount,
|
Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount,
|
||||||
Role, RepositoryAuthorizedEmail, db_for_update)
|
Role, RepositoryAuthorizedEmail, db_for_update, get_epoch_timestamp,
|
||||||
|
db_random_func)
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -57,40 +58,82 @@ def purge_repository(namespace_name, repository_name):
|
||||||
fetched.delete_instance(recursive=True, delete_nullable=False)
|
fetched.delete_instance(recursive=True, delete_nullable=False)
|
||||||
|
|
||||||
|
|
||||||
def garbage_collect_repository(namespace_name, repository_name):
|
def find_repository_with_garbage(filter_list=None):
|
||||||
storage_id_whitelist = {}
|
# TODO(jschorr): Remove the filter once we have turned the experiment on for everyone.
|
||||||
|
if filter_list is not None and not filter_list:
|
||||||
|
return None
|
||||||
|
|
||||||
tag.garbage_collect_tags(namespace_name, repository_name)
|
epoch_timestamp = get_epoch_timestamp()
|
||||||
|
|
||||||
with db_transaction():
|
try:
|
||||||
# TODO (jake): We could probably select this and all the images in a single query using
|
candidates = (RepositoryTag
|
||||||
# a different kind of join.
|
.select(RepositoryTag.repository)
|
||||||
|
|
||||||
# Get a list of all images used by tags in the repository
|
|
||||||
tag_query = (RepositoryTag
|
|
||||||
.select(RepositoryTag, Image, ImageStorage)
|
|
||||||
.join(Image)
|
|
||||||
.join(ImageStorage, JOIN_LEFT_OUTER)
|
|
||||||
.switch(RepositoryTag)
|
|
||||||
.join(Repository)
|
.join(Repository)
|
||||||
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
||||||
.where(Repository.name == repository_name, Namespace.username == namespace_name))
|
.where(~(RepositoryTag.lifetime_end_ts >> None),
|
||||||
|
(RepositoryTag.lifetime_end_ts <=
|
||||||
|
(epoch_timestamp - Namespace.removed_tag_expiration_s)))
|
||||||
|
.limit(500)
|
||||||
|
.alias('candidates'))
|
||||||
|
|
||||||
|
if filter_list:
|
||||||
|
candidates = candidates.where(Namespace.username << filter_list)
|
||||||
|
|
||||||
|
found = (RepositoryTag
|
||||||
|
.select(candidates.c.repository_id)
|
||||||
|
.from_(candidates)
|
||||||
|
.order_by(db_random_func())
|
||||||
|
.get())
|
||||||
|
|
||||||
|
if found is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
return Repository.get(Repository.id == found.repository_id)
|
||||||
|
except RepositoryTag.DoesNotExist:
|
||||||
|
return None
|
||||||
|
except Repository.DoesNotExist:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def garbage_collect_repository(namespace_name, repository_name):
|
||||||
|
# If the namespace is the async experiment, don't perform garbage collection here.
|
||||||
|
# TODO(jschorr): Remove this check once we have turned the experiment on for everyone.
|
||||||
|
if namespace_name in config.app_config.get('EXP_ASYNC_GARBAGE_COLLECTION', []):
|
||||||
|
return
|
||||||
|
|
||||||
|
repo = get_repository(namespace_name, repository_name)
|
||||||
|
if repo is not None:
|
||||||
|
garbage_collect_repo(repo)
|
||||||
|
|
||||||
|
|
||||||
|
def garbage_collect_repo(repo):
|
||||||
|
logger.debug('Garbage collecting repository %s', repo.id)
|
||||||
|
|
||||||
|
storage_id_whitelist = {}
|
||||||
|
tag.garbage_collect_tags(repo)
|
||||||
|
|
||||||
|
with db_transaction():
|
||||||
|
# Get a list of all images used by tags in the repository
|
||||||
|
tagged_images = (Image
|
||||||
|
.select(Image.id, Image.ancestors)
|
||||||
|
.join(RepositoryTag)
|
||||||
|
.where(Image.repository == repo))
|
||||||
|
|
||||||
referenced_ancestors = set()
|
referenced_ancestors = set()
|
||||||
for one_tag in tag_query:
|
for tagged_image in tagged_images:
|
||||||
# The ancestor list is in the format '/1/2/3/', extract just the ids
|
# The ancestor list is in the format '/1/2/3/', extract just the ids
|
||||||
ancestor_id_strings = one_tag.image.ancestors.split('/')[1:-1]
|
ancestor_id_strings = tagged_image.ancestors.split('/')[1:-1]
|
||||||
ancestor_list = [int(img_id_str) for img_id_str in ancestor_id_strings]
|
ancestor_list = [int(img_id_str) for img_id_str in ancestor_id_strings]
|
||||||
referenced_ancestors = referenced_ancestors.union(set(ancestor_list))
|
referenced_ancestors = referenced_ancestors.union(set(ancestor_list))
|
||||||
referenced_ancestors.add(one_tag.image.id)
|
referenced_ancestors.add(tagged_image.id)
|
||||||
|
|
||||||
all_repo_images = image.get_repository_images(namespace_name, repository_name)
|
all_repo_images = Image.select(Image.id, Image.storage).where(Image.repository == repo)
|
||||||
all_images = {int(img.id): img for img in all_repo_images}
|
all_images = {int(img.id): img for img in all_repo_images}
|
||||||
to_remove = set(all_images.keys()).difference(referenced_ancestors)
|
to_remove = set(all_images.keys()).difference(referenced_ancestors)
|
||||||
|
|
||||||
if len(to_remove) > 0:
|
if len(to_remove) > 0:
|
||||||
logger.info('Cleaning up unreferenced images: %s', to_remove)
|
logger.info('Cleaning up unreferenced images: %s', to_remove)
|
||||||
storage_id_whitelist = {all_images[to_remove_id].storage.id for to_remove_id in to_remove}
|
storage_id_whitelist = {all_images[to_remove_id].storage_id for to_remove_id in to_remove}
|
||||||
Image.delete().where(Image.id << list(to_remove)).execute()
|
Image.delete().where(Image.id << list(to_remove)).execute()
|
||||||
|
|
||||||
if len(to_remove) > 0:
|
if len(to_remove) > 0:
|
||||||
|
|
|
@ -97,9 +97,7 @@ def delete_tag(namespace_name, repository_name, tag_name):
|
||||||
found.save()
|
found.save()
|
||||||
|
|
||||||
|
|
||||||
def garbage_collect_tags(namespace_name, repository_name):
|
def garbage_collect_tags(repo):
|
||||||
# We do this without using a join to prevent holding read locks on the repository table
|
|
||||||
repo = _basequery.get_existing_repository(namespace_name, repository_name)
|
|
||||||
expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s
|
expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s
|
||||||
|
|
||||||
tags_to_delete = list(RepositoryTag
|
tags_to_delete = list(RepositoryTag
|
||||||
|
|
|
@ -10,7 +10,7 @@ PUBLIC_USER = 'public'
|
||||||
|
|
||||||
REPO = 'somerepo'
|
REPO = 'somerepo'
|
||||||
|
|
||||||
class TestGarbageColection(unittest.TestCase):
|
class TestGarbageCollection(unittest.TestCase):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _set_tag_expiration_policy(namespace, expiration_s):
|
def _set_tag_expiration_policy(namespace, expiration_s):
|
||||||
namespace_user = model.user.get_user(namespace)
|
namespace_user = model.user.get_user(namespace)
|
||||||
|
@ -80,9 +80,11 @@ class TestGarbageColection(unittest.TestCase):
|
||||||
def gcNow(self, repository):
|
def gcNow(self, repository):
|
||||||
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name)
|
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name)
|
||||||
|
|
||||||
def deleteTag(self, repository, tag):
|
def deleteTag(self, repository, tag, perform_gc=True):
|
||||||
model.tag.delete_tag(repository.namespace_user.username, repository.name, tag)
|
model.tag.delete_tag(repository.namespace_user.username, repository.name, tag)
|
||||||
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name)
|
if perform_gc:
|
||||||
|
model.repository.garbage_collect_repository(repository.namespace_user.username,
|
||||||
|
repository.name)
|
||||||
|
|
||||||
def moveTag(self, repository, tag, docker_image_id):
|
def moveTag(self, repository, tag, docker_image_id):
|
||||||
model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag,
|
model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag,
|
||||||
|
@ -105,6 +107,47 @@ class TestGarbageColection(unittest.TestCase):
|
||||||
|
|
||||||
self.fail('Expected image %s to be deleted' % docker_image_id)
|
self.fail('Expected image %s to be deleted' % docker_image_id)
|
||||||
|
|
||||||
|
def test_has_garbage(self):
|
||||||
|
""" Remove all existing repositories, then add one without garbage, check, then add one with
|
||||||
|
garbage, and check again.
|
||||||
|
"""
|
||||||
|
# Delete all existing repos.
|
||||||
|
for repo in database.Repository.select():
|
||||||
|
repo.delete_instance(recursive=True)
|
||||||
|
|
||||||
|
# Change the time machine expiration on the namespace.
|
||||||
|
(database.User.update(removed_tag_expiration_s=1000000000)
|
||||||
|
.where(database.User.username == ADMIN_ACCESS_USER)
|
||||||
|
.execute())
|
||||||
|
|
||||||
|
# Create a repository without any garbage.
|
||||||
|
repository = self.createRepository(latest=['i1', 'i2', 'i3'])
|
||||||
|
|
||||||
|
# Ensure that no repositories are returned by the has garbage check.
|
||||||
|
self.assertIsNone(model.repository.find_repository_with_garbage())
|
||||||
|
|
||||||
|
# Delete a tag.
|
||||||
|
self.deleteTag(repository, 'latest', perform_gc=False)
|
||||||
|
|
||||||
|
# There should still not be any repositories with garbage, due to time machine.
|
||||||
|
self.assertIsNone(model.repository.find_repository_with_garbage())
|
||||||
|
|
||||||
|
# Change the time machine expiration on the namespace.
|
||||||
|
(database.User.update(removed_tag_expiration_s=0)
|
||||||
|
.where(database.User.username == ADMIN_ACCESS_USER)
|
||||||
|
.execute())
|
||||||
|
|
||||||
|
# Now we should find the repository for GC.
|
||||||
|
repository = model.repository.find_repository_with_garbage()
|
||||||
|
self.assertIsNotNone(repository)
|
||||||
|
self.assertEquals(REPO, repository.name)
|
||||||
|
|
||||||
|
# GC the repository.
|
||||||
|
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name)
|
||||||
|
|
||||||
|
# There should now be no repositories with garbage.
|
||||||
|
self.assertIsNone(model.repository.find_repository_with_garbage())
|
||||||
|
|
||||||
|
|
||||||
def test_one_tag(self):
|
def test_one_tag(self):
|
||||||
""" Create a repository with a single tag, then remove that tag and verify that the repository
|
""" Create a repository with a single tag, then remove that tag and verify that the repository
|
||||||
|
|
30
workers/gcworker.py
Normal file
30
workers/gcworker.py
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from apscheduler.schedulers.blocking import BlockingScheduler
|
||||||
|
|
||||||
|
from app import app
|
||||||
|
from data.database import UseThenDisconnect
|
||||||
|
from data.model.repository import find_repository_with_garbage, garbage_collect_repo
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
sched = BlockingScheduler()
|
||||||
|
|
||||||
|
@sched.scheduled_job(trigger='interval', seconds=10)
|
||||||
|
def garbage_collect_repositories():
|
||||||
|
""" Performs garbage collection on repositories. """
|
||||||
|
|
||||||
|
with UseThenDisconnect(app.config):
|
||||||
|
repository = find_repository_with_garbage(app.config.get('EXP_ASYNC_GARBAGE_COLLECTION', []))
|
||||||
|
if repository is None:
|
||||||
|
logger.debug('No repository with garbage found')
|
||||||
|
return False
|
||||||
|
|
||||||
|
logger.debug('Starting GC of repository #%s (%s)', repository.id, repository.name)
|
||||||
|
garbage_collect_repo(repository)
|
||||||
|
logger.debug('Finished GC of repository #%s (%s)', repository.id, repository.name)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
sched.start()
|
Reference in a new issue