Reduce database bandwidth by tracking gc candidate images.

This commit is contained in:
Jake Moshenko 2016-08-26 14:48:39 -04:00
parent 0815f6b6c4
commit 584a5a7ddd
5 changed files with 161 additions and 107 deletions

View file

@ -34,6 +34,7 @@ _SCHEME_DRIVERS = {
'postgresql+psycopg2': PostgresqlDatabase, 'postgresql+psycopg2': PostgresqlDatabase,
} }
SCHEME_RANDOM_FUNCTION = { SCHEME_RANDOM_FUNCTION = {
'mysql': fn.Rand, 'mysql': fn.Rand,
'mysql+pymysql': fn.Rand, 'mysql+pymysql': fn.Rand,
@ -42,12 +43,37 @@ SCHEME_RANDOM_FUNCTION = {
'postgresql+psycopg2': fn.Random, 'postgresql+psycopg2': fn.Random,
} }
def pipes_concat(arg1, arg2, *extra_args):
  """ sqlite-specific concat implementation. sqlite has no fn.Concat, so the
      clauses are chained together with the || operator instead.
  """
  combined = arg1
  for clause in (arg2,) + extra_args:
    combined = combined.concat(clause)
  return combined
def function_concat(arg1, arg2, *extra_args):
  """ Default concat implementation, backed by fn.Concat(). Used by every
      database engine except sqlite.
  """
  all_args = (arg1, arg2) + extra_args
  return fn.Concat(*all_args)
# Engines whose concat implementation differs from the default; keyed by
# driver name. Callers fall back to function_concat for any driver not
# listed here.
SCHEME_SPECIALIZED_CONCAT = {
  'sqlite': pipes_concat,
}
def real_for_update(query): def real_for_update(query):
return query.for_update() return query.for_update()
def null_for_update(query): def null_for_update(query):
return query return query
def delete_instance_filtered(instance, model_class, delete_nullable, skip_transitive_deletes): def delete_instance_filtered(instance, model_class, delete_nullable, skip_transitive_deletes):
""" Deletes the DB instance recursively, skipping any models in the skip_transitive_deletes set. """ Deletes the DB instance recursively, skipping any models in the skip_transitive_deletes set.
@ -181,6 +207,7 @@ read_slave = Proxy()
db_random_func = CallableProxy() db_random_func = CallableProxy()
db_for_update = CallableProxy() db_for_update = CallableProxy()
db_transaction = CallableProxy() db_transaction = CallableProxy()
db_concat_func = CallableProxy()
def validate_database_url(url, db_kwargs, connect_timeout=5): def validate_database_url(url, db_kwargs, connect_timeout=5):
@ -227,6 +254,8 @@ def configure(config_object):
db_random_func.initialize(SCHEME_RANDOM_FUNCTION[parsed_write_uri.drivername]) db_random_func.initialize(SCHEME_RANDOM_FUNCTION[parsed_write_uri.drivername])
db_for_update.initialize(SCHEME_SPECIALIZED_FOR_UPDATE.get(parsed_write_uri.drivername, db_for_update.initialize(SCHEME_SPECIALIZED_FOR_UPDATE.get(parsed_write_uri.drivername,
real_for_update)) real_for_update))
db_concat_func.initialize(SCHEME_SPECIALIZED_CONCAT.get(parsed_write_uri.drivername,
function_concat))
read_slave_uri = config_object.get('DB_READ_SLAVE_URI', None) read_slave_uri = config_object.get('DB_READ_SLAVE_URI', None)
if read_slave_uri is not None: if read_slave_uri is not None:

View file

@ -11,7 +11,7 @@ from data.database import (Repository, Namespace, RepositoryTag, Star, Image, Us
Visibility, RepositoryPermission, RepositoryActionCount, Visibility, RepositoryPermission, RepositoryActionCount,
Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage, Role, RepositoryAuthorizedEmail, TagManifest, DerivedStorageForImage,
Label, TagManifestLabel, db_for_update, get_epoch_timestamp, Label, TagManifestLabel, db_for_update, get_epoch_timestamp,
db_random_func) db_random_func, db_concat_func)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -43,45 +43,21 @@ def get_repository(namespace_name, repository_name):
return None return None
def _purge_all_repository_tags(namespace_name, repository_name):
  """ Immediately purge all repository tags without respecting the lifeline procedure.

      namespace_name: the namespace that owns the repository.
      repository_name: the name of the repository whose tags are removed.

      Raises DataModelException if no such repository exists.

      NOTE(review): the deletes below are ordered so that label mappings and
      manifests are removed before the tags that reference them; they do not
      appear to run inside a single transaction — confirm whether partial
      deletion on failure is acceptable here.
  """
  try:
    repo = _basequery.get_existing_repository(namespace_name, repository_name)
  except Repository.DoesNotExist:
    raise DataModelException('Invalid repository \'%s/%s\'' %
                             (namespace_name, repository_name))

  # Finds all the tags to delete.
  repo_tags = list(RepositoryTag.select().where(RepositoryTag.repository == repo.id))
  if not repo_tags:
    return

  # Find all labels to delete.
  manifest_labels_query = (TagManifestLabel
                           .select()
                           .where(TagManifestLabel.repository == repo))

  label_ids = [manifest_label.label_id for manifest_label in manifest_labels_query]
  if label_ids:
    # Delete all the mapping entries.
    TagManifestLabel.delete().where(TagManifestLabel.repository == repo).execute()

    # Delete all the matching labels.
    Label.delete().where(Label.id << label_ids).execute()

  # Delete all the manifests.
  TagManifest.delete().where(TagManifest.tag << repo_tags).execute()

  # Delete all tags.
  RepositoryTag.delete().where(RepositoryTag.repository == repo.id).execute()
def purge_repository(namespace_name, repository_name): def purge_repository(namespace_name, repository_name):
repo = _basequery.get_existing_repository(namespace_name, repository_name)
# Delete all tags to allow gc to reclaim storage # Delete all tags to allow gc to reclaim storage
_purge_all_repository_tags(namespace_name, repository_name) previously_referenced = tag.purge_all_tags(repo)
unreferenced_image_q = Image.select(Image.id).where(Image.repository == repo)
if len(previously_referenced) > 0:
unreferenced_image_q = (unreferenced_image_q
.where(~(Image.id << list(previously_referenced))))
unreferenced_candidates = set(img[0] for img in unreferenced_image_q.tuples())
# Gc to remove the images and storage # Gc to remove the images and storage
garbage_collect_repository(namespace_name, repository_name) garbage_collect_repo(repo, previously_referenced | unreferenced_candidates)
# Delete the rest of the repository metadata # Delete the rest of the repository metadata
fetched = _basequery.get_existing_repository(namespace_name, repository_name) fetched = _basequery.get_existing_repository(namespace_name, repository_name)
@ -135,34 +111,46 @@ def find_repository_with_garbage(limit_to_gc_policy_s):
return None return None
def garbage_collect_repository(namespace_name, repository_name): def garbage_collect_repo(repo, extra_candidate_set=None):
repo = get_repository(namespace_name, repository_name)
if repo is not None:
garbage_collect_repo(repo)
def garbage_collect_repo(repo):
logger.debug('Garbage collecting repository %s', repo.id) logger.debug('Garbage collecting repository %s', repo.id)
storage_id_whitelist = set() storage_id_whitelist = set()
tag.garbage_collect_tags(repo) candidate_orphan_image_set = tag.garbage_collect_tags(repo)
if extra_candidate_set:
candidate_orphan_image_set.update(extra_candidate_set)
if not len(candidate_orphan_image_set):
logger.debug('No candidate images for GC for repo: %s', repo.id)
return
candidates_orphans = list(candidate_orphan_image_set)
with db_transaction(): with db_transaction():
# Get a list of all images used by tags in the repository Candidate = Image.alias()
tagged_images = (Image Tagged = Image.alias()
.select(Image.id, Image.ancestors) ancestor_superset = Tagged.ancestors ** db_concat_func(Candidate.ancestors, Candidate.id, '/%')
.join(RepositoryTag)
.where(Image.repository == repo))
def gen_referenced_ancestors(): # We are going to compute all images which are being referenced in two ways:
for tagged_image in tagged_images: # First, we will find all images which have their ancestor paths appear in
# The ancestor list is in the format '/1/2/3/', extract just the ids # another image. Secondly, we union in all of the candidate images which are
ancestor_id_strings = tagged_image.ancestor_list() # directly referenced by a tag. This can be used in a subquery to directly
for img_id_str in ancestor_id_strings: # find which candidates are being referenced without any client side
yield int(img_id_str) # computation or extra round trips.
yield tagged_image.id ancestor_referenced = (Candidate
.select(Candidate.id)
.join(Tagged, on=ancestor_superset)
.join(RepositoryTag, on=(Tagged.id == RepositoryTag.image))
.where(RepositoryTag.repository == repo.id,
Candidate.id << candidates_orphans))
referenced_ancestors = set(gen_referenced_ancestors()) direct_referenced = (Candidate
.select(Candidate.id)
.join(RepositoryTag)
.where(RepositoryTag.repository == repo.id,
Candidate.id << candidates_orphans))
referenced_candidates = (direct_referenced | ancestor_referenced)
# We desire two pieces of information from the database from the following # We desire two pieces of information from the database from the following
# query: all of the image ids which are associated with this repository, # query: all of the image ids which are associated with this repository,
@ -171,13 +159,18 @@ def garbage_collect_repo(repo):
# code, which is overkill for just two fields, we use a tuple query, and # code, which is overkill for just two fields, we use a tuple query, and
# feed that directly to the dictionary tuple constructor which takes an # feed that directly to the dictionary tuple constructor which takes an
# iterable of tuples containing [(k, v), (k, v), ...] # iterable of tuples containing [(k, v), (k, v), ...]
all_repo_images = Image.select(Image.id, Image.storage).where(Image.repository == repo).tuples() unreferenced_candidates = (Image
images_to_storages = dict(all_repo_images) .select(Image.id, Image.storage)
to_remove = list(set(images_to_storages.keys()).difference(referenced_ancestors)) .where(Image.id << candidates_orphans,
~(Image.id << referenced_candidates))
.tuples())
unreferecend_images_to_storages = dict(unreferenced_candidates)
to_remove = unreferecend_images_to_storages.keys()
if len(to_remove) > 0: if len(to_remove) > 0:
logger.info('Cleaning up unreferenced images: %s', to_remove) logger.info('Cleaning up unreferenced images: %s', to_remove)
storage_id_whitelist = {images_to_storages[to_remove_id] for to_remove_id in to_remove} storage_id_whitelist = set(unreferecend_images_to_storages.values())
# Lookup any derived images for the images to remove. # Lookup any derived images for the images to remove.
derived = DerivedStorageForImage.select().where( derived = DerivedStorageForImage.select().where(

View file

@ -138,54 +138,86 @@ def delete_tag(namespace_name, repository_name, tag_name):
def garbage_collect_tags(repo): def garbage_collect_tags(repo):
expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s """ Remove all of the tags that have gone past their garbage collection
expiration window, and return a set of image ids which *may* have been
orphaned.
"""
def add_expiration_data(base_query):
expired_clause = get_epoch_timestamp() - Namespace.removed_tag_expiration_s
return (base_query
.switch(RepositoryTag)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(~(RepositoryTag.lifetime_end_ts >> None),
RepositoryTag.lifetime_end_ts <= expired_clause))
return _delete_tags(repo, add_expiration_data)
tags_to_delete = list(RepositoryTag def purge_all_tags(repo):
.select(RepositoryTag.id) """ Remove all tags from the repository, and return a set of all of the images
.where(RepositoryTag.repository == repo, ids which are now orphaned.
~(RepositoryTag.lifetime_end_ts >> None), """
(RepositoryTag.lifetime_end_ts <= expired_time)) return _delete_tags(repo)
.order_by(RepositoryTag.id))
if len(tags_to_delete) > 0: def _delete_tags(repo, query_modifier=None):
with db_transaction(): """ Garbage collect the tags for a repository and return a set of the image
manifests_to_delete = list(TagManifest ids which may now be orphaned.
.select(TagManifest.id) """
.join(RepositoryTag) tags_to_delete_q = (RepositoryTag
.where(RepositoryTag.id << tags_to_delete)) .select(RepositoryTag.id, Image.ancestors, Image.id)
.join(Image)
.where(RepositoryTag.repository == repo))
num_deleted_manifests = 0 if query_modifier is not None:
if len(manifests_to_delete) > 0: tags_to_delete_q = query_modifier(tags_to_delete_q)
# Find the set of IDs for all the labels to delete.
manifest_labels_query = (TagManifestLabel
.select()
.where(TagManifestLabel.repository == repo,
TagManifestLabel.annotated << manifests_to_delete))
label_ids = [manifest_label.label_id for manifest_label in manifest_labels_query] tags_to_delete = list(tags_to_delete_q)
if label_ids:
# Delete all the mapping entries.
(TagManifestLabel
.delete()
.where(TagManifestLabel.repository == repo,
TagManifestLabel.annotated << manifests_to_delete)
.execute())
# Delete all the matching labels. if len(tags_to_delete) == 0:
Label.delete().where(Label.id << label_ids).execute() return set()
# Delete the tag manifests themselves. with db_transaction():
num_deleted_manifests = (TagManifest manifests_to_delete = list(TagManifest
.delete() .select(TagManifest.id)
.where(TagManifest.id << manifests_to_delete) .join(RepositoryTag)
.execute()) .where(RepositoryTag.id << tags_to_delete))
num_deleted_tags = (RepositoryTag num_deleted_manifests = 0
.delete() if len(manifests_to_delete) > 0:
.where(RepositoryTag.id << tags_to_delete) # Find the set of IDs for all the labels to delete.
.execute()) manifest_labels_query = (TagManifestLabel
.select()
.where(TagManifestLabel.repository == repo,
TagManifestLabel.annotated << manifests_to_delete))
logger.debug('Removed %s tags with %s manifests', num_deleted_tags, num_deleted_manifests) label_ids = [manifest_label.label_id for manifest_label in manifest_labels_query]
if label_ids:
# Delete all the mapping entries.
(TagManifestLabel
.delete()
.where(TagManifestLabel.repository == repo,
TagManifestLabel.annotated << manifests_to_delete)
.execute())
# Delete all the matching labels.
Label.delete().where(Label.id << label_ids).execute()
num_deleted_manifests = (TagManifest
.delete()
.where(TagManifest.id << manifests_to_delete)
.execute())
num_deleted_tags = (RepositoryTag
.delete()
.where(RepositoryTag.id << tags_to_delete)
.execute())
logger.debug('Removed %s tags with %s manifests', num_deleted_tags, num_deleted_manifests)
ancestors = reduce(lambda r, l: r | l,
(set(tag.image.ancestor_id_list()) for tag in tags_to_delete))
direct_referenced = {tag.image.id for tag in tags_to_delete}
return ancestors | direct_referenced
def _get_repo_tag_image(tag_name, include_storage, modifier): def _get_repo_tag_image(tag_name, include_storage, modifier):

View file

@ -144,18 +144,17 @@ class TestGarbageCollection(unittest.TestCase):
return repo return repo
def gcNow(self, repository): def gcNow(self, repository):
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) model.repository.garbage_collect_repo(repository)
def deleteTag(self, repository, tag, perform_gc=True): def deleteTag(self, repository, tag, perform_gc=True):
model.tag.delete_tag(repository.namespace_user.username, repository.name, tag) model.tag.delete_tag(repository.namespace_user.username, repository.name, tag)
if perform_gc: if perform_gc:
model.repository.garbage_collect_repository(repository.namespace_user.username, model.repository.garbage_collect_repo(repository)
repository.name)
def moveTag(self, repository, tag, docker_image_id): def moveTag(self, repository, tag, docker_image_id):
model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag, model.tag.create_or_update_tag(repository.namespace_user.username, repository.name, tag,
docker_image_id) docker_image_id)
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) model.repository.garbage_collect_repo(repository)
def assertNotDeleted(self, repository, *args): def assertNotDeleted(self, repository, *args):
for docker_image_id in args: for docker_image_id in args:
@ -212,7 +211,7 @@ class TestGarbageCollection(unittest.TestCase):
self.assertEquals(REPO, repository.name) self.assertEquals(REPO, repository.name)
# GC the repository. # GC the repository.
model.repository.garbage_collect_repository(repository.namespace_user.username, repository.name) model.repository.garbage_collect_repo(repository)
# There should now be no repositories with garbage. # There should now be no repositories with garbage.
self.assertIsNone(model.repository.find_repository_with_garbage(0)) self.assertIsNone(model.repository.find_repository_with_garbage(0))

View file

@ -36,7 +36,8 @@ class TestManifests(unittest.TestCase):
def _perform_cleanup(self): def _perform_cleanup(self):
database.RepositoryTag.delete().where(database.RepositoryTag.hidden == True).execute() database.RepositoryTag.delete().where(database.RepositoryTag.hidden == True).execute()
model.repository.garbage_collect_repository(ADMIN_ACCESS_USER, REPO) repo_object = model.repository.get_repository(ADMIN_ACCESS_USER, REPO)
model.repository.garbage_collect_repo(repo_object)
def test_missing_link(self): def test_missing_link(self):
""" Tests for a corner case that could result in missing a link to a blob referenced by a """ Tests for a corner case that could result in missing a link to a blob referenced by a