From 9e4f8cac037ba6e65eb3dc2873b6ea8652fda5a2 Mon Sep 17 00:00:00 2001
From: Joseph Schorr
Date: Mon, 25 Jul 2016 14:02:00 -0700
Subject: [PATCH] Optimize GC query for looking up deletable storages

---
 data/model/storage.py |  82 +++++++++--------
 test/test_gc.py       | 199 ++++++++++++++++++++++++++----------------
 2 files changed, 171 insertions(+), 110 deletions(-)

diff --git a/data/model/storage.py b/data/model/storage.py
index 5acb8ce39..8cdcf42f9 100644
--- a/data/model/storage.py
+++ b/data/model/storage.py
@@ -46,6 +46,31 @@ def add_storage_placement(storage, location_name):
     pass
 
 
+def _orphaned_storage_query(candidate_ids):
+  """ Returns the subset of the candidate ImageStorage IDs representing storages that are no
+      longer referenced by images. Accepts any re-iterable collection of IDs; returns a list.
+  """
+  # Issue a union query to find all candidate storages that are still referenced by an image. This
+  # is much faster than the group_by and having call we used to use here.
+  nonorphaned_queries = []
+  for counter, candidate_id in enumerate(candidate_ids):
+    query_alias = 'q{0}'.format(counter)
+    storage_subq = (ImageStorage
+                    .select(ImageStorage.id)
+                    .join(Image)
+                    .where(ImageStorage.id == candidate_id)
+                    .limit(1)
+                    .alias(query_alias))
+
+    nonorphaned_queries.append(ImageStorage
+                               .select(SQL('*'))
+                               .from_(storage_subq))
+
+  # Build the set of storages that are missing. These storages are orphaned.
+  nonorphaned_storage_ids = {storage.id for storage in _reduce_as_tree(nonorphaned_queries)}
+  return list(set(candidate_ids) - nonorphaned_storage_ids)
+
+
 def garbage_collect_storage(storage_id_whitelist):
   if len(storage_id_whitelist) == 0:
     return
@@ -55,27 +80,21 @@ def garbage_collect_storage(storage_id_whitelist):
              get_layer_path(placement.storage))
             for placement in placements_query}
 
-  def orphaned_storage_query(select_base_query, candidates, group_by):
-    return (select_base_query
-            .switch(ImageStorage)
-            .join(Image, JOIN_LEFT_OUTER)
-            .where(ImageStorage.id << list(candidates))
-            .group_by(*group_by)
-            .having(fn.Count(Image.id) == 0))
-
   # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
   # storage without any placement is invalid, and a placement cannot exist without a storage.
   # TODO(jake): We might want to allow for null storages on placements, which would allow us to
   # delete the storages, then delete the placements in a non-transaction.
   logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
   with db_transaction():
-    # Track all of the data that should be removed from blob storage
-    placements_to_remove = list(orphaned_storage_query(ImageStoragePlacement
-                                                       .select(ImageStoragePlacement,
-                                                               ImageStorage)
-                                                       .join(ImageStorage),
-                                                       storage_id_whitelist,
-                                                       (ImageStorage.id, ImageStoragePlacement.id)))
+    orphaned_storage_ids = _orphaned_storage_query(storage_id_whitelist)
+    if len(orphaned_storage_ids) == 0:
+      # Nothing to GC.
+      return
+
+    placements_to_remove = list(ImageStoragePlacement
+                                .select(ImageStoragePlacement, ImageStorage)
+                                .join(ImageStorage)
+                                .where(ImageStorage.id << orphaned_storage_ids))
 
     paths_to_remove = placements_query_to_paths_set(placements_to_remove)
 
@@ -89,28 +108,23 @@ def garbage_collect_storage(storage_id_whitelist):
     logger.debug('Removed %s image storage placements', placements_removed)
 
     # Remove all orphaned storages
-    # The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
-    orphaned_storages = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
-                                                    storage_id_whitelist,
-                                                    (ImageStorage.id,)).alias('osq'))
-    if len(orphaned_storages) > 0:
-      torrents_removed = (TorrentInfo
-                          .delete()
-                          .where(TorrentInfo.storage << orphaned_storages)
-                          .execute())
-      logger.debug('Removed %s torrent info records', torrents_removed)
+    torrents_removed = (TorrentInfo
+                        .delete()
+                        .where(TorrentInfo.storage << orphaned_storage_ids)
+                        .execute())
+    logger.debug('Removed %s torrent info records', torrents_removed)
 
-      signatures_removed = (ImageStorageSignature
-                            .delete()
-                            .where(ImageStorageSignature.storage << orphaned_storages)
-                            .execute())
-      logger.debug('Removed %s image storage signatures', signatures_removed)
-
-      storages_removed = (ImageStorage
+    signatures_removed = (ImageStorageSignature
                           .delete()
-                          .where(ImageStorage.id << orphaned_storages)
+                          .where(ImageStorageSignature.storage << orphaned_storage_ids)
                           .execute())
-      logger.debug('Removed %s image storage records', storages_removed)
+    logger.debug('Removed %s image storage signatures', signatures_removed)
+
+    storages_removed = (ImageStorage
+                        .delete()
+                        .where(ImageStorage.id << orphaned_storage_ids)
+                        .execute())
+    logger.debug('Removed %s image storage records', storages_removed)
 
     # We are going to make the conscious decision to not delete image storage blobs inside
     # transactions.
diff --git a/test/test_gc.py b/test/test_gc.py
index 871b4a8a7..2c438c1bc 100644
--- a/test/test_gc.py
+++ b/test/test_gc.py
@@ -1,9 +1,12 @@
 import unittest
 import time
 
+from peewee import fn, JOIN_LEFT_OUTER
+
 from app import app, storage
 from initdb import setup_database_for_testing, finished_database_for_testing
 from data import model, database
+from data.database import Image, ImageStorage, DerivedStorageForImage
 from endpoints.v2.manifest import _generate_and_store_manifest
 
 
@@ -12,6 +15,29 @@ PUBLIC_USER = 'public'
 REPO = 'somerepo'
 
 
+
+class assert_no_new_dangling_storages(object):
+  """ Specialized assertion for ensuring that GC cleans up all dangling storages.
+  """
+  def __init__(self):
+    self.existing_count = 0
+
+  def _get_dangling_count(self):
+    storage_ids = {current.id for current in ImageStorage.select()}
+    referenced_by_image = {image.storage_id for image in Image.select()}
+    referenced_by_derived = {derived.derivative_id for derived in DerivedStorageForImage.select()}
+
+    return len(storage_ids - referenced_by_image - referenced_by_derived)
+
+  def __enter__(self):
+    self.existing_count = self._get_dangling_count()
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    updated_count = self._get_dangling_count()
+    assert updated_count == self.existing_count
+
+
 class TestGarbageCollection(unittest.TestCase):
   @staticmethod
   def _set_tag_expiration_policy(namespace, expiration_s):
@@ -122,6 +148,7 @@ class TestGarbageCollection(unittest.TestCase):

       self.fail('Expected image %s to be deleted' % docker_image_id)
 
+
   def test_has_garbage(self):
     """ Remove all existing repositories, then add one without garbage, check, then add one with
         garbage, and check again.
@@ -167,144 +194,164 @@ class TestGarbageCollection(unittest.TestCase):
   def test_one_tag(self):
     """ Create a repository with a single tag, then remove that tag and verify that the repository
         is now empty. """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'])
-    self.deleteTag(repository, 'latest')
-    self.assertDeleted(repository, 'i1', 'i2', 'i3')
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'])
+      self.deleteTag(repository, 'latest')
+      self.assertDeleted(repository, 'i1', 'i2', 'i3')
+
 
   def test_two_tags_unshared_images(self):
     """ Repository has two tags with no shared images between them. """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2'])
-    self.deleteTag(repository, 'latest')
-    self.assertDeleted(repository, 'i1', 'i2', 'i3')
-    self.assertNotDeleted(repository, 'f1', 'f2')
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2'])
+      self.deleteTag(repository, 'latest')
+      self.assertDeleted(repository, 'i1', 'i2', 'i3')
+      self.assertNotDeleted(repository, 'f1', 'f2')
+
 
   def test_two_tags_shared_images(self):
     """ Repository has two tags with shared images. Deleting the tag should only remove the
         unshared images.
     """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
-    self.deleteTag(repository, 'latest')
-    self.assertDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1', 'f1')
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
+      self.deleteTag(repository, 'latest')
+      self.assertDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1', 'f1')
+
 
   def test_unrelated_repositories(self):
     """ Two repositories with different images. Removing the tag from one leaves the other's
         images intact.
     """
-    repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
-    repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2')
+    with assert_no_new_dangling_storages():
+      repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
+      repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2')
 
-    self.deleteTag(repository1, 'latest')
+      self.deleteTag(repository1, 'latest')
+
+      self.assertDeleted(repository1, 'i1', 'i2', 'i3')
+      self.assertNotDeleted(repository2, 'j1', 'j2', 'j3')
 
-    self.assertDeleted(repository1, 'i1', 'i2', 'i3')
-    self.assertNotDeleted(repository2, 'j1', 'j2', 'j3')
 
   def test_related_repositories(self):
     """ Two repositories with shared images. Removing the tag from one leaves the other's
         images intact.
     """
-    repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
-    repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2')
+    with assert_no_new_dangling_storages():
+      repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
+      repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2')
 
-    self.deleteTag(repository1, 'latest')
+      self.deleteTag(repository1, 'latest')
+
+      self.assertDeleted(repository1, 'i3')
+      self.assertNotDeleted(repository2, 'i1', 'i2', 'j1')
 
-    self.assertDeleted(repository1, 'i3')
-    self.assertNotDeleted(repository2, 'i1', 'i2', 'j1')
 
   def test_inaccessible_repositories(self):
     """ Two repositories under different namespaces should result in the images being deleted
         but not completely removed from the database.
     """
-    repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3'])
-    repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3'])
+    with assert_no_new_dangling_storages():
+      repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3'])
+      repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3'])
 
-    self.deleteTag(repository1, 'latest')
-    self.assertDeleted(repository1, 'i1', 'i2', 'i3')
-    self.assertNotDeleted(repository2, 'i1', 'i2', 'i3')
+      self.deleteTag(repository1, 'latest')
+      self.assertDeleted(repository1, 'i1', 'i2', 'i3')
+      self.assertNotDeleted(repository2, 'i1', 'i2', 'i3')
 
   def test_multiple_shared_images(self):
     """ Repository has multiple tags with shared images. Selectively deleting the tags, and
         verifying at each step.
     """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
-                                       third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
+                                         third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
 
-    # Delete tag other. Should delete f2, since it is not shared.
-    self.deleteTag(repository, 'other')
-    self.assertDeleted(repository, 'f2')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1')
+      # Delete tag other. Should delete f2, since it is not shared.
+      self.deleteTag(repository, 'other')
+      self.assertDeleted(repository, 'f2')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1')
 
-    # Move tag fourth to i3. This should remove f1 since it is no longer referenced.
-    self.moveTag(repository, 'fourth', 'i3')
-    self.assertDeleted(repository, 'f1')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
+      # Move tag fourth to i3. This should remove f1 since it is no longer referenced.
+      self.moveTag(repository, 'fourth', 'i3')
+      self.assertDeleted(repository, 'f1')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
 
-    # Delete tag 'latest'. This should do nothing since fourth is on the same branch.
-    self.deleteTag(repository, 'latest')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
+      # Delete tag 'latest'. This should do nothing since fourth is on the same branch.
+      self.deleteTag(repository, 'latest')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
 
-    # Delete tag 'third'. This should remove t1->t3.
-    self.deleteTag(repository, 'third')
-    self.assertDeleted(repository, 't1', 't2', 't3')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
+      # Delete tag 'third'. This should remove t1->t3.
+      self.deleteTag(repository, 'third')
+      self.assertDeleted(repository, 't1', 't2', 't3')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
 
-    # Add tag to i1.
-    self.moveTag(repository, 'newtag', 'i1')
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
+      # Add tag to i1.
+      self.moveTag(repository, 'newtag', 'i1')
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
 
-    # Delete tag 'fourth'. This should remove i2 and i3.
-    self.deleteTag(repository, 'fourth')
-    self.assertDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1')
+      # Delete tag 'fourth'. This should remove i2 and i3.
+      self.deleteTag(repository, 'fourth')
+      self.assertDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1')
+
+      # Delete tag 'newtag'. This should remove the remaining image.
+      self.deleteTag(repository, 'newtag')
+      self.assertDeleted(repository, 'i1')
 
-    # Delete tag 'newtag'. This should remove the remaining image.
-    self.deleteTag(repository, 'newtag')
-    self.assertDeleted(repository, 'i1')
 
   def test_empty_gc(self):
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
-                                       third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
+                                         third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
+
+      self.gcNow(repository)
+      self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1', 'f2')
 
-    self.gcNow(repository)
-    self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1', 'f2')
 
   def test_time_machine_no_gc(self):
     """ Repository has two tags with shared images. Deleting the tag should not remove any
         images
     """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
-    self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24)
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
+      self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24)
+
+      self.deleteTag(repository, 'latest')
+      self.assertNotDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1', 'f1')
 
-    self.deleteTag(repository, 'latest')
-    self.assertNotDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1', 'f1')
 
   def test_time_machine_gc(self):
     """ Repository has two tags with shared images. Deleting the second tag should cause the images
         for the first deleted tag to gc.
     """
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
 
-    self._set_tag_expiration_policy(repository.namespace_user.username, 1)
+      self._set_tag_expiration_policy(repository.namespace_user.username, 1)
 
-    self.deleteTag(repository, 'latest')
-    self.assertNotDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1', 'f1')
+      self.deleteTag(repository, 'latest')
+      self.assertNotDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1', 'f1')
 
-    time.sleep(2)
+      time.sleep(2)
+
+      self.deleteTag(repository, 'other')  # This will cause the images associated with latest to gc
+      self.assertDeleted(repository, 'i2', 'i3')
+      self.assertNotDeleted(repository, 'i1', 'f1')
 
-    self.deleteTag(repository, 'other')  # This will cause the images associated with latest to gc
-    self.assertDeleted(repository, 'i2', 'i3')
-    self.assertNotDeleted(repository, 'i1', 'f1')
 
   def test_manifest_gc(self):
-    repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
-    _generate_and_store_manifest(ADMIN_ACCESS_USER, REPO, 'latest')
+    with assert_no_new_dangling_storages():
+      repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
+      _generate_and_store_manifest(ADMIN_ACCESS_USER, REPO, 'latest')
 
-    self._set_tag_expiration_policy(repository.namespace_user.username, 0)
+      self._set_tag_expiration_policy(repository.namespace_user.username, 0)
 
-    self.deleteTag(repository, 'latest')
-    self.assertDeleted(repository, 'i2', 'i3')
+      self.deleteTag(repository, 'latest')
+      self.assertDeleted(repository, 'i2', 'i3')
 
 
 if __name__ == '__main__':