Merge pull request #2404 from coreos-inc/cas-gc-fix
Fix GC handling around CAS paths
Commit cbac673d58

3 changed files with 149 additions and 8 deletions
@ -7,6 +7,7 @@ import sys
 import time
 import uuid

+from contextlib import contextmanager
 from collections import defaultdict
 from datetime import datetime
 from random import SystemRandom
@ -230,6 +231,7 @@ db_match_func = CallableProxy()
 db_for_update = CallableProxy()
 db_transaction = CallableProxy()
 db_concat_func = CallableProxy()
+ensure_under_transaction = CallableProxy()


 def validate_database_url(url, db_kwargs, connect_timeout=5):
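The CallableProxy placeholders above are importable at module load time and only get bound to a real implementation once configure() runs. A minimal stand-in for that pattern, purely for illustration (CallableProxySketch is a hypothetical name, not the project's class):

class CallableProxySketch(object):
  def __init__(self):
    self._func = None

  def initialize(self, func):
    # configure() calls this with the real implementation.
    self._func = func

  def __call__(self, *args, **kwargs):
    if self._func is None:
      raise Exception('Proxy called before configure() initialized it')
    return self._func(*args, **kwargs)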
@ -286,7 +288,16 @@ def configure(config_object):
   def _db_transaction():
     return config_object['DB_TRANSACTION_FACTORY'](db)

+  @contextmanager
+  def _ensure_under_transaction():
+    if not config_object['TESTING']:
+      if db.transaction_depth() == 0:
+        raise Exception('Expected to be under a transaction')
+
+    yield
+
   db_transaction.initialize(_db_transaction)
+  ensure_under_transaction.initialize(_ensure_under_transaction)

 def random_string_generator(length=16):
   def random_string():
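Together, these pieces give callers a cheap assertion that they really are inside an open transaction. A minimal usage sketch, assuming configure() has already initialized the proxies; _remove_rows is a hypothetical helper written for illustration, not code from this commit:

def _remove_rows(placement_ids):
  # Raises outside a transaction (except under TESTING), per _ensure_under_transaction above.
  with ensure_under_transaction():
    (ImageStoragePlacement
     .delete()
     .where(ImageStoragePlacement.id << placement_ids)
     .execute())

with db_transaction():
  # The guard passes here because db_transaction() opened a transaction first.
  _remove_rows([1, 2, 3])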
@ -8,7 +8,8 @@ from data.model import (config, db_transaction, InvalidImageException, TorrentIn
                         DataModelException, _basequery)
 from data.database import (ImageStorage, Image, ImageStoragePlacement, ImageStorageLocation,
                            ImageStorageTransformation, ImageStorageSignature,
-                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo)
+                           ImageStorageSignatureKind, Repository, Namespace, TorrentInfo,
+                           ensure_under_transaction)


 logger = logging.getLogger(__name__)
@ -79,10 +80,38 @@ def garbage_collect_storage(storage_id_whitelist):
   if len(storage_id_whitelist) == 0:
     return []

   def placements_query_to_paths_set(placements_query):
     return {(get_image_location_for_id(placement.location_id).name,
              get_layer_path(placement.storage))
             for placement in placements_query}

+  def placements_to_filtered_paths_set(placements_list):
+    """ Returns the set of paths to remove from storage, filtered from the given placements
+        by removing any CAS paths that are still referenced by storage(s) in the database.
+    """
+    with ensure_under_transaction():
+      if not placements_list:
+        return set()
+
+      # Find the content checksums not referenced by other storages. Any that are, we cannot
+      # remove.
+      content_checksums = set([placement.storage.content_checksum for placement in placements_list
+                               if placement.storage.cas_path])
+
+      unreferenced_checksums = set()
+      if content_checksums:
+        query = (ImageStorage
+                 .select(ImageStorage.content_checksum)
+                 .where(ImageStorage.content_checksum << list(content_checksums)))
+        referenced_checksums = set([image_storage.content_checksum for image_storage in query])
+        if referenced_checksums:
+          logger.warning('GC attempted to remove CAS checksums %s, which are still referenced',
+                         referenced_checksums)
+
+        unreferenced_checksums = content_checksums - referenced_checksums
+
+      # Return the paths for all image storages found not at a CAS path or with a content
+      # checksum that is no longer referenced by any other storage.
+      return {(get_image_location_for_id(placement.location_id).name,
+               get_layer_path(placement.storage))
+              for placement in placements_list
+              if not placement.storage.cas_path or
+                 placement.storage.content_checksum in unreferenced_checksums}
+
   # Note: Both of these deletes must occur in the same transaction (unfortunately) because a
   # storage without any placement is invalid, and a placement cannot exist without a storage.
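The rule implemented above, stripped of the peewee models: a CAS-backed path may only be deleted when no surviving storage row still claims its content checksum, while non-CAS paths are always safe to delete once their storage is orphaned. A minimal, self-contained sketch of that rule, with plain tuples standing in for the placement and storage rows:

def filter_removable_paths(candidates, still_referenced_checksums):
  # candidates: (path, content_checksum, is_cas) for every placement being garbage collected.
  # still_referenced_checksums: checksums that other, surviving storage rows still point at.
  return {path for path, checksum, is_cas in candidates
          if not is_cas or checksum not in still_referenced_checksums}

# Two storage rows share one CAS blob; only one row is orphaned, so its path must be kept.
candidates = [('sha256/ab/abc123', 'sha256:abc123', True),
              ('legacy/i1/layer', None, False)]
print(filter_removable_paths(candidates, {'sha256:abc123'}))  # {'legacy/i1/layer'}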
@ -96,12 +125,10 @@ def garbage_collect_storage(storage_id_whitelist):
       return []

     placements_to_remove = list(ImageStoragePlacement
-                                .select()
+                                .select(ImageStoragePlacement, ImageStorage)
                                 .join(ImageStorage)
                                 .where(ImageStorage.id << orphaned_storage_ids))

-    paths_to_remove = placements_query_to_paths_set(placements_to_remove)
-
     # Remove the placements for orphaned storages
     if len(placements_to_remove) > 0:
       placement_ids_to_remove = [placement.id for placement in placements_to_remove]
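Selecting both models here presumably keeps the new filtering step cheap: placements_to_filtered_paths_set reads placement.storage for every row, and with a bare .select() each of those accesses would issue an extra query. A small stand-alone peewee sketch of the difference, using toy models rather than the ones in data.database:

from peewee import SqliteDatabase, Model, CharField, ForeignKeyField

db = SqliteDatabase(':memory:')

class Storage(Model):
  content_checksum = CharField(null=True)
  class Meta:
    database = db

class Placement(Model):
  storage = ForeignKeyField(Storage)
  class Meta:
    database = db

db.connect()
db.create_tables([Storage, Placement])
Placement.create(storage=Storage.create(content_checksum='sha256:abc'))

# Selecting both models populates placement.storage up front, so the loop below runs no
# additional per-row queries; a plain .select() would lazily re-query Storage for each row.
placements = list(Placement.select(Placement, Storage).join(Storage))
for placement in placements:
  print(placement.storage.content_checksum)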
@ -130,6 +157,11 @@ def garbage_collect_storage(storage_id_whitelist):
                         .execute())
     logger.debug('Removed %s image storage records', storages_removed)

+    # Determine the paths to remove. We cannot simply remove all paths matching storages, as CAS
+    # can share the same path. We further filter these paths by checking for any storages still in
+    # the database with the same content checksum.
+    paths_to_remove = placements_to_filtered_paths_set(placements_to_remove)
+
   # We are going to make the conscious decision to not delete image storage blobs inside
   # transactions.
   # This may end up producing garbage in s3, trading off for higher availability in the database.
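The trade-off described in the comment above, possibly leaving unreferenced blobs in object storage rather than holding a database transaction open across slow deletes, typically ends up looking like the sketch below. storage_client.remove and the surrounding names are illustrative stand-ins, not this module's actual API:

def remove_blobs_best_effort(storage_client, paths_to_remove):
  # Runs after the database transaction has committed: the rows are already gone, so a
  # failed delete merely leaves an orphaned blob behind rather than corrupting the database.
  for location_name, blob_path in paths_to_remove:
    try:
      storage_client.remove({location_name}, blob_path)
    except Exception:
      logger.exception('Could not remove blob %s from %s', blob_path, location_name)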
@ -1,5 +1,6 @@
 import unittest
 import time
+import hashlib

 from contextlib import contextmanager
 from playhouse.test_utils import assert_query_count
@ -183,6 +184,12 @@ class TestGarbageCollection(unittest.TestCase):

     self.assertEquals(expect_storage_removed, bool(removed_image_storages))

+    # Ensure all CAS storage is in the storage engine.
+    preferred = storage.preferred_locations[0]
+    for storage_row in ImageStorage.select():
+      if storage_row.cas_path:
+        storage.get_content({preferred}, storage.blob_path(storage_row.content_checksum))
+
   def test_has_garbage(self):
     """ Remove all existing repositories, then add one without garbage, check, then add one with
         garbage, and check again.
@ -405,6 +412,97 @@ class TestGarbageCollection(unittest.TestCase):
     self.assertDeleted(repository, 'i1')
     self.assertNotDeleted(repository, 'i2')

+  def test_image_with_cas(self):
+    """ A repository with a tag pointing to an image backed by CAS. Deleting and GCing the tag
+        should result in the storage and its CAS data being removed.
+    """
+    with self.assert_gc_integrity(expect_storage_removed=True):
+      repository = self.createRepository()
+
+      # Create an image storage record under CAS.
+      content = 'hello world'
+      digest = 'sha256:' + hashlib.sha256(content).hexdigest()
+      preferred = storage.preferred_locations[0]
+      storage.put_content({preferred}, storage.blob_path(digest), content)
+
+      image_storage = database.ImageStorage.create(content_checksum=digest, uploading=False)
+      location = database.ImageStorageLocation.get(name=preferred)
+      database.ImageStoragePlacement.create(location=location, storage=image_storage)
+
+      # Ensure the CAS path exists.
+      self.assertTrue(storage.exists({preferred}, storage.blob_path(digest)))
+
+      # Create the image and the tag.
+      first_image = Image.create(docker_image_id='i1',
+                                 repository=repository, storage=image_storage,
+                                 ancestors='/')
+
+      model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
+                                   'first', first_image.docker_image_id,
+                                   'sha:someshahere1', '{}')
+
+      self.assertNotDeleted(repository, 'i1')
+
+      # Delete the tag.
+      self.deleteTag(repository, 'first')
+      self.assertDeleted(repository, 'i1')
+
+      # Ensure the CAS path is gone.
+      self.assertFalse(storage.exists({preferred}, storage.blob_path(digest)))
+
+  def test_images_shared_cas(self):
+    """ A repository with two tags, each pointing to its own image, whose storages are
+        *distinct records* that share the same *CAS path*. Deleting the first tag should delete
+        the first image and its storage, but not the file in storage, as it shares its CAS path.
+    """
+    with self.assert_gc_integrity(expect_storage_removed=True):
+      repository = self.createRepository()
+
+      # Create two image storage records with the same content checksum.
+      content = 'hello world'
+      digest = 'sha256:' + hashlib.sha256(content).hexdigest()
+      preferred = storage.preferred_locations[0]
+      storage.put_content({preferred}, storage.blob_path(digest), content)
+
+      is1 = database.ImageStorage.create(content_checksum=digest, uploading=False)
+      is2 = database.ImageStorage.create(content_checksum=digest, uploading=False)
+
+      location = database.ImageStorageLocation.get(name=preferred)
+
+      database.ImageStoragePlacement.create(location=location, storage=is1)
+      database.ImageStoragePlacement.create(location=location, storage=is2)
+
+      # Ensure the CAS path exists.
+      self.assertTrue(storage.exists({preferred}, storage.blob_path(digest)))
+
+      # Create two images in the repository, and two tags, each pointing to one of the storages.
+      first_image = Image.create(docker_image_id='i1',
+                                 repository=repository, storage=is1,
+                                 ancestors='/')
+
+      second_image = Image.create(docker_image_id='i2',
+                                  repository=repository, storage=is2,
+                                  ancestors='/')
+
+      model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
+                                   'first', first_image.docker_image_id,
+                                   'sha:someshahere1', '{}')
+
+      model.tag.store_tag_manifest(repository.namespace_user.username, repository.name,
+                                   'second', second_image.docker_image_id,
+                                   'sha:someshahere2', '{}')
+
+      self.assertNotDeleted(repository, 'i1', 'i2')
+
+      # Delete the first tag.
+      self.deleteTag(repository, 'first')
+      self.assertDeleted(repository, 'i1')
+      self.assertNotDeleted(repository, 'i2')
+
+      # Ensure the CAS path still exists.
+      self.assertTrue(storage.exists({preferred}, storage.blob_path(digest)))
+

 if __name__ == '__main__':
   unittest.main()
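For reference, the way these tests address CAS data: the blob's content digest doubles as its storage address, so two storage rows with equal checksums resolve to one path. A rough sketch of that convention, assuming a sha256/<prefix>/<digest> layout for illustration (the real path comes from storage.blob_path and may differ):

import hashlib

def content_digest(content_bytes):
  # Matches the tests: 'sha256:' plus the hex digest of the blob's bytes.
  return 'sha256:' + hashlib.sha256(content_bytes).hexdigest()

def blob_path_sketch(digest):
  # Hypothetical content-addressed layout; identical checksums map to the same path,
  # which is exactly what test_images_shared_cas exercises.
  hex_part = digest.split(':', 1)[1]
  return 'sha256/%s/%s' % (hex_part[:2], hex_part)

print(blob_path_sketch(content_digest(b'hello world')))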