From acd86008c889e7d2b4924135bdcb59f1c7046005 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 19 Jun 2015 14:55:30 -0400 Subject: [PATCH 1/4] Switch tag deletion to use a single query --- data/model/repository.py | 78 +++++++++++++++++++++++++++------------- data/model/tag.py | 21 ++++------- 2 files changed, 60 insertions(+), 39 deletions(-) diff --git a/data/model/repository.py b/data/model/repository.py index 90857f93c..99115847b 100644 --- a/data/model/repository.py +++ b/data/model/repository.py @@ -7,7 +7,8 @@ from data.model import (DataModelException, tag, db_transaction, storage, image, _basequery) from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User, Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount, - Role, RepositoryAuthorizedEmail, db_for_update) + Role, RepositoryAuthorizedEmail, db_for_update, get_epoch_timestamp, + db_random_func) logger = logging.getLogger(__name__) @@ -57,40 +58,67 @@ def purge_repository(namespace_name, repository_name): fetched.delete_instance(recursive=True, delete_nullable=False) -def garbage_collect_repository(namespace_name, repository_name): - storage_id_whitelist = {} +def find_repository_with_garbage(): + epoch_timestamp = get_epoch_timestamp() - tag.garbage_collect_tags(namespace_name, repository_name) + try: + candidates = (RepositoryTag + .select(RepositoryTag.repository) + .join(Repository) + .join(Namespace, on=(Repository.namespace_user == Namespace.id)) + .where(~(RepositoryTag.lifetime_end_ts >> None), + (RepositoryTag.lifetime_end_ts <= + (epoch_timestamp - Namespace.removed_tag_expiration_s))) + .limit(500) + .alias('candidates')) + + found = (RepositoryTag + .select(candidates.c.repository) + .from_(candidates) + .order_by(db_random_func()) + .get()) + if not found: + return + + return Repository.get(Repository.id == found) + + except RepositoryTag.DoesNotExist: + return None + except Repository.DoesNotExist: + return None + + +def garbage_collect_repository(namespace_name, repository_name): + repo = get_repository(namespace_name, repository_name) + garbage_collect_repo(repo) + + +def garbage_collect_repo(repo): + storage_id_whitelist = {} + tag.garbage_collect_tags(repo) with db_transaction(): - # TODO (jake): We could probably select this and all the images in a single query using - # a different kind of join. - # Get a list of all images used by tags in the repository - tag_query = (RepositoryTag - .select(RepositoryTag, Image, ImageStorage) - .join(Image) - .join(ImageStorage, JOIN_LEFT_OUTER) - .switch(RepositoryTag) - .join(Repository) - .join(Namespace, on=(Repository.namespace_user == Namespace.id)) - .where(Repository.name == repository_name, Namespace.username == namespace_name)) + tagged_images = (Image + .select(Image.id, Image.ancestors) + .join(RepositoryTag) + .where(Image.repository == repo)) - referenced_ancestors = set() - for one_tag in tag_query: - # The ancestor list is in the format '/1/2/3/', extract just the ids - ancestor_id_strings = one_tag.image.ancestors.split('/')[1:-1] - ancestor_list = [int(img_id_str) for img_id_str in ancestor_id_strings] - referenced_ancestors = referenced_ancestors.union(set(ancestor_list)) - referenced_ancestors.add(one_tag.image.id) + referenced_anscestors = set() + for tagged_image in tagged_images: + # The anscestor list is in the format '/1/2/3/', extract just the ids + anscestor_id_strings = tagged_image.ancestors.split('/')[1:-1] + ancestor_list = [int(img_id_str) for img_id_str in anscestor_id_strings] + referenced_anscestors = referenced_anscestors.union(set(ancestor_list)) + referenced_anscestors.add(tagged_image.id) - all_repo_images = image.get_repository_images(namespace_name, repository_name) + all_repo_images = Image.select(Image.id, Image.storage).where(Image.repository == repo) all_images = {int(img.id): img for img in all_repo_images} - to_remove = set(all_images.keys()).difference(referenced_ancestors) + to_remove = set(all_images.keys()).difference(referenced_anscestors) if len(to_remove) > 0: logger.info('Cleaning up unreferenced images: %s', to_remove) - storage_id_whitelist = {all_images[to_remove_id].storage.id for to_remove_id in to_remove} + storage_id_whitelist = {all_images[to_remove_id].storage_id for to_remove_id in to_remove} Image.delete().where(Image.id << list(to_remove)).execute() if len(to_remove) > 0: diff --git a/data/model/tag.py b/data/model/tag.py index 7f4dc93e4..9ac1ea897 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -97,22 +97,15 @@ def delete_tag(namespace_name, repository_name, tag_name): found.save() -def garbage_collect_tags(namespace_name, repository_name): - # We do this without using a join to prevent holding read locks on the repository table - repo = _basequery.get_existing_repository(namespace_name, repository_name) +def garbage_collect_tags(repo): expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s - tags_to_delete = list(RepositoryTag - .select(RepositoryTag.id) - .where(RepositoryTag.repository == repo, - ~(RepositoryTag.lifetime_end_ts >> None), - (RepositoryTag.lifetime_end_ts <= expired_time)) - .order_by(RepositoryTag.id)) - if len(tags_to_delete) > 0: - (RepositoryTag - .delete() - .where(RepositoryTag.id << tags_to_delete) - .execute()) + (RepositoryTag + .delete() + .where(RepositoryTag.repository == repo, + ~(RepositoryTag.lifetime_end_ts >> None), + (RepositoryTag.lifetime_end_ts <= expired_time)) + .execute()) def get_tag_image(namespace_name, repository_name, tag_name): From 70de107268cf796bdc41456d1491f1a1ba2e2701 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 19 Jun 2015 14:55:44 -0400 Subject: [PATCH 2/4] Make GC of repositories fully async for whitelisted namespaces This change adds a worker to conduct GC on repositories with garbage every 10s. Fixes #144 --- conf/init/service/gcworker/log/run | 2 ++ conf/init/service/gcworker/run | 8 +++++ config.py | 3 ++ data/model/repository.py | 29 +++++++++++++----- test/test_gc.py | 49 ++++++++++++++++++++++++++++-- workers/gcworker.py | 30 ++++++++++++++++++ 6 files changed, 111 insertions(+), 10 deletions(-) create mode 100755 conf/init/service/gcworker/log/run create mode 100755 conf/init/service/gcworker/run create mode 100644 workers/gcworker.py diff --git a/conf/init/service/gcworker/log/run b/conf/init/service/gcworker/log/run new file mode 100755 index 000000000..cf6bdc1d7 --- /dev/null +++ b/conf/init/service/gcworker/log/run @@ -0,0 +1,2 @@ +#!/bin/sh +exec logger -i -t gcworker \ No newline at end of file diff --git a/conf/init/service/gcworker/run b/conf/init/service/gcworker/run new file mode 100755 index 000000000..6a843d4b8 --- /dev/null +++ b/conf/init/service/gcworker/run @@ -0,0 +1,8 @@ +#! /bin/bash + +echo 'Starting GC worker' + +cd / +venv/bin/python -m workers.gcworker 2>&1 + +echo 'Repository GC exited' \ No newline at end of file diff --git a/config.py b/config.py index e99f32aca..7a81c68cd 100644 --- a/config.py +++ b/config.py @@ -231,3 +231,6 @@ class DefaultConfig(object): '#7f7f7f', '#c7c7c7', '#bcbd22', '#1f77b4', '#17becf', '#9edae5', '#393b79', '#5254a3', '#6b6ecf', '#9c9ede', '#9ecae1', '#31a354', '#b5cf6b', '#a1d99b', '#8c6d31', '#ad494a', '#e7ba52', '#a55194'] + + # Experiment: Async garbage collection + EXP_ASYNC_GARBAGE_COLLECTION = [] diff --git a/data/model/repository.py b/data/model/repository.py index 99115847b..937e51a52 100644 --- a/data/model/repository.py +++ b/data/model/repository.py @@ -4,7 +4,7 @@ from peewee import JOIN_LEFT_OUTER, fn from datetime import timedelta, datetime from data.model import (DataModelException, tag, db_transaction, storage, image, permission, - _basequery) + _basequery, config) from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User, Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount, Role, RepositoryAuthorizedEmail, db_for_update, get_epoch_timestamp, @@ -58,7 +58,11 @@ def purge_repository(namespace_name, repository_name): fetched.delete_instance(recursive=True, delete_nullable=False) -def find_repository_with_garbage(): +def find_repository_with_garbage(filter_list=None): + # TODO(jschorr): Remove the filter once we have turned the experiment on for everyone. + if filter_list is not None and not filter_list: + return None + epoch_timestamp = get_epoch_timestamp() try: @@ -72,16 +76,19 @@ def find_repository_with_garbage(): .limit(500) .alias('candidates')) + if filter_list: + candidates = candidates.where(Namespace.username << filter_list) + found = (RepositoryTag - .select(candidates.c.repository) + .select(candidates.c.repository_id) .from_(candidates) .order_by(db_random_func()) .get()) - if not found: + + if found is None: return - return Repository.get(Repository.id == found) - + return Repository.get(Repository.id == found.repository_id) except RepositoryTag.DoesNotExist: return None except Repository.DoesNotExist: @@ -89,11 +96,19 @@ def find_repository_with_garbage(): def garbage_collect_repository(namespace_name, repository_name): + # If the namespace is the async experiment, don't perform garbage collection here. + # TODO(jschorr): Remove this check once we have turned the experiment on for everyone. + if namespace_name in config.app_config.get('EXP_ASYNC_GARBAGE_COLLECTION', []): + return + repo = get_repository(namespace_name, repository_name) - garbage_collect_repo(repo) + if repo is not None: + garbage_collect_repo(repo) def garbage_collect_repo(repo): + logger.debug('Garbage collecting repository %s', repo.id) + storage_id_whitelist = {} tag.garbage_collect_tags(repo) diff --git a/test/test_gc.py b/test/test_gc.py index a9953a1a3..27f67f82a 100644 --- a/test/test_gc.py +++ b/test/test_gc.py @@ -10,7 +10,7 @@ PUBLIC_USER = 'public' REPO = 'somerepo' -class TestGarbageColection(unittest.TestCase): +class TestGarbageCollection(unittest.TestCase): @staticmethod def _set_tag_expiration_policy(namespace, expiration_s): namespace_user = model.user.get_user(namespace) @@ -80,9 +80,11 @@ class TestGarbageColection(unittest.TestCase): def gcNow(self, repository): model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) - def deleteTag(self, repository, tag): + def deleteTag(self, repository, tag, perform_gc=True): model.tag.delete_tag(repository.namespace_user.username, repository.name, tag) - model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) + if perform_gc: + model.repository.garbage_collect_repository(repository.namespace_user.username, + repository.name) def moveTag(self, repository, tag, docker_image_id): model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag, @@ -105,6 +107,47 @@ class TestGarbageColection(unittest.TestCase): self.fail('Expected image %s to be deleted' % docker_image_id) + def test_has_garbage(self): + """ Remove all existing repositories, then add one without garbage, check, then add one with + garbage, and check again. + """ + # Delete all existing repos. + for repo in database.Repository.select(): + repo.delete_instance(recursive=True) + + # Change the time machine expiration on the namespace. + (database.User.update(removed_tag_expiration_s=1000000000) + .where(database.User.username == ADMIN_ACCESS_USER) + .execute()) + + # Create a repository without any garbage. + repository = self.createRepository(latest=['i1', 'i2', 'i3']) + + # Ensure that no repositories are returned by the has garbage check. + self.assertIsNone(model.repository.find_repository_with_garbage()) + + # Delete a tag. + self.deleteTag(repository, 'latest', perform_gc=False) + + # There should still not be any repositories with garbage, due to time machine. + self.assertIsNone(model.repository.find_repository_with_garbage()) + + # Change the time machine expiration on the namespace. + (database.User.update(removed_tag_expiration_s=0) + .where(database.User.username == ADMIN_ACCESS_USER) + .execute()) + + # Now we should find the repository for GC. + repository = model.repository.find_repository_with_garbage() + self.assertIsNotNone(repository) + self.assertEquals(REPO, repository.name) + + # GC the repository. + model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) + + # There should now be no repositories with garbage. + self.assertIsNone(model.repository.find_repository_with_garbage()) + def test_one_tag(self): """ Create a repository with a single tag, then remove that tag and verify that the repository diff --git a/workers/gcworker.py b/workers/gcworker.py new file mode 100644 index 000000000..d60dd5719 --- /dev/null +++ b/workers/gcworker.py @@ -0,0 +1,30 @@ +import logging + +from apscheduler.schedulers.blocking import BlockingScheduler + +from app import app +from data.database import UseThenDisconnect +from data.model.repository import find_repository_with_garbage, garbage_collect_repo + +logger = logging.getLogger(__name__) +sched = BlockingScheduler() + +@sched.scheduled_job(trigger='interval', seconds=10) +def garbage_collect_repositories(): + """ Performs garbage collection on repositories. """ + + with UseThenDisconnect(app.config): + repository = find_repository_with_garbage(app.config.get('EXP_ASYNC_GARBAGE_COLLECTION', [])) + if repository is None: + logger.debug('No repository with garbage found') + return False + + logger.debug('Starting GC of repository #%s (%s)', repository.id, repository.name) + garbage_collect_repo(repository) + logger.debug('Finished GC of repository #%s (%s)', repository.id, repository.name) + return True + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + sched.start() From ba7686af99ab2eddfe1ff4b9dbc73760c1d604bf Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 28 Jul 2015 14:34:55 -0400 Subject: [PATCH 3/4] Switch back to the read-then-write tag deletion code We changed to this originally to avoid locks --- data/model/tag.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/data/model/tag.py b/data/model/tag.py index 9ac1ea897..199d99704 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -100,12 +100,17 @@ def delete_tag(namespace_name, repository_name, tag_name): def garbage_collect_tags(repo): expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s - (RepositoryTag - .delete() - .where(RepositoryTag.repository == repo, - ~(RepositoryTag.lifetime_end_ts >> None), - (RepositoryTag.lifetime_end_ts <= expired_time)) - .execute()) + tags_to_delete = list(RepositoryTag + .select(RepositoryTag.id) + .where(RepositoryTag.repository == repo, + ~(RepositoryTag.lifetime_end_ts >> None), + (RepositoryTag.lifetime_end_ts <= expired_time)) + .order_by(RepositoryTag.id)) + if len(tags_to_delete) > 0: + (RepositoryTag + .delete() + .where(RepositoryTag.id << tags_to_delete) + .execute()) def get_tag_image(namespace_name, repository_name, tag_name): From 0fdc8b0f1fa5857d86c09b73fc0926f4e926fc08 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 28 Jul 2015 14:45:03 -0400 Subject: [PATCH 4/4] Fix spelling of ancestors --- data/model/repository.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/data/model/repository.py b/data/model/repository.py index 937e51a52..24199c00b 100644 --- a/data/model/repository.py +++ b/data/model/repository.py @@ -119,17 +119,17 @@ def garbage_collect_repo(repo): .join(RepositoryTag) .where(Image.repository == repo)) - referenced_anscestors = set() + referenced_ancestors = set() for tagged_image in tagged_images: - # The anscestor list is in the format '/1/2/3/', extract just the ids - anscestor_id_strings = tagged_image.ancestors.split('/')[1:-1] - ancestor_list = [int(img_id_str) for img_id_str in anscestor_id_strings] - referenced_anscestors = referenced_anscestors.union(set(ancestor_list)) - referenced_anscestors.add(tagged_image.id) + # The ancestor list is in the format '/1/2/3/', extract just the ids + ancestor_id_strings = tagged_image.ancestors.split('/')[1:-1] + ancestor_list = [int(img_id_str) for img_id_str in ancestor_id_strings] + referenced_ancestors = referenced_ancestors.union(set(ancestor_list)) + referenced_ancestors.add(tagged_image.id) all_repo_images = Image.select(Image.id, Image.storage).where(Image.repository == repo) all_images = {int(img.id): img for img in all_repo_images} - to_remove = set(all_images.keys()).difference(referenced_anscestors) + to_remove = set(all_images.keys()).difference(referenced_ancestors) if len(to_remove) > 0: logger.info('Cleaning up unreferenced images: %s', to_remove)