diff --git a/conf/init/service/gcworker/log/run b/conf/init/service/gcworker/log/run new file mode 100755 index 000000000..cf6bdc1d7 --- /dev/null +++ b/conf/init/service/gcworker/log/run @@ -0,0 +1,2 @@ +#!/bin/sh +exec logger -i -t gcworker \ No newline at end of file diff --git a/conf/init/service/gcworker/run b/conf/init/service/gcworker/run new file mode 100755 index 000000000..6a843d4b8 --- /dev/null +++ b/conf/init/service/gcworker/run @@ -0,0 +1,8 @@ +#! /bin/bash + +echo 'Starting GC worker' + +cd / +venv/bin/python -m workers.gcworker 2>&1 + +echo 'Repository GC exited' \ No newline at end of file diff --git a/config.py b/config.py index e99f32aca..7a81c68cd 100644 --- a/config.py +++ b/config.py @@ -231,3 +231,6 @@ class DefaultConfig(object): '#7f7f7f', '#c7c7c7', '#bcbd22', '#1f77b4', '#17becf', '#9edae5', '#393b79', '#5254a3', '#6b6ecf', '#9c9ede', '#9ecae1', '#31a354', '#b5cf6b', '#a1d99b', '#8c6d31', '#ad494a', '#e7ba52', '#a55194'] + + # Experiment: Async garbage collection + EXP_ASYNC_GARBAGE_COLLECTION = [] diff --git a/data/model/repository.py b/data/model/repository.py index 90857f93c..24199c00b 100644 --- a/data/model/repository.py +++ b/data/model/repository.py @@ -4,10 +4,11 @@ from peewee import JOIN_LEFT_OUTER, fn from datetime import timedelta, datetime from data.model import (DataModelException, tag, db_transaction, storage, image, permission, - _basequery) + _basequery, config) from data.database import (Repository, Namespace, RepositoryTag, Star, Image, ImageStorage, User, Visibility, RepositoryPermission, TupleSelector, RepositoryActionCount, - Role, RepositoryAuthorizedEmail, db_for_update) + Role, RepositoryAuthorizedEmail, db_for_update, get_epoch_timestamp, + db_random_func) logger = logging.getLogger(__name__) @@ -57,40 +58,82 @@ def purge_repository(namespace_name, repository_name): fetched.delete_instance(recursive=True, delete_nullable=False) -def garbage_collect_repository(namespace_name, repository_name): - storage_id_whitelist = {} +def find_repository_with_garbage(filter_list=None): + # TODO(jschorr): Remove the filter once we have turned the experiment on for everyone. + if filter_list is not None and not filter_list: + return None - tag.garbage_collect_tags(namespace_name, repository_name) + epoch_timestamp = get_epoch_timestamp() + + try: + candidates = (RepositoryTag + .select(RepositoryTag.repository) + .join(Repository) + .join(Namespace, on=(Repository.namespace_user == Namespace.id)) + .where(~(RepositoryTag.lifetime_end_ts >> None), + (RepositoryTag.lifetime_end_ts <= + (epoch_timestamp - Namespace.removed_tag_expiration_s))) + .limit(500) + .alias('candidates')) + + if filter_list: + candidates = candidates.where(Namespace.username << filter_list) + + found = (RepositoryTag + .select(candidates.c.repository_id) + .from_(candidates) + .order_by(db_random_func()) + .get()) + + if found is None: + return + + return Repository.get(Repository.id == found.repository_id) + except RepositoryTag.DoesNotExist: + return None + except Repository.DoesNotExist: + return None + + +def garbage_collect_repository(namespace_name, repository_name): + # If the namespace is the async experiment, don't perform garbage collection here. + # TODO(jschorr): Remove this check once we have turned the experiment on for everyone. + if namespace_name in config.app_config.get('EXP_ASYNC_GARBAGE_COLLECTION', []): + return + + repo = get_repository(namespace_name, repository_name) + if repo is not None: + garbage_collect_repo(repo) + + +def garbage_collect_repo(repo): + logger.debug('Garbage collecting repository %s', repo.id) + + storage_id_whitelist = {} + tag.garbage_collect_tags(repo) with db_transaction(): - # TODO (jake): We could probably select this and all the images in a single query using - # a different kind of join. - # Get a list of all images used by tags in the repository - tag_query = (RepositoryTag - .select(RepositoryTag, Image, ImageStorage) - .join(Image) - .join(ImageStorage, JOIN_LEFT_OUTER) - .switch(RepositoryTag) - .join(Repository) - .join(Namespace, on=(Repository.namespace_user == Namespace.id)) - .where(Repository.name == repository_name, Namespace.username == namespace_name)) + tagged_images = (Image + .select(Image.id, Image.ancestors) + .join(RepositoryTag) + .where(Image.repository == repo)) referenced_ancestors = set() - for one_tag in tag_query: + for tagged_image in tagged_images: # The ancestor list is in the format '/1/2/3/', extract just the ids - ancestor_id_strings = one_tag.image.ancestors.split('/')[1:-1] + ancestor_id_strings = tagged_image.ancestors.split('/')[1:-1] ancestor_list = [int(img_id_str) for img_id_str in ancestor_id_strings] referenced_ancestors = referenced_ancestors.union(set(ancestor_list)) - referenced_ancestors.add(one_tag.image.id) + referenced_ancestors.add(tagged_image.id) - all_repo_images = image.get_repository_images(namespace_name, repository_name) + all_repo_images = Image.select(Image.id, Image.storage).where(Image.repository == repo) all_images = {int(img.id): img for img in all_repo_images} to_remove = set(all_images.keys()).difference(referenced_ancestors) if len(to_remove) > 0: logger.info('Cleaning up unreferenced images: %s', to_remove) - storage_id_whitelist = {all_images[to_remove_id].storage.id for to_remove_id in to_remove} + storage_id_whitelist = {all_images[to_remove_id].storage_id for to_remove_id in to_remove} Image.delete().where(Image.id << list(to_remove)).execute() if len(to_remove) > 0: diff --git a/data/model/tag.py b/data/model/tag.py index 7f4dc93e4..199d99704 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -97,9 +97,7 @@ def delete_tag(namespace_name, repository_name, tag_name): found.save() -def garbage_collect_tags(namespace_name, repository_name): - # We do this without using a join to prevent holding read locks on the repository table - repo = _basequery.get_existing_repository(namespace_name, repository_name) +def garbage_collect_tags(repo): expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s tags_to_delete = list(RepositoryTag diff --git a/test/test_gc.py b/test/test_gc.py index a9953a1a3..27f67f82a 100644 --- a/test/test_gc.py +++ b/test/test_gc.py @@ -10,7 +10,7 @@ PUBLIC_USER = 'public' REPO = 'somerepo' -class TestGarbageColection(unittest.TestCase): +class TestGarbageCollection(unittest.TestCase): @staticmethod def _set_tag_expiration_policy(namespace, expiration_s): namespace_user = model.user.get_user(namespace) @@ -80,9 +80,11 @@ class TestGarbageColection(unittest.TestCase): def gcNow(self, repository): model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) - def deleteTag(self, repository, tag): + def deleteTag(self, repository, tag, perform_gc=True): model.tag.delete_tag(repository.namespace_user.username, repository.name, tag) - model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) + if perform_gc: + model.repository.garbage_collect_repository(repository.namespace_user.username, + repository.name) def moveTag(self, repository, tag, docker_image_id): model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag, @@ -105,6 +107,47 @@ class TestGarbageColection(unittest.TestCase): self.fail('Expected image %s to be deleted' % docker_image_id) + def test_has_garbage(self): + """ Remove all existing repositories, then add one without garbage, check, then add one with + garbage, and check again. + """ + # Delete all existing repos. + for repo in database.Repository.select(): + repo.delete_instance(recursive=True) + + # Change the time machine expiration on the namespace. + (database.User.update(removed_tag_expiration_s=1000000000) + .where(database.User.username == ADMIN_ACCESS_USER) + .execute()) + + # Create a repository without any garbage. + repository = self.createRepository(latest=['i1', 'i2', 'i3']) + + # Ensure that no repositories are returned by the has garbage check. + self.assertIsNone(model.repository.find_repository_with_garbage()) + + # Delete a tag. + self.deleteTag(repository, 'latest', perform_gc=False) + + # There should still not be any repositories with garbage, due to time machine. + self.assertIsNone(model.repository.find_repository_with_garbage()) + + # Change the time machine expiration on the namespace. + (database.User.update(removed_tag_expiration_s=0) + .where(database.User.username == ADMIN_ACCESS_USER) + .execute()) + + # Now we should find the repository for GC. + repository = model.repository.find_repository_with_garbage() + self.assertIsNotNone(repository) + self.assertEquals(REPO, repository.name) + + # GC the repository. + model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) + + # There should now be no repositories with garbage. + self.assertIsNone(model.repository.find_repository_with_garbage()) + def test_one_tag(self): """ Create a repository with a single tag, then remove that tag and verify that the repository diff --git a/workers/gcworker.py b/workers/gcworker.py new file mode 100644 index 000000000..d60dd5719 --- /dev/null +++ b/workers/gcworker.py @@ -0,0 +1,30 @@ +import logging + +from apscheduler.schedulers.blocking import BlockingScheduler + +from app import app +from data.database import UseThenDisconnect +from data.model.repository import find_repository_with_garbage, garbage_collect_repo + +logger = logging.getLogger(__name__) +sched = BlockingScheduler() + +@sched.scheduled_job(trigger='interval', seconds=10) +def garbage_collect_repositories(): + """ Performs garbage collection on repositories. """ + + with UseThenDisconnect(app.config): + repository = find_repository_with_garbage(app.config.get('EXP_ASYNC_GARBAGE_COLLECTION', [])) + if repository is None: + logger.debug('No repository with garbage found') + return False + + logger.debug('Starting GC of repository #%s (%s)', repository.id, repository.name) + garbage_collect_repo(repository) + logger.debug('Finished GC of repository #%s (%s)', repository.id, repository.name) + return True + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + sched.start()