Optimize GC query for looking up deletable storages
This commit is contained in:
parent
640012103c
commit
9e4f8cac03
2 changed files with 171 additions and 110 deletions
|
@ -46,6 +46,31 @@ def add_storage_placement(storage, location_name):
|
|||
pass
|
||||
|
||||
|
||||
def _orphaned_storage_query(candidate_ids):
|
||||
""" Returns the subset of the candidate ImageStorage IDs representing storages that are no
|
||||
longer referenced by images.
|
||||
"""
|
||||
# Issue a union query to find all storages that are still referenced by a candidate storage. This
|
||||
# is much faster than the group_by and having call we used to use here.
|
||||
nonorphaned_queries = []
|
||||
for counter, candidate_id in enumerate(candidate_ids):
|
||||
query_alias = 'q{0}'.format(counter)
|
||||
storage_subq = (ImageStorage
|
||||
.select(ImageStorage.id)
|
||||
.join(Image)
|
||||
.where(ImageStorage.id == candidate_id)
|
||||
.limit(1)
|
||||
.alias(query_alias))
|
||||
|
||||
nonorphaned_queries.append(ImageStorage
|
||||
.select(SQL('*'))
|
||||
.from_(storage_subq))
|
||||
|
||||
# Build the set of storages that are missing. These storages are orphaned.
|
||||
nonorphaned_storage_ids = {storage.id for storage in _reduce_as_tree(nonorphaned_queries)}
|
||||
return list(candidate_ids - nonorphaned_storage_ids)
|
||||
|
||||
|
||||
def garbage_collect_storage(storage_id_whitelist):
|
||||
if len(storage_id_whitelist) == 0:
|
||||
return
|
||||
|
@ -55,27 +80,21 @@ def garbage_collect_storage(storage_id_whitelist):
|
|||
get_layer_path(placement.storage))
|
||||
for placement in placements_query}
|
||||
|
||||
def orphaned_storage_query(select_base_query, candidates, group_by):
|
||||
return (select_base_query
|
||||
.switch(ImageStorage)
|
||||
.join(Image, JOIN_LEFT_OUTER)
|
||||
.where(ImageStorage.id << list(candidates))
|
||||
.group_by(*group_by)
|
||||
.having(fn.Count(Image.id) == 0))
|
||||
|
||||
# Note: Both of these deletes must occur in the same transaction (unfortunately) because a
|
||||
# storage without any placement is invalid, and a placement cannot exist without a storage.
|
||||
# TODO(jake): We might want to allow for null storages on placements, which would allow us to
|
||||
# delete the storages, then delete the placements in a non-transaction.
|
||||
logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
|
||||
with db_transaction():
|
||||
# Track all of the data that should be removed from blob storage
|
||||
placements_to_remove = list(orphaned_storage_query(ImageStoragePlacement
|
||||
.select(ImageStoragePlacement,
|
||||
ImageStorage)
|
||||
.join(ImageStorage),
|
||||
storage_id_whitelist,
|
||||
(ImageStorage.id, ImageStoragePlacement.id)))
|
||||
orphaned_storage_ids = _orphaned_storage_query(storage_id_whitelist)
|
||||
if len(orphaned_storage_ids) == 0:
|
||||
# Nothing to GC.
|
||||
return
|
||||
|
||||
placements_to_remove = list(ImageStoragePlacement
|
||||
.select()
|
||||
.join(ImageStorage)
|
||||
.where(ImageStorage.id << orphaned_storage_ids))
|
||||
|
||||
paths_to_remove = placements_query_to_paths_set(placements_to_remove)
|
||||
|
||||
|
@ -89,28 +108,23 @@ def garbage_collect_storage(storage_id_whitelist):
|
|||
logger.debug('Removed %s image storage placements', placements_removed)
|
||||
|
||||
# Remove all orphaned storages
|
||||
# The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
|
||||
orphaned_storages = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
|
||||
storage_id_whitelist,
|
||||
(ImageStorage.id,)).alias('osq'))
|
||||
if len(orphaned_storages) > 0:
|
||||
torrents_removed = (TorrentInfo
|
||||
.delete()
|
||||
.where(TorrentInfo.storage << orphaned_storages)
|
||||
.execute())
|
||||
logger.debug('Removed %s torrent info records', torrents_removed)
|
||||
torrents_removed = (TorrentInfo
|
||||
.delete()
|
||||
.where(TorrentInfo.storage << orphaned_storage_ids)
|
||||
.execute())
|
||||
logger.debug('Removed %s torrent info records', torrents_removed)
|
||||
|
||||
signatures_removed = (ImageStorageSignature
|
||||
.delete()
|
||||
.where(ImageStorageSignature.storage << orphaned_storages)
|
||||
.execute())
|
||||
logger.debug('Removed %s image storage signatures', signatures_removed)
|
||||
|
||||
storages_removed = (ImageStorage
|
||||
signatures_removed = (ImageStorageSignature
|
||||
.delete()
|
||||
.where(ImageStorage.id << orphaned_storages)
|
||||
.where(ImageStorageSignature.storage << orphaned_storage_ids)
|
||||
.execute())
|
||||
logger.debug('Removed %s image storage records', storages_removed)
|
||||
logger.debug('Removed %s image storage signatures', signatures_removed)
|
||||
|
||||
storages_removed = (ImageStorage
|
||||
.delete()
|
||||
.where(ImageStorage.id << orphaned_storage_ids)
|
||||
.execute())
|
||||
logger.debug('Removed %s image storage records', storages_removed)
|
||||
|
||||
# We are going to make the conscious decision to not delete image storage blobs inside
|
||||
# transactions.
|
||||
|
|
199
test/test_gc.py
199
test/test_gc.py
|
@ -1,9 +1,12 @@
|
|||
import unittest
|
||||
import time
|
||||
|
||||
from peewee import fn, JOIN_LEFT_OUTER
|
||||
|
||||
from app import app, storage
|
||||
from initdb import setup_database_for_testing, finished_database_for_testing
|
||||
from data import model, database
|
||||
from data.database import Image, ImageStorage, DerivedStorageForImage
|
||||
from endpoints.v2.manifest import _generate_and_store_manifest
|
||||
|
||||
|
||||
|
@ -12,6 +15,29 @@ PUBLIC_USER = 'public'
|
|||
|
||||
REPO = 'somerepo'
|
||||
|
||||
|
||||
class assert_no_new_dangling_storages(object):
|
||||
""" Specialized assertion for ensuring that GC cleans up all dangling storages.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.existing_count = 0
|
||||
|
||||
def _get_dangling_count(self):
|
||||
storage_ids = set([current.id for current in ImageStorage.select()])
|
||||
referneced_by_image = set([image.storage_id for image in Image.select()])
|
||||
referenced_by_derived = set([derived.derivative_id for derived in DerivedStorageForImage.select()])
|
||||
|
||||
return len(storage_ids - referneced_by_image - referenced_by_derived)
|
||||
|
||||
def __enter__(self):
|
||||
self.existing_count = self._get_dangling_count()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
updated_count = self._get_dangling_count()
|
||||
assert updated_count == self.existing_count
|
||||
|
||||
|
||||
class TestGarbageCollection(unittest.TestCase):
|
||||
@staticmethod
|
||||
def _set_tag_expiration_policy(namespace, expiration_s):
|
||||
|
@ -122,6 +148,7 @@ class TestGarbageCollection(unittest.TestCase):
|
|||
|
||||
self.fail('Expected image %s to be deleted' % docker_image_id)
|
||||
|
||||
|
||||
def test_has_garbage(self):
|
||||
""" Remove all existing repositories, then add one without garbage, check, then add one with
|
||||
garbage, and check again.
|
||||
|
@ -167,144 +194,164 @@ class TestGarbageCollection(unittest.TestCase):
|
|||
def test_one_tag(self):
|
||||
""" Create a repository with a single tag, then remove that tag and verify that the repository
|
||||
is now empty. """
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i1', 'i2', 'i3')
|
||||
with assert_no_new_dangling_storages():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i1', 'i2', 'i3')
|
||||
|
||||
|
||||
def test_two_tags_unshared_images(self):
|
||||
""" Repository has two tags with no shared images between them. """
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'f1', 'f2')
|
||||
with assert_no_new_dangling_storages():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['f1', 'f2'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'f1', 'f2')
|
||||
|
||||
|
||||
def test_two_tags_shared_images(self):
|
||||
""" Repository has two tags with shared images. Deleting the tag should only remove the
|
||||
unshared images.
|
||||
"""
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
with assert_no_new_dangling_storages():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
|
||||
def test_unrelated_repositories(self):
|
||||
""" Two repositories with different images. Removing the tag from one leaves the other's
|
||||
images intact.
|
||||
"""
|
||||
repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
|
||||
repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2')
|
||||
with assert_no_new_dangling_storages():
|
||||
repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
|
||||
repository2 = self.createRepository(latest=['j1', 'j2', 'j3'], name='repo2')
|
||||
|
||||
self.deleteTag(repository1, 'latest')
|
||||
self.deleteTag(repository1, 'latest')
|
||||
|
||||
self.assertDeleted(repository1, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository2, 'j1', 'j2', 'j3')
|
||||
|
||||
self.assertDeleted(repository1, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository2, 'j1', 'j2', 'j3')
|
||||
|
||||
def test_related_repositories(self):
|
||||
""" Two repositories with shared images. Removing the tag from one leaves the other's
|
||||
images intact.
|
||||
"""
|
||||
repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
|
||||
repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2')
|
||||
with assert_no_new_dangling_storages():
|
||||
repository1 = self.createRepository(latest=['i1', 'i2', 'i3'], name='repo1')
|
||||
repository2 = self.createRepository(latest=['i1', 'i2', 'j1'], name='repo2')
|
||||
|
||||
self.deleteTag(repository1, 'latest')
|
||||
self.deleteTag(repository1, 'latest')
|
||||
|
||||
self.assertDeleted(repository1, 'i3')
|
||||
self.assertNotDeleted(repository2, 'i1', 'i2', 'j1')
|
||||
|
||||
self.assertDeleted(repository1, 'i3')
|
||||
self.assertNotDeleted(repository2, 'i1', 'i2', 'j1')
|
||||
|
||||
def test_inaccessible_repositories(self):
|
||||
""" Two repositories under different namespaces should result in the images being deleted
|
||||
but not completely removed from the database.
|
||||
"""
|
||||
repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3'])
|
||||
repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3'])
|
||||
with assert_no_new_dangling_storages():
|
||||
repository1 = self.createRepository(namespace=ADMIN_ACCESS_USER, latest=['i1', 'i2', 'i3'])
|
||||
repository2 = self.createRepository(namespace=PUBLIC_USER, latest=['i1', 'i2', 'i3'])
|
||||
|
||||
self.deleteTag(repository1, 'latest')
|
||||
self.assertDeleted(repository1, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository2, 'i1', 'i2', 'i3')
|
||||
self.deleteTag(repository1, 'latest')
|
||||
self.assertDeleted(repository1, 'i1', 'i2', 'i3')
|
||||
self.assertNotDeleted(repository2, 'i1', 'i2', 'i3')
|
||||
|
||||
|
||||
def test_multiple_shared_images(self):
|
||||
""" Repository has multiple tags with shared images. Selectively deleting the tags, and
|
||||
verifying at each step.
|
||||
"""
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
|
||||
third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
|
||||
with assert_no_new_dangling_storages():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
|
||||
third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
|
||||
|
||||
# Delete tag other. Should delete f2, since it is not shared.
|
||||
self.deleteTag(repository, 'other')
|
||||
self.assertDeleted(repository, 'f2')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1')
|
||||
# Delete tag other. Should delete f2, since it is not shared.
|
||||
self.deleteTag(repository, 'other')
|
||||
self.assertDeleted(repository, 'f2')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1')
|
||||
|
||||
# Move tag fourth to i3. This should remove f1 since it is no longer referenced.
|
||||
self.moveTag(repository, 'fourth', 'i3')
|
||||
self.assertDeleted(repository, 'f1')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
|
||||
# Move tag fourth to i3. This should remove f1 since it is no longer referenced.
|
||||
self.moveTag(repository, 'fourth', 'i3')
|
||||
self.assertDeleted(repository, 'f1')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
|
||||
|
||||
# Delete tag 'latest'. This should do nothing since fourth is on the same branch.
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
|
||||
# Delete tag 'latest'. This should do nothing since fourth is on the same branch.
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3')
|
||||
|
||||
# Delete tag 'third'. This should remove t1->t3.
|
||||
self.deleteTag(repository, 'third')
|
||||
self.assertDeleted(repository, 't1', 't2', 't3')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
|
||||
# Delete tag 'third'. This should remove t1->t3.
|
||||
self.deleteTag(repository, 'third')
|
||||
self.assertDeleted(repository, 't1', 't2', 't3')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
|
||||
|
||||
# Add tag to i1.
|
||||
self.moveTag(repository, 'newtag', 'i1')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
|
||||
# Add tag to i1.
|
||||
self.moveTag(repository, 'newtag', 'i1')
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3')
|
||||
|
||||
# Delete tag 'fourth'. This should remove i2 and i3.
|
||||
self.deleteTag(repository, 'fourth')
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1')
|
||||
# Delete tag 'fourth'. This should remove i2 and i3.
|
||||
self.deleteTag(repository, 'fourth')
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1')
|
||||
|
||||
# Delete tag 'newtag'. This should remove the remaining image.
|
||||
self.deleteTag(repository, 'newtag')
|
||||
self.assertDeleted(repository, 'i1')
|
||||
|
||||
# Delete tag 'newtag'. This should remove the remaining image.
|
||||
self.deleteTag(repository, 'newtag')
|
||||
self.assertDeleted(repository, 'i1')
|
||||
|
||||
def test_empty_gc(self):
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
|
||||
third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
|
||||
with assert_no_new_dangling_storages():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1', 'f2'],
|
||||
third=['t1', 't2', 't3'], fourth=['i1', 'f1'])
|
||||
|
||||
self.gcNow(repository)
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1', 'f2')
|
||||
|
||||
self.gcNow(repository)
|
||||
self.assertNotDeleted(repository, 'i1', 'i2', 'i3', 't1', 't2', 't3', 'f1', 'f2')
|
||||
|
||||
def test_time_machine_no_gc(self):
|
||||
""" Repository has two tags with shared images. Deleting the tag should not remove any images
|
||||
"""
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24)
|
||||
with assert_no_new_dangling_storages():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
self._set_tag_expiration_policy(repository.namespace_user.username, 60*60*24)
|
||||
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
def test_time_machine_gc(self):
|
||||
""" Repository has two tags with shared images. Deleting the second tag should cause the images
|
||||
for the first deleted tag to gc.
|
||||
"""
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
with assert_no_new_dangling_storages():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
|
||||
self._set_tag_expiration_policy(repository.namespace_user.username, 1)
|
||||
self._set_tag_expiration_policy(repository.namespace_user.username, 1)
|
||||
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertNotDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
time.sleep(2)
|
||||
time.sleep(2)
|
||||
|
||||
self.deleteTag(repository, 'other') # This will cause the images associated with latest to gc
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
self.deleteTag(repository, 'other') # This will cause the images associated with latest to gc
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.assertNotDeleted(repository, 'i1', 'f1')
|
||||
|
||||
def test_manifest_gc(self):
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
_generate_and_store_manifest(ADMIN_ACCESS_USER, REPO, 'latest')
|
||||
with assert_no_new_dangling_storages():
|
||||
repository = self.createRepository(latest=['i1', 'i2', 'i3'], other=['i1', 'f1'])
|
||||
_generate_and_store_manifest(ADMIN_ACCESS_USER, REPO, 'latest')
|
||||
|
||||
self._set_tag_expiration_policy(repository.namespace_user.username, 0)
|
||||
self._set_tag_expiration_policy(repository.namespace_user.username, 0)
|
||||
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
self.deleteTag(repository, 'latest')
|
||||
self.assertDeleted(repository, 'i2', 'i3')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Reference in a new issue