diff --git a/data/database.py b/data/database.py index f07ad6592..d51f052bf 100644 --- a/data/database.py +++ b/data/database.py @@ -7,6 +7,7 @@ import sys import time import uuid +from contextlib import contextmanager from collections import defaultdict from datetime import datetime from random import SystemRandom @@ -230,6 +231,7 @@ db_match_func = CallableProxy() db_for_update = CallableProxy() db_transaction = CallableProxy() db_concat_func = CallableProxy() +ensure_under_transaction = CallableProxy() def validate_database_url(url, db_kwargs, connect_timeout=5): @@ -286,7 +288,16 @@ def configure(config_object): def _db_transaction(): return config_object['DB_TRANSACTION_FACTORY'](db) + @contextmanager + def _ensure_under_transaction(): + if not config_object['TESTING']: + if db.transaction_depth() == 0: + raise Exception('Expected to be under a transaction') + + yield + db_transaction.initialize(_db_transaction) + ensure_under_transaction.initialize(_ensure_under_transaction) def random_string_generator(length=16): def random_string(): diff --git a/data/model/storage.py b/data/model/storage.py index 61153714e..79ab529bb 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -8,7 +8,8 @@ from data.model import (config, db_transaction, InvalidImageException, TorrentIn DataModelException, _basequery) from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature, - ImageStorageSignatureKind, Repository, Namespace, TorrentInfo) + ImageStorageSignatureKind, Repository, Namespace, TorrentInfo, + ensure_under_transaction) logger = logging.getLogger(__name__) @@ -79,10 +80,38 @@ def garbage_collect_storage(storage_id_whitelist): if len(storage_id_whitelist) == 0: return [] - def placements_query_to_paths_set(placements_query): - return {(get_image_location_for_id(placement.location_id).name, - get_layer_path(placement.storage)) - for placement in placements_query} + def 
placements_to_filtered_paths_set(placements_list): + """ Returns the list of paths to remove from storage, filtered from the given placements + list by removing any CAS paths that are still referenced by storage(s) in the database. + """ + with ensure_under_transaction(): + if not placements_list: + return set() + + # Find the content checksums not referenced by other storages. Any that are, we cannot + # remove. + content_checksums = set([placement.storage.content_checksum for placement in placements_list + if placement.storage.cas_path]) + + unreferenced_checksums = set() + if content_checksums: + query = (ImageStorage + .select(ImageStorage.content_checksum) + .where(ImageStorage.content_checksum << list(content_checksums))) + referenced_checksums = set([image_storage.content_checksum for image_storage in query]) + if referenced_checksums: + logger.warning('GC attempted to remove CAS checksums %s, which are still referenced', + referenced_checksums) + + unreferenced_checksums = content_checksums - referenced_checksums + + # Return all placements for all image storages found not at a CAS path or with a content + # checksum that is no longer referenced. + return {(get_image_location_for_id(placement.location_id).name, + get_layer_path(placement.storage)) + for placement in placements_list + if not placement.storage.cas_path or + placement.storage.content_checksum in unreferenced_checksums} # Note: Both of these deletes must occur in the same transaction (unfortunately) because a # storage without any placement is invalid, and a placement cannot exist without a storage. 
@@ -96,12 +125,10 @@ def garbage_collect_storage(storage_id_whitelist): return [] placements_to_remove = list(ImageStoragePlacement - .select() + .select(ImageStoragePlacement, ImageStorage) .join(ImageStorage) .where(ImageStorage.id << orphaned_storage_ids)) - paths_to_remove = placements_query_to_paths_set(placements_to_remove) - # Remove the placements for orphaned storages if len(placements_to_remove) > 0: placement_ids_to_remove = [placement.id for placement in placements_to_remove] @@ -130,6 +157,11 @@ def garbage_collect_storage(storage_id_whitelist): .execute()) logger.debug('Removed %s image storage records', storages_removed) + # Determine the paths to remove. We cannot simply remove all paths matching storages, as CAS + # can share the same path. We further filter these paths by checking for any storages still in + # the database with the same content checksum. + paths_to_remove = placements_to_filtered_paths_set(placements_to_remove) + # We are going to make the conscious decision to not delete image storage blobs inside # transactions. # This may end up producing garbage in s3, trading off for higher availability in the database. diff --git a/test/test_gc.py b/test/test_gc.py index 5155d354f..c8ed5ecaa 100644 --- a/test/test_gc.py +++ b/test/test_gc.py @@ -1,5 +1,6 @@ import unittest import time +import hashlib from contextlib import contextmanager from playhouse.test_utils import assert_query_count @@ -183,6 +184,12 @@ class TestGarbageCollection(unittest.TestCase): self.assertEquals(expect_storage_removed, bool(removed_image_storages)) + # Ensure all CAS storage is in the storage engine. + preferred = storage.preferred_locations[0] + for storage_row in ImageStorage.select(): + if storage_row.cas_path: + storage.get_content({preferred}, storage.blob_path(storage_row.content_checksum)) + def test_has_garbage(self): """ Remove all existing repositories, then add one without garbage, check, then add one with garbage, and check again. 
@@ -405,6 +412,97 @@ class TestGarbageCollection(unittest.TestCase): self.assertDeleted(repository, 'i1') self.assertNotDeleted(repository, 'i2') + def test_image_with_cas(self): + """ A repository with a tag pointing to an image backed by CAS. Deleting and GCing the tag + should result in the storage and its CAS data being removed. + """ + with self.assert_gc_integrity(expect_storage_removed=True): + repository = self.createRepository() + + # Create an image storage record under CAS. + content = 'hello world' + digest = 'sha256:' + hashlib.sha256(content).hexdigest() + preferred = storage.preferred_locations[0] + storage.put_content({preferred}, storage.blob_path(digest), content) + + image_storage = database.ImageStorage.create(content_checksum=digest, uploading=False) + location = database.ImageStorageLocation.get(name=preferred) + database.ImageStoragePlacement.create(location=location, storage=image_storage) + + # Ensure the CAS path exists. + self.assertTrue(storage.exists({preferred}, storage.blob_path(digest))) + + # Create the image and the tag. + first_image = Image.create(docker_image_id='i1', + repository=repository, storage=image_storage, + ancestors='/') + + model.tag.store_tag_manifest(repository.namespace_user.username, repository.name, + 'first', first_image.docker_image_id, + 'sha:someshahere1', '{}') + + self.assertNotDeleted(repository, 'i1') + + # Delete the tag. + self.deleteTag(repository, 'first') + self.assertDeleted(repository, 'i1') + + # Ensure the CAS path is gone. + self.assertFalse(storage.exists({preferred}, storage.blob_path(digest))) + + def test_images_shared_cas(self): + """ A repository with two tags, each pointing to a distinct image, whose image storages + share the same *CAS path* but are *distinct records*. Deleting the first tag should delete the + first image, and its storage, but not the file in storage, as it shares its CAS path. 
+ """ + with self.assert_gc_integrity(expect_storage_removed=True): + repository = self.createRepository() + + # Create two image storage records with the same content checksum. + content = 'hello world' + digest = 'sha256:' + hashlib.sha256(content).hexdigest() + preferred = storage.preferred_locations[0] + storage.put_content({preferred}, storage.blob_path(digest), content) + + is1 = database.ImageStorage.create(content_checksum=digest, uploading=False) + is2 = database.ImageStorage.create(content_checksum=digest, uploading=False) + + location = database.ImageStorageLocation.get(name=preferred) + + database.ImageStoragePlacement.create(location=location, storage=is1) + database.ImageStoragePlacement.create(location=location, storage=is2) + + # Ensure the CAS path exists. + self.assertTrue(storage.exists({preferred}, storage.blob_path(digest))) + + # Create two images in the repository, and two tags, each pointing to one of the storages. + first_image = Image.create(docker_image_id='i1', + repository=repository, storage=is1, + ancestors='/') + + second_image = Image.create(docker_image_id='i2', + repository=repository, storage=is2, + ancestors='/') + + model.tag.store_tag_manifest(repository.namespace_user.username, repository.name, + 'first', first_image.docker_image_id, + 'sha:someshahere1', '{}') + + model.tag.store_tag_manifest(repository.namespace_user.username, repository.name, + 'second', second_image.docker_image_id, + 'sha:someshahere2', '{}') + + self.assertNotDeleted(repository, 'i1', 'i2') + + # Delete the first tag. + self.deleteTag(repository, 'first') + self.assertDeleted(repository, 'i1') + self.assertNotDeleted(repository, 'i2') + + # Ensure the CAS path still exists. + self.assertTrue(storage.exists({preferred}, storage.blob_path(digest))) + + if __name__ == '__main__': unittest.main()