From 201943ed1c1339c67ca4cc905719e88cbaee4feb Mon Sep 17 00:00:00 2001
From: Jake Moshenko
Date: Fri, 20 Mar 2015 23:21:20 -0400
Subject: [PATCH] Fix deadlocks with tags and garbage collection.

---
 data/database.py     |   7 ++-
 data/model/legacy.py | 134 +++++++++++++++++++++----------------------
 initdb.py            |   2 +-
 test/fulldbtest.sh   |   4 +-
 4 files changed, 72 insertions(+), 75 deletions(-)

diff --git a/data/database.py b/data/database.py
index 2ddd2fada..e12c1e147 100644
--- a/data/database.py
+++ b/data/database.py
@@ -139,7 +139,7 @@ def uuid_generator():
   return str(uuid.uuid4())
 
 
-_get_epoch_timestamp = lambda: int(time.time())
+get_epoch_timestamp = lambda: int(time.time())
 
 
 def close_db_filter(_):
@@ -483,7 +483,7 @@ class RepositoryTag(BaseModel):
   name = CharField()
   image = ForeignKeyField(Image)
   repository = ForeignKeyField(Repository)
-  lifetime_start_ts = IntegerField(default=_get_epoch_timestamp)
+  lifetime_start_ts = IntegerField(default=get_epoch_timestamp)
   lifetime_end_ts = IntegerField(null=True, index=True)
   hidden = BooleanField(default=False)
 
@@ -492,6 +492,9 @@ class RepositoryTag(BaseModel):
     read_slaves = (read_slave,)
     indexes = (
       (('repository', 'name'), False),
+
+      # This unique index prevents deadlocks when concurrently moving and deleting tags
+      (('repository', 'name', 'lifetime_end_ts'), True),
     )
 
 
diff --git a/data/model/legacy.py b/data/model/legacy.py
index 2d076c5cc..aa968408c 100644
--- a/data/model/legacy.py
+++ b/data/model/legacy.py
@@ -18,7 +18,7 @@ from data.database import (User, Repository, Image, AccessToken, Role, Repositor
                            DerivedImageStorage, ImageStorageTransformation, random_string_generator,
                            db, BUILD_PHASE, QuayUserField, ImageStorageSignature, QueueItem,
                            ImageStorageSignatureKind, validate_database_url, db_for_update,
-                           AccessTokenKind, Star)
+                           AccessTokenKind, Star, get_epoch_timestamp)
 from peewee import JOIN_LEFT_OUTER, fn
 from util.validation import (validate_username, validate_email, validate_password,
                              INVALID_PASSWORD_MESSAGE)
@@ -1577,9 +1577,11 @@ def get_repository_images(namespace_name, repository_name):
   return _get_repository_images_base(namespace_name, repository_name, lambda q: q)
 
 
-def _tag_alive(query):
+def _tag_alive(query, now_ts=None):
+  if now_ts is None:
+    now_ts = get_epoch_timestamp()
   return query.where((RepositoryTag.lifetime_end_ts >> None) |
-                     (RepositoryTag.lifetime_end_ts > int(time.time())))
+                     (RepositoryTag.lifetime_end_ts > now_ts))
 
 
 def list_repository_tags(namespace_name, repository_name, include_hidden=False,
@@ -1610,14 +1612,19 @@ def list_repository_tags(namespace_name, repository_name, include_hidden=False,
 def _garbage_collect_tags(namespace_name, repository_name):
   # We do this without using a join to prevent holding read locks on the repository table
   repo = _get_repository(namespace_name, repository_name)
-  now = int(time.time())
+  expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s
 
-  (RepositoryTag
-   .delete()
-   .where(RepositoryTag.repository == repo,
-          ~(RepositoryTag.lifetime_end_ts >> None),
-          (RepositoryTag.lifetime_end_ts + repo.namespace_user.removed_tag_expiration_s) <= now)
-   .execute())
+  tags_to_delete = list(RepositoryTag
+                        .select(RepositoryTag.id)
+                        .where(RepositoryTag.repository == repo,
+                               ~(RepositoryTag.lifetime_end_ts >> None),
+                               (RepositoryTag.lifetime_end_ts <= expired_time))
+                        .order_by(RepositoryTag.id))
+  if len(tags_to_delete) > 0:
+    (RepositoryTag
+     .delete()
+     .where(RepositoryTag.id << tags_to_delete)
+     .execute())
 
 
 def garbage_collect_repository(namespace_name, repository_name):
@@ -1713,46 +1720,39 @@ def _garbage_collect_storage(storage_id_whitelist):
   logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
   with config.app_config['DB_TRANSACTION_FACTORY'](db):
     # Track all of the data that should be removed from blob storage
-    placements_to_remove = orphaned_storage_query(ImageStoragePlacement
-                                                  .select(ImageStoragePlacement,
-                                                          ImageStorage,
-                                                          ImageStorageLocation)
-                                                  .join(ImageStorageLocation)
-                                                  .switch(ImageStoragePlacement)
-                                                  .join(ImageStorage),
-                                                  storage_id_whitelist,
-                                                  (ImageStorage, ImageStoragePlacement,
-                                                   ImageStorageLocation))
+    placements_to_remove = list(orphaned_storage_query(ImageStoragePlacement
+                                                       .select(ImageStoragePlacement,
+                                                               ImageStorage,
+                                                               ImageStorageLocation)
+                                                       .join(ImageStorageLocation)
+                                                       .switch(ImageStoragePlacement)
+                                                       .join(ImageStorage),
+                                                       storage_id_whitelist,
+                                                       (ImageStorage, ImageStoragePlacement,
+                                                        ImageStorageLocation)))
 
-    paths_to_remove = placements_query_to_paths_set(placements_to_remove.clone())
+    paths_to_remove = placements_query_to_paths_set(placements_to_remove)
 
     # Remove the placements for orphaned storages
-    placements_subquery = (placements_to_remove
-                           .clone()
-                           .select(ImageStoragePlacement.id)
-                           .alias('ps'))
-    inner = (ImageStoragePlacement
-             .select(placements_subquery.c.id)
-             .from_(placements_subquery))
-    placements_removed = (ImageStoragePlacement
-                          .delete()
-                          .where(ImageStoragePlacement.id << inner)
-                          .execute())
-    logger.debug('Removed %s image storage placements', placements_removed)
+    if len(placements_to_remove) > 0:
+      placement_ids_to_remove = [placement.id for placement in placements_to_remove]
+      placements_removed = (ImageStoragePlacement
+                            .delete()
+                            .where(ImageStoragePlacement.id << placement_ids_to_remove)
+                            .execute())
+      logger.debug('Removed %s image storage placements', placements_removed)
 
     # Remove all orphaned storages
     # The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
-    orphaned_storages = orphaned_storage_query(ImageStorage.select(ImageStorage.id),
-                                               storage_id_whitelist,
-                                               (ImageStorage.id,)).alias('osq')
-    orphaned_storage_inner = (ImageStorage
-                              .select(orphaned_storages.c.id)
-                              .from_(orphaned_storages))
-    storages_removed = (ImageStorage
-                        .delete()
-                        .where(ImageStorage.id << orphaned_storage_inner)
-                        .execute())
-    logger.debug('Removed %s image storage records', storages_removed)
+    orphaned_storages = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
+                                                    storage_id_whitelist,
+                                                    (ImageStorage.id,)).alias('osq'))
+    if len(orphaned_storages) > 0:
+      storages_removed = (ImageStorage
+                          .delete()
+                          .where(ImageStorage.id << orphaned_storages)
+                          .execute())
+      logger.debug('Removed %s image storage records', storages_removed)
 
   # We are going to make the conscious decision to not delete image storage blobs inside
   # transactions.
@@ -1803,40 +1803,34 @@ def get_parent_images(namespace_name, repository_name, image_obj):
 
 
 def create_or_update_tag(namespace_name, repository_name, tag_name, tag_docker_image_id):
+  try:
+    repo = _get_repository(namespace_name, repository_name)
+  except Repository.DoesNotExist:
+    raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))
+
+  now_ts = get_epoch_timestamp()
   with config.app_config['DB_TRANSACTION_FACTORY'](db):
     try:
-      repo = _get_repository(namespace_name, repository_name)
-    except Repository.DoesNotExist:
-      raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))
+      tag = db_for_update(_tag_alive(RepositoryTag
+                                     .select()
+                                     .where(RepositoryTag.repository == repo,
+                                            RepositoryTag.name == tag_name), now_ts)).get()
+      tag.lifetime_end_ts = now_ts
+      tag.save()
+    except RepositoryTag.DoesNotExist:
+      pass
 
     try:
       image = Image.get(Image.docker_image_id == tag_docker_image_id, Image.repository == repo)
     except Image.DoesNotExist:
       raise DataModelException('Invalid image with id: %s' % tag_docker_image_id)
 
-    now_ts = int(time.time())
-
-    created = RepositoryTag.create(repository=repo, image=image, name=tag_name,
-                                   lifetime_start_ts=now_ts)
-
-    try:
-      # When we move a tag, we really end the timeline of the old one and create a new one
-      query = _tag_alive(RepositoryTag
-                         .select()
-                         .where(RepositoryTag.repository == repo, RepositoryTag.name == tag_name,
-                                RepositoryTag.id != created.id))
-      tag = query.get()
-      tag.lifetime_end_ts = now_ts
-      tag.save()
-    except RepositoryTag.DoesNotExist:
-      # No tag that needs to be ended
-      pass
-
-    return created
-
+    return RepositoryTag.create(repository=repo, image=image, name=tag_name,
+                                lifetime_start_ts=now_ts)
 
 
 def delete_tag(namespace_name, repository_name, tag_name):
+  now_ts = get_epoch_timestamp()
   with config.app_config['DB_TRANSACTION_FACTORY'](db):
     try:
       query = _tag_alive(RepositoryTag
@@ -1845,21 +1839,21 @@ def delete_tag(namespace_name, repository_name, tag_name):
                          .join(Namespace, on=(Repository.namespace_user == Namespace.id))
                          .where(Repository.name == repository_name,
                                 Namespace.username == namespace_name,
-                                RepositoryTag.name == tag_name))
+                                RepositoryTag.name == tag_name), now_ts)
       found = db_for_update(query).get()
     except RepositoryTag.DoesNotExist:
       msg = ('Invalid repository tag \'%s\' on repository \'%s/%s\'' %
              (tag_name, namespace_name, repository_name))
       raise DataModelException(msg)
 
-    found.lifetime_end_ts = int(time.time())
+    found.lifetime_end_ts = now_ts
     found.save()
 
 
 def create_temporary_hidden_tag(repo, image, expiration_s):
   """ Create a tag with a defined timeline, that will not appear in the UI or CLI. Returns the name
       of the temporary tag. """
-  now_ts = int(time.time())
+  now_ts = get_epoch_timestamp()
   expire_ts = now_ts + expiration_s
   tag_name = str(uuid4())
   RepositoryTag.create(repository=repo, image=image, name=tag_name, lifetime_start_ts=now_ts,
diff --git a/initdb.py b/initdb.py
index f35b77111..104e0fc19 100644
--- a/initdb.py
+++ b/initdb.py
@@ -103,7 +103,7 @@ def __create_subtree(repo, structure, creator_username, parent):
                                      new_image.docker_image_id)
 
     if tag_name[0] == '#':
-      tag.lifetime_end_ts = int(time.time()) - 1
+      tag.lifetime_end_ts = get_epoch_timestamp() - 1
       tag.save()
 
   for subtree in subtrees:
diff --git a/test/fulldbtest.sh b/test/fulldbtest.sh
index d3fa0caa7..975f3ddeb 100755
--- a/test/fulldbtest.sh
+++ b/test/fulldbtest.sh
@@ -14,7 +14,7 @@ up_mysql() {
 
 down_mysql() {
   docker kill mysql
-  docker rm mysql
+  docker rm -v mysql
}
 
 up_postgres() {
@@ -31,7 +31,7 @@ up_postgres() {
 
 down_postgres() {
   docker kill postgres
-  docker rm postgres
+  docker rm -v postgres
 }
 
 run_tests() {