From 69e550d12579882eb093d6e7f66be53b7fc5aac1 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 6 Mar 2017 16:51:05 -0500 Subject: [PATCH 1/2] Fix GC handling around CAS paths Adds code to ensure we never GC CAS paths that are shared amongst multiple ImageStorage rows, as well as an associated pair of tests to catch the positive and negative cases. --- data/model/storage.py | 36 +++++++++++++--- test/test_gc.py | 98 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 5 deletions(-) diff --git a/data/model/storage.py b/data/model/storage.py index 61153714e..b0d6e2487 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -79,10 +79,33 @@ def garbage_collect_storage(storage_id_whitelist): if len(storage_id_whitelist) == 0: return [] - def placements_query_to_paths_set(placements_query): + def placements_to_filtered_paths_set(placements_list): + """ Returns the list of paths to remove from storage, filtered from the given placements + query by removing any CAS paths that are still referenced by storage(s) in the database. + """ + if not placements_list: + return set() + + # Find the content checksums not referenced by other storages. Any that are, we cannot + # remove. + content_checksums = set([placement.storage.content_checksum for placement in placements_list + if placement.storage.cas_path]) + + unreferenced_checksums = set() + if content_checksums: + query = (ImageStorage + .select(ImageStorage.content_checksum) + .where(ImageStorage.content_checksum << list(content_checksums))) + referenced_checksums = set([image_storage.content_checksum for image_storage in query]) + unreferenced_checksums = content_checksums - referenced_checksums + + # Return all placements for all image storages found not at a CAS path or with a content + # checksum that is referenced. 
return {(get_image_location_for_id(placement.location_id).name, get_layer_path(placement.storage)) - for placement in placements_query} + for placement in placements_list + if not placement.storage.cas_path or + placement.storage.content_checksum in unreferenced_checksums} # Note: Both of these deletes must occur in the same transaction (unfortunately) because a # storage without any placement is invalid, and a placement cannot exist without a storage. @@ -96,12 +119,10 @@ def garbage_collect_storage(storage_id_whitelist): return [] placements_to_remove = list(ImageStoragePlacement - .select() + .select(ImageStoragePlacement, ImageStorage) .join(ImageStorage) .where(ImageStorage.id << orphaned_storage_ids)) - paths_to_remove = placements_query_to_paths_set(placements_to_remove) - # Remove the placements for orphaned storages if len(placements_to_remove) > 0: placement_ids_to_remove = [placement.id for placement in placements_to_remove] @@ -130,6 +151,11 @@ def garbage_collect_storage(storage_id_whitelist): .execute()) logger.debug('Removed %s image storage records', storages_removed) + # Determine the paths to remove. We cannot simply remove all paths matching storages, as CAS + # can share the same path. We further filter these paths by checking for any storages still in + # the database with the same content checksum. + paths_to_remove = placements_to_filtered_paths_set(placements_to_remove) + # We are going to make the conscious decision to not delete image storage blobs inside # transactions. # This may end up producing garbage in s3, trading off for higher availability in the database. 
diff --git a/test/test_gc.py b/test/test_gc.py index 5155d354f..c8ed5ecaa 100644 --- a/test/test_gc.py +++ b/test/test_gc.py @@ -1,5 +1,6 @@ import unittest import time +import hashlib from contextlib import contextmanager from playhouse.test_utils import assert_query_count @@ -183,6 +184,12 @@ class TestGarbageCollection(unittest.TestCase): self.assertEquals(expect_storage_removed, bool(removed_image_storages)) + # Ensure all CAS storage is in the storage engine. + preferred = storage.preferred_locations[0] + for storage_row in ImageStorage.select(): + if storage_row.cas_path: + storage.get_content({preferred}, storage.blob_path(storage_row.content_checksum)) + def test_has_garbage(self): """ Remove all existing repositories, then add one without garbage, check, then add one with garbage, and check again. @@ -405,6 +412,97 @@ class TestGarbageCollection(unittest.TestCase): self.assertDeleted(repository, 'i1') self.assertNotDeleted(repository, 'i2') + def test_image_with_cas(self): + """ A repository with a tag pointing to an image backed by CAS. Deleting and GCing the tag + should result in the storage and its CAS data being removed. + """ + with self.assert_gc_integrity(expect_storage_removed=True): + repository = self.createRepository() + + # Create an image storage record under CAS. + content = 'hello world' + digest = 'sha256:' + hashlib.sha256(content).hexdigest() + preferred = storage.preferred_locations[0] + storage.put_content({preferred}, storage.blob_path(digest), content) + + image_storage = database.ImageStorage.create(content_checksum=digest, uploading=False) + location = database.ImageStorageLocation.get(name=preferred) + database.ImageStoragePlacement.create(location=location, storage=image_storage) + + # Ensure the CAS path exists. + self.assertTrue(storage.exists({preferred}, storage.blob_path(digest))) + + # Create the image and the tag. 
+ first_image = Image.create(docker_image_id='i1', + repository=repository, storage=image_storage, + ancestors='/') + + model.tag.store_tag_manifest(repository.namespace_user.username, repository.name, + 'first', first_image.docker_image_id, + 'sha:someshahere1', '{}') + + self.assertNotDeleted(repository, 'i1') + + # Delete the tag. + self.deleteTag(repository, 'first') + self.assertDeleted(repository, 'i1') + + # Ensure the CAS path is gone. + self.assertFalse(storage.exists({preferred}, storage.blob_path(digest))) + + def test_images_shared_cas(self): + """ A repository with two tags, each pointing to a distinct image, whose image storages + share the same *CAS path* but are *distinct records*. Deleting the first tag should delete the + first image, and its storage, but not the file in storage, as it shares its CAS path. + """ + with self.assert_gc_integrity(expect_storage_removed=True): + repository = self.createRepository() + + # Create two image storage records with the same content checksum. + content = 'hello world' + digest = 'sha256:' + hashlib.sha256(content).hexdigest() + preferred = storage.preferred_locations[0] + storage.put_content({preferred}, storage.blob_path(digest), content) + + is1 = database.ImageStorage.create(content_checksum=digest, uploading=False) + is2 = database.ImageStorage.create(content_checksum=digest, uploading=False) + + location = database.ImageStorageLocation.get(name=preferred) + + database.ImageStoragePlacement.create(location=location, storage=is1) + database.ImageStoragePlacement.create(location=location, storage=is2) + + # Ensure the CAS path exists. + self.assertTrue(storage.exists({preferred}, storage.blob_path(digest))) + + # Create two images in the repository, and two tags, each pointing to one of the storages. 
+ first_image = Image.create(docker_image_id='i1', + repository=repository, storage=is1, + ancestors='/') + + second_image = Image.create(docker_image_id='i2', + repository=repository, storage=is2, + ancestors='/') + + model.tag.store_tag_manifest(repository.namespace_user.username, repository.name, + 'first', first_image.docker_image_id, + 'sha:someshahere1', '{}') + + model.tag.store_tag_manifest(repository.namespace_user.username, repository.name, + 'second', second_image.docker_image_id, + 'sha:someshahere2', '{}') + + self.assertNotDeleted(repository, 'i1', 'i2') + + # Delete the first tag. + self.deleteTag(repository, 'first') + self.assertDeleted(repository, 'i1') + self.assertNotDeleted(repository, 'i2') + + # Ensure the CAS path still exists. + self.assertTrue(storage.exists({preferred}, storage.blob_path(digest))) + + if __name__ == '__main__': unittest.main() From 62312e64619e80619a3bd2d8c84dd8a00b4ab975 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 8 Mar 2017 17:01:07 -0500 Subject: [PATCH 2/2] Add warning when CAS paths are skipped and ensure we are under a transaction --- data/database.py | 11 ++++++++++ data/model/storage.py | 48 ++++++++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/data/database.py b/data/database.py index f07ad6592..d51f052bf 100644 --- a/data/database.py +++ b/data/database.py @@ -7,6 +7,7 @@ import sys import time import uuid +from contextlib import contextmanager from collections import defaultdict from datetime import datetime from random import SystemRandom @@ -230,6 +231,7 @@ db_match_func = CallableProxy() db_for_update = CallableProxy() db_transaction = CallableProxy() db_concat_func = CallableProxy() +ensure_under_transaction = CallableProxy() def validate_database_url(url, db_kwargs, connect_timeout=5): @@ -286,7 +288,16 @@ def configure(config_object): def _db_transaction(): return config_object['DB_TRANSACTION_FACTORY'](db) + @contextmanager + def 
_ensure_under_transaction(): + if not config_object['TESTING']: + if db.transaction_depth() == 0: + raise Exception('Expected to be under a transaction') + + yield + db_transaction.initialize(_db_transaction) + ensure_under_transaction.initialize(_ensure_under_transaction) def random_string_generator(length=16): def random_string(): diff --git a/data/model/storage.py b/data/model/storage.py index b0d6e2487..79ab529bb 100644 --- a/data/model/storage.py +++ b/data/model/storage.py @@ -8,7 +8,8 @@ from data.model import (config, db_transaction, InvalidImageException, TorrentIn DataModelException, _basequery) from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation, ImageStorageTransformation, ImageStorageSignature, - ImageStorageSignatureKind, Repository, Namespace, TorrentInfo) + ImageStorageSignatureKind, Repository, Namespace, TorrentInfo, + ensure_under_transaction) logger = logging.getLogger(__name__) @@ -83,29 +84,34 @@ def garbage_collect_storage(storage_id_whitelist): """ Returns the list of paths to remove from storage, filtered from the given placements query by removing any CAS paths that are still referenced by storage(s) in the database. """ - if not placements_list: - return set() + with ensure_under_transaction(): + if not placements_list: + return set() - # Find the content checksums not referenced by other storages. Any that are, we cannot - # remove. - content_checksums = set([placement.storage.content_checksum for placement in placements_list - if placement.storage.cas_path]) + # Find the content checksums not referenced by other storages. Any that are, we cannot + # remove. 
+ content_checksums = set([placement.storage.content_checksum for placement in placements_list + if placement.storage.cas_path]) - unreferenced_checksums = set() - if content_checksums: - query = (ImageStorage - .select(ImageStorage.content_checksum) - .where(ImageStorage.content_checksum << list(content_checksums))) - referenced_checksums = set([image_storage.content_checksum for image_storage in query]) - unreferenced_checksums = content_checksums - referenced_checksums + unreferenced_checksums = set() + if content_checksums: + query = (ImageStorage + .select(ImageStorage.content_checksum) + .where(ImageStorage.content_checksum << list(content_checksums))) + referenced_checksums = set([image_storage.content_checksum for image_storage in query]) + if referenced_checksums: + logger.warning('GC attempted to remove CAS checksums %s, which are still referenced', + referenced_checksums) - # Return all placements for all image storages found not at a CAS path or with a content - # checksum that is referenced. - return {(get_image_location_for_id(placement.location_id).name, - get_layer_path(placement.storage)) - for placement in placements_list - if not placement.storage.cas_path or - placement.storage.content_checksum in unreferenced_checksums} + unreferenced_checksums = content_checksums - referenced_checksums + + # Return all placements for all image storages found not at a CAS path or with a content + # checksum that is not referenced. + return {(get_image_location_for_id(placement.location_id).name, + get_layer_path(placement.storage)) + for placement in placements_list + if not placement.storage.cas_path or + placement.storage.content_checksum in unreferenced_checksums} # Note: Both of these deletes must occur in the same transaction (unfortunately) because a # storage without any placement is invalid, and a placement cannot exist without a storage.