From 584a5a7ddd10e75064584fd4b7d00f0735479de6 Mon Sep 17 00:00:00 2001 From: Jake Moshenko Date: Fri, 26 Aug 2016 14:48:39 -0400 Subject: [PATCH] Reduce database bandwidth by tracking gc candidate images. --- data/database.py | 29 ++++++++++ data/model/repository.py | 115 ++++++++++++++++++--------------------- data/model/tag.py | 112 ++++++++++++++++++++++++-------------- test/test_gc.py | 9 ++- test/test_manifests.py | 3 +- 5 files changed, 161 insertions(+), 107 deletions(-) diff --git a/data/database.py b/data/database.py index 8e34d21ea..935fad2a6 100644 --- a/data/database.py +++ b/data/database.py @@ -34,6 +34,7 @@ _SCHEME_DRIVERS = { 'postgresql+psycopg2': PostgresqlDatabase, } + SCHEME_RANDOM_FUNCTION = { 'mysql': fn.Rand, 'mysql+pymysql': fn.Rand, @@ -42,12 +43,37 @@ SCHEME_RANDOM_FUNCTION = { 'postgresql+psycopg2': fn.Random, } + +def pipes_concat(arg1, arg2, *extra_args): + """ Concat function for sqlite, since it doesn't support fn.Concat. + Concatenates clauses with || characters. + """ + reduced = arg1.concat(arg2) + for arg in extra_args: + reduced = reduced.concat(arg) + return reduced + + +def function_concat(arg1, arg2, *extra_args): + """ Default implementation of concat which uses fn.Concat(). Used by all + database engines except sqlite. + """ + return fn.Concat(arg1, arg2, *extra_args) + + +SCHEME_SPECIALIZED_CONCAT = { + 'sqlite': pipes_concat, +} + + def real_for_update(query): return query.for_update() + def null_for_update(query): return query + def delete_instance_filtered(instance, model_class, delete_nullable, skip_transitive_deletes): """ Deletes the DB instance recursively, skipping any models in the skip_transitive_deletes set. @@ -181,6 +207,7 @@ read_slave = Proxy() db_random_func = CallableProxy() db_for_update = CallableProxy() db_transaction = CallableProxy() +db_concat_func = CallableProxy() def validate_database_url(url, db_kwargs, connect_timeout=5): @@ -227,6 +254,8 @@ def configure(config_object): db_random_func.initialize(SCHEME_RANDOM_FUNCTION[parsed_write_uri.drivername]) db_for_update.initialize(SCHEME_SPECIALIZED_FOR_UPDATE.get(parsed_write_uri.drivername, real_for_update)) + db_concat_func.initialize(SCHEME_SPECIALIZED_CONCAT.get(parsed_write_uri.drivername, + function_concat)) read_slave_uri = config_object.get('DB_READ_SLAVE_URI', None) if read_slave_uri is not None: diff --git a/data/model/repository.py b/data/model/repository.py index e0bf7234b..75b4b65c7 100644 --- a/data/model/repository.py +++ b/data/model/repository.py @@ -11,7 +11,7 @@ from data.database import (Repository, Namespace, RepositoryTag, Star, Image, Us Visibility, RepositoryPermission, RepositoryActionCount, Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage, Label, TagManifestLabel, db_for_update, get_epoch_timestamp, - db_random_func) + db_random_func, db_concat_func) logger = logging.getLogger(__name__) @@ -43,45 +43,21 @@ def get_repository(namespace_name, repository_name): return None -def _purge_all_repository_tags(namespace_name, repository_name): - """ Immediately purge all repository tags without respecting the lifeline procedure """ - try: - repo = _basequery.get_existing_repository(namespace_name, repository_name) - except Repository.DoesNotExist: - raise DataModelException('Invalid repository \'%s/%s\'' % - (namespace_name, repository_name)) - - # Finds all the tags to delete. - repo_tags = list(RepositoryTag.select().where(RepositoryTag.repository == repo.id)) - if not repo_tags: - return - - # Find all labels to delete. - manifest_labels_query = (TagManifestLabel - .select() - .where(TagManifestLabel.repository == repo)) - - label_ids = [manifest_label.label_id for manifest_label in manifest_labels_query] - if label_ids: - # Delete all the mapping entries. - TagManifestLabel.delete().where(TagManifestLabel.repository == repo).execute() - - # Delete all the matching labels. - Label.delete().where(Label.id << label_ids).execute() - - # Delete all the manifests. - TagManifest.delete().where(TagManifest.tag << repo_tags).execute() - - # Delete all tags. - RepositoryTag.delete().where(RepositoryTag.repository == repo.id).execute() - - def purge_repository(namespace_name, repository_name): + repo = _basequery.get_existing_repository(namespace_name, repository_name) + # Delete all tags to allow gc to reclaim storage - _purge_all_repository_tags(namespace_name, repository_name) + previously_referenced = tag.purge_all_tags(repo) + unreferenced_image_q = Image.select(Image.id).where(Image.repository == repo) + + if len(previously_referenced) > 0: + unreferenced_image_q = (unreferenced_image_q + .where(~(Image.id << list(previously_referenced)))) + + unreferenced_candidates = set(img[0] for img in unreferenced_image_q.tuples()) # Gc to remove the images and storage - garbage_collect_repository(namespace_name, repository_name) + garbage_collect_repo(repo, previously_referenced | unreferenced_candidates) # Delete the rest of the repository metadata fetched = _basequery.get_existing_repository(namespace_name, repository_name) @@ -135,34 +111,46 @@ def find_repository_with_garbage(limit_to_gc_policy_s): return None -def garbage_collect_repository(namespace_name, repository_name): - repo = get_repository(namespace_name, repository_name) - if repo is not None: - garbage_collect_repo(repo) - - -def garbage_collect_repo(repo): +def garbage_collect_repo(repo, extra_candidate_set=None): logger.debug('Garbage collecting repository %s', repo.id) storage_id_whitelist = set() - tag.garbage_collect_tags(repo) + candidate_orphan_image_set = tag.garbage_collect_tags(repo) + + if extra_candidate_set: + candidate_orphan_image_set.update(extra_candidate_set) + + if not len(candidate_orphan_image_set): + logger.debug('No candidate images for GC for repo: %s', repo.id) + return + + candidates_orphans = list(candidate_orphan_image_set) with db_transaction(): - # Get a list of all images used by tags in the repository - tagged_images = (Image - .select(Image.id, Image.ancestors) - .join(RepositoryTag) - .where(Image.repository == repo)) + Candidate = Image.alias() + Tagged = Image.alias() + ancestor_superset = Tagged.ancestors ** db_concat_func(Candidate.ancestors, Candidate.id, '/%') - def gen_referenced_ancestors(): - for tagged_image in tagged_images: - # The ancestor list is in the format '/1/2/3/', extract just the ids - ancestor_id_strings = tagged_image.ancestor_list() - for img_id_str in ancestor_id_strings: - yield int(img_id_str) - yield tagged_image.id + # We are going to compute all images which are being referenced in two ways: + # First, we will find all images which have their ancestor paths appear in + # another image. Secondly, we union in all of the candidate images which are + # directly referenced by a tag. This can be used in a subquery to directly + # find which candidates are being referenced without any client side + # computation or extra round trips. + ancestor_referenced = (Candidate + .select(Candidate.id) + .join(Tagged, on=ancestor_superset) + .join(RepositoryTag, on=(Tagged.id == RepositoryTag.image)) + .where(RepositoryTag.repository == repo.id, + Candidate.id << candidates_orphans)) - referenced_ancestors = set(gen_referenced_ancestors()) + direct_referenced = (Candidate + .select(Candidate.id) + .join(RepositoryTag) + .where(RepositoryTag.repository == repo.id, + Candidate.id << candidates_orphans)) + + referenced_candidates = (direct_referenced | ancestor_referenced) # We desire two pieces of information from the database from the following # query: all of the image ids which are associated with this repository, @@ -171,13 +159,18 @@ def garbage_collect_repo(repo): # code, which is overkill for just two fields, we use a tuple query, and # feed that directly to the dictionary tuple constructor which takes an # iterable of tuples containing [(k, v), (k, v), ...] - all_repo_images = Image.select(Image.id, Image.storage).where(Image.repository == repo).tuples() - images_to_storages = dict(all_repo_images) - to_remove = list(set(images_to_storages.keys()).difference(referenced_ancestors)) + unreferenced_candidates = (Image + .select(Image.id, Image.storage) + .where(Image.id << candidates_orphans, + ~(Image.id << referenced_candidates)) + .tuples()) + + unreferecend_images_to_storages = dict(unreferenced_candidates) + to_remove = unreferecend_images_to_storages.keys() if len(to_remove) > 0: logger.info('Cleaning up unreferenced images: %s', to_remove) - storage_id_whitelist = {images_to_storages[to_remove_id] for to_remove_id in to_remove} + storage_id_whitelist = set(unreferecend_images_to_storages.values()) # Lookup any derived images for the images to remove. derived = DerivedStorageForImage.select().where( diff --git a/data/model/tag.py b/data/model/tag.py index 125339697..03fa30ec3 100644 --- a/data/model/tag.py +++ b/data/model/tag.py @@ -138,54 +138,86 @@ def delete_tag(namespace_name, repository_name, tag_name): def garbage_collect_tags(repo): - expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s + """ Remove all of the tags that have gone past their garbage collection + expiration window, and return a set of image ids which *may* have been + orphaned. + """ + def add_expiration_data(base_query): + expired_clause = get_epoch_timestamp() - Namespace.removed_tag_expiration_s + return (base_query + .switch(RepositoryTag) + .join(Repository) + .join(Namespace, on=(Repository.namespace_user == Namespace.id)) + .where(~(RepositoryTag.lifetime_end_ts >> None), + RepositoryTag.lifetime_end_ts <= expired_clause)) + return _delete_tags(repo, add_expiration_data) - tags_to_delete = list(RepositoryTag - .select(RepositoryTag.id) - .where(RepositoryTag.repository == repo, - ~(RepositoryTag.lifetime_end_ts >> None), - (RepositoryTag.lifetime_end_ts <= expired_time)) - .order_by(RepositoryTag.id)) +def purge_all_tags(repo): + """ Remove all tags from the repository, and return a set of all of the images + ids which are now orphaned. + """ + return _delete_tags(repo) - if len(tags_to_delete) > 0: - with db_transaction(): - manifests_to_delete = list(TagManifest - .select(TagManifest.id) - .join(RepositoryTag) - .where(RepositoryTag.id << tags_to_delete)) +def _delete_tags(repo, query_modifier=None): + """ Garbage collect the tags for a repository and return a set of the image + ids which may now be orphaned. + """ + tags_to_delete_q = (RepositoryTag + .select(RepositoryTag.id, Image.ancestors, Image.id) + .join(Image) + .where(RepositoryTag.repository == repo)) - num_deleted_manifests = 0 - if len(manifests_to_delete) > 0: - # Find the set of IDs for all the labels to delete. - manifest_labels_query = (TagManifestLabel - .select() - .where(TagManifestLabel.repository == repo, - TagManifestLabel.annotated << manifests_to_delete)) + if query_modifier is not None: + tags_to_delete_q = query_modifier(tags_to_delete_q) - label_ids = [manifest_label.label_id for manifest_label in manifest_labels_query] - if label_ids: - # Delete all the mapping entries. - (TagManifestLabel - .delete() - .where(TagManifestLabel.repository == repo, - TagManifestLabel.annotated << manifests_to_delete) - .execute()) + tags_to_delete = list(tags_to_delete_q) - # Delete all the matching labels. - Label.delete().where(Label.id << label_ids).execute() + if len(tags_to_delete) == 0: + return set() - # Delete the tag manifests themselves. - num_deleted_manifests = (TagManifest - .delete() - .where(TagManifest.id << manifests_to_delete) - .execute()) + with db_transaction(): + manifests_to_delete = list(TagManifest + .select(TagManifest.id) + .join(RepositoryTag) + .where(RepositoryTag.id << tags_to_delete)) - num_deleted_tags = (RepositoryTag - .delete() - .where(RepositoryTag.id << tags_to_delete) - .execute()) + num_deleted_manifests = 0 + if len(manifests_to_delete) > 0: + # Find the set of IDs for all the labels to delete. + manifest_labels_query = (TagManifestLabel + .select() + .where(TagManifestLabel.repository == repo, + TagManifestLabel.annotated << manifests_to_delete)) - logger.debug('Removed %s tags with %s manifests', num_deleted_tags, num_deleted_manifests) + label_ids = [manifest_label.label_id for manifest_label in manifest_labels_query] + if label_ids: + # Delete all the mapping entries. + (TagManifestLabel + .delete() + .where(TagManifestLabel.repository == repo, + TagManifestLabel.annotated << manifests_to_delete) + .execute()) + + # Delete all the matching labels. + Label.delete().where(Label.id << label_ids).execute() + + + num_deleted_manifests = (TagManifest + .delete() + .where(TagManifest.id << manifests_to_delete) + .execute()) + + num_deleted_tags = (RepositoryTag + .delete() + .where(RepositoryTag.id << tags_to_delete) + .execute()) + + logger.debug('Removed %s tags with %s manifests', num_deleted_tags, num_deleted_manifests) + + ancestors = reduce(lambda r, l: r | l, + (set(tag.image.ancestor_id_list()) for tag in tags_to_delete)) + direct_referenced = {tag.image.id for tag in tags_to_delete} + return ancestors | direct_referenced def _get_repo_tag_image(tag_name, include_storage, modifier): diff --git a/test/test_gc.py b/test/test_gc.py index 8be27da4b..b313f3024 100644 --- a/test/test_gc.py +++ b/test/test_gc.py @@ -144,18 +144,17 @@ class TestGarbageCollection(unittest.TestCase): return repo def gcNow(self, repository): - model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) + model.repository.garbage_collect_repo(repository) def deleteTag(self, repository, tag, perform_gc=True): model.tag.delete_tag(repository.namespace_user.username, repository.name, tag) if perform_gc: - model.repository.garbage_collect_repository(repository.namespace_user.username, - repository.name) + model.repository.garbage_collect_repo(repository) def moveTag(self, repository, tag, docker_image_id): model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag, docker_image_id) - model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) + model.repository.garbage_collect_repo(repository) def assertNotDeleted(self, repository, *args): for docker_image_id in args: @@ -212,7 +211,7 @@ class TestGarbageCollection(unittest.TestCase): self.assertEquals(REPO, repository.name) # GC the repository. - model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) + model.repository.garbage_collect_repo(repository) # There should now be no repositories with garbage. self.assertIsNone(model.repository.find_repository_with_garbage(0)) diff --git a/test/test_manifests.py b/test/test_manifests.py index d8a4574f9..03f2ff539 100644 --- a/test/test_manifests.py +++ b/test/test_manifests.py @@ -36,7 +36,8 @@ class TestManifests(unittest.TestCase): def _perform_cleanup(self): database.RepositoryTag.delete().where(database.RepositoryTag.hidden == True).execute() - model.repository.garbage_collect_repository(ADMIN_ACCESS_USER, REPO) + repo_object = model.repository.get_repository(ADMIN_ACCESS_USER, REPO) + model.repository.garbage_collect_repo(repo_object) def test_missing_link(self): """ Tests for a corner case that could result in missing a link to a blob referenced by a