Fix deadlocks with tags and garbage collection.
This commit is contained in:
parent
2baa7fa14c
commit
201943ed1c
4 changed files with 72 additions and 75 deletions
|
@ -139,7 +139,7 @@ def uuid_generator():
|
||||||
return str(uuid.uuid4())
|
return str(uuid.uuid4())
|
||||||
|
|
||||||
|
|
||||||
_get_epoch_timestamp = lambda: int(time.time())
|
get_epoch_timestamp = lambda: int(time.time())
|
||||||
|
|
||||||
|
|
||||||
def close_db_filter(_):
|
def close_db_filter(_):
|
||||||
|
@ -483,7 +483,7 @@ class RepositoryTag(BaseModel):
|
||||||
name = CharField()
|
name = CharField()
|
||||||
image = ForeignKeyField(Image)
|
image = ForeignKeyField(Image)
|
||||||
repository = ForeignKeyField(Repository)
|
repository = ForeignKeyField(Repository)
|
||||||
lifetime_start_ts = IntegerField(default=_get_epoch_timestamp)
|
lifetime_start_ts = IntegerField(default=get_epoch_timestamp)
|
||||||
lifetime_end_ts = IntegerField(null=True, index=True)
|
lifetime_end_ts = IntegerField(null=True, index=True)
|
||||||
hidden = BooleanField(default=False)
|
hidden = BooleanField(default=False)
|
||||||
|
|
||||||
|
@ -492,6 +492,9 @@ class RepositoryTag(BaseModel):
|
||||||
read_slaves = (read_slave,)
|
read_slaves = (read_slave,)
|
||||||
indexes = (
|
indexes = (
|
||||||
(('repository', 'name'), False),
|
(('repository', 'name'), False),
|
||||||
|
|
||||||
|
# This unique index prevents deadlocks when concurrently moving and deleting tags
|
||||||
|
(('repository', 'name', 'lifetime_end_ts'), True),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ from data.database import (User, Repository, Image, AccessToken, Role, Repositor
|
||||||
DerivedImageStorage, ImageStorageTransformation, random_string_generator,
|
DerivedImageStorage, ImageStorageTransformation, random_string_generator,
|
||||||
db, BUILD_PHASE, QuayUserField, ImageStorageSignature, QueueItem,
|
db, BUILD_PHASE, QuayUserField, ImageStorageSignature, QueueItem,
|
||||||
ImageStorageSignatureKind, validate_database_url, db_for_update,
|
ImageStorageSignatureKind, validate_database_url, db_for_update,
|
||||||
AccessTokenKind, Star)
|
AccessTokenKind, Star, get_epoch_timestamp)
|
||||||
from peewee import JOIN_LEFT_OUTER, fn
|
from peewee import JOIN_LEFT_OUTER, fn
|
||||||
from util.validation import (validate_username, validate_email, validate_password,
|
from util.validation import (validate_username, validate_email, validate_password,
|
||||||
INVALID_PASSWORD_MESSAGE)
|
INVALID_PASSWORD_MESSAGE)
|
||||||
|
@ -1577,9 +1577,11 @@ def get_repository_images(namespace_name, repository_name):
|
||||||
return _get_repository_images_base(namespace_name, repository_name, lambda q: q)
|
return _get_repository_images_base(namespace_name, repository_name, lambda q: q)
|
||||||
|
|
||||||
|
|
||||||
def _tag_alive(query):
|
def _tag_alive(query, now_ts=None):
|
||||||
|
if now_ts is None:
|
||||||
|
now_ts = get_epoch_timestamp()
|
||||||
return query.where((RepositoryTag.lifetime_end_ts >> None) |
|
return query.where((RepositoryTag.lifetime_end_ts >> None) |
|
||||||
(RepositoryTag.lifetime_end_ts > int(time.time())))
|
(RepositoryTag.lifetime_end_ts > now_ts))
|
||||||
|
|
||||||
|
|
||||||
def list_repository_tags(namespace_name, repository_name, include_hidden=False,
|
def list_repository_tags(namespace_name, repository_name, include_hidden=False,
|
||||||
|
@ -1610,13 +1612,18 @@ def list_repository_tags(namespace_name, repository_name, include_hidden=False,
|
||||||
def _garbage_collect_tags(namespace_name, repository_name):
|
def _garbage_collect_tags(namespace_name, repository_name):
|
||||||
# We do this without using a join to prevent holding read locks on the repository table
|
# We do this without using a join to prevent holding read locks on the repository table
|
||||||
repo = _get_repository(namespace_name, repository_name)
|
repo = _get_repository(namespace_name, repository_name)
|
||||||
now = int(time.time())
|
expired_time = get_epoch_timestamp() - repo.namespace_user.removed_tag_expiration_s
|
||||||
|
|
||||||
(RepositoryTag
|
tags_to_delete = list(RepositoryTag
|
||||||
.delete()
|
.select(RepositoryTag.id)
|
||||||
.where(RepositoryTag.repository == repo,
|
.where(RepositoryTag.repository == repo,
|
||||||
~(RepositoryTag.lifetime_end_ts >> None),
|
~(RepositoryTag.lifetime_end_ts >> None),
|
||||||
(RepositoryTag.lifetime_end_ts + repo.namespace_user.removed_tag_expiration_s) <= now)
|
(RepositoryTag.lifetime_end_ts <= expired_time))
|
||||||
|
.order_by(RepositoryTag.id))
|
||||||
|
if len(tags_to_delete) > 0:
|
||||||
|
(RepositoryTag
|
||||||
|
.delete()
|
||||||
|
.where(RepositoryTag.id << tags_to_delete)
|
||||||
.execute())
|
.execute())
|
||||||
|
|
||||||
|
|
||||||
|
@ -1713,7 +1720,7 @@ def _garbage_collect_storage(storage_id_whitelist):
|
||||||
logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
|
logger.debug('Garbage collecting storages from candidates: %s', storage_id_whitelist)
|
||||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
# Track all of the data that should be removed from blob storage
|
# Track all of the data that should be removed from blob storage
|
||||||
placements_to_remove = orphaned_storage_query(ImageStoragePlacement
|
placements_to_remove = list(orphaned_storage_query(ImageStoragePlacement
|
||||||
.select(ImageStoragePlacement,
|
.select(ImageStoragePlacement,
|
||||||
ImageStorage,
|
ImageStorage,
|
||||||
ImageStorageLocation)
|
ImageStorageLocation)
|
||||||
|
@ -1722,35 +1729,28 @@ def _garbage_collect_storage(storage_id_whitelist):
|
||||||
.join(ImageStorage),
|
.join(ImageStorage),
|
||||||
storage_id_whitelist,
|
storage_id_whitelist,
|
||||||
(ImageStorage, ImageStoragePlacement,
|
(ImageStorage, ImageStoragePlacement,
|
||||||
ImageStorageLocation))
|
ImageStorageLocation)))
|
||||||
|
|
||||||
paths_to_remove = placements_query_to_paths_set(placements_to_remove.clone())
|
paths_to_remove = placements_query_to_paths_set(placements_to_remove)
|
||||||
|
|
||||||
# Remove the placements for orphaned storages
|
# Remove the placements for orphaned storages
|
||||||
placements_subquery = (placements_to_remove
|
if len(placements_to_remove) > 0:
|
||||||
.clone()
|
placement_ids_to_remove = [placement.id for placement in placements_to_remove]
|
||||||
.select(ImageStoragePlacement.id)
|
|
||||||
.alias('ps'))
|
|
||||||
inner = (ImageStoragePlacement
|
|
||||||
.select(placements_subquery.c.id)
|
|
||||||
.from_(placements_subquery))
|
|
||||||
placements_removed = (ImageStoragePlacement
|
placements_removed = (ImageStoragePlacement
|
||||||
.delete()
|
.delete()
|
||||||
.where(ImageStoragePlacement.id << inner)
|
.where(ImageStoragePlacement.id << placement_ids_to_remove)
|
||||||
.execute())
|
.execute())
|
||||||
logger.debug('Removed %s image storage placements', placements_removed)
|
logger.debug('Removed %s image storage placements', placements_removed)
|
||||||
|
|
||||||
# Remove all orphaned storages
|
# Remove all orphaned storages
|
||||||
# The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
|
# The comma after ImageStorage.id is VERY important, it makes it a tuple, which is a sequence
|
||||||
orphaned_storages = orphaned_storage_query(ImageStorage.select(ImageStorage.id),
|
orphaned_storages = list(orphaned_storage_query(ImageStorage.select(ImageStorage.id),
|
||||||
storage_id_whitelist,
|
storage_id_whitelist,
|
||||||
(ImageStorage.id,)).alias('osq')
|
(ImageStorage.id,)).alias('osq'))
|
||||||
orphaned_storage_inner = (ImageStorage
|
if len(orphaned_storages) > 0:
|
||||||
.select(orphaned_storages.c.id)
|
|
||||||
.from_(orphaned_storages))
|
|
||||||
storages_removed = (ImageStorage
|
storages_removed = (ImageStorage
|
||||||
.delete()
|
.delete()
|
||||||
.where(ImageStorage.id << orphaned_storage_inner)
|
.where(ImageStorage.id << orphaned_storages)
|
||||||
.execute())
|
.execute())
|
||||||
logger.debug('Removed %s image storage records', storages_removed)
|
logger.debug('Removed %s image storage records', storages_removed)
|
||||||
|
|
||||||
|
@ -1803,40 +1803,34 @@ def get_parent_images(namespace_name, repository_name, image_obj):
|
||||||
|
|
||||||
def create_or_update_tag(namespace_name, repository_name, tag_name,
|
def create_or_update_tag(namespace_name, repository_name, tag_name,
|
||||||
tag_docker_image_id):
|
tag_docker_image_id):
|
||||||
|
|
||||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
|
||||||
try:
|
try:
|
||||||
repo = _get_repository(namespace_name, repository_name)
|
repo = _get_repository(namespace_name, repository_name)
|
||||||
except Repository.DoesNotExist:
|
except Repository.DoesNotExist:
|
||||||
raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))
|
raise DataModelException('Invalid repository %s/%s' % (namespace_name, repository_name))
|
||||||
|
|
||||||
|
now_ts = get_epoch_timestamp()
|
||||||
|
|
||||||
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
|
try:
|
||||||
|
tag = db_for_update(_tag_alive(RepositoryTag
|
||||||
|
.select()
|
||||||
|
.where(RepositoryTag.repository == repo,
|
||||||
|
RepositoryTag.name == tag_name), now_ts)).get()
|
||||||
|
tag.lifetime_end_ts = now_ts
|
||||||
|
tag.save()
|
||||||
|
except RepositoryTag.DoesNotExist:
|
||||||
|
pass
|
||||||
|
|
||||||
try:
|
try:
|
||||||
image = Image.get(Image.docker_image_id == tag_docker_image_id, Image.repository == repo)
|
image = Image.get(Image.docker_image_id == tag_docker_image_id, Image.repository == repo)
|
||||||
except Image.DoesNotExist:
|
except Image.DoesNotExist:
|
||||||
raise DataModelException('Invalid image with id: %s' % tag_docker_image_id)
|
raise DataModelException('Invalid image with id: %s' % tag_docker_image_id)
|
||||||
|
|
||||||
now_ts = int(time.time())
|
return RepositoryTag.create(repository=repo, image=image, name=tag_name,
|
||||||
|
|
||||||
created = RepositoryTag.create(repository=repo, image=image, name=tag_name,
|
|
||||||
lifetime_start_ts=now_ts)
|
lifetime_start_ts=now_ts)
|
||||||
|
|
||||||
try:
|
|
||||||
# When we move a tag, we really end the timeline of the old one and create a new one
|
|
||||||
query = _tag_alive(RepositoryTag
|
|
||||||
.select()
|
|
||||||
.where(RepositoryTag.repository == repo, RepositoryTag.name == tag_name,
|
|
||||||
RepositoryTag.id != created.id))
|
|
||||||
tag = query.get()
|
|
||||||
tag.lifetime_end_ts = now_ts
|
|
||||||
tag.save()
|
|
||||||
except RepositoryTag.DoesNotExist:
|
|
||||||
# No tag that needs to be ended
|
|
||||||
pass
|
|
||||||
|
|
||||||
return created
|
|
||||||
|
|
||||||
|
|
||||||
def delete_tag(namespace_name, repository_name, tag_name):
|
def delete_tag(namespace_name, repository_name, tag_name):
|
||||||
|
now_ts = get_epoch_timestamp()
|
||||||
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
with config.app_config['DB_TRANSACTION_FACTORY'](db):
|
||||||
try:
|
try:
|
||||||
query = _tag_alive(RepositoryTag
|
query = _tag_alive(RepositoryTag
|
||||||
|
@ -1845,21 +1839,21 @@ def delete_tag(namespace_name, repository_name, tag_name):
|
||||||
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
||||||
.where(Repository.name == repository_name,
|
.where(Repository.name == repository_name,
|
||||||
Namespace.username == namespace_name,
|
Namespace.username == namespace_name,
|
||||||
RepositoryTag.name == tag_name))
|
RepositoryTag.name == tag_name), now_ts)
|
||||||
found = db_for_update(query).get()
|
found = db_for_update(query).get()
|
||||||
except RepositoryTag.DoesNotExist:
|
except RepositoryTag.DoesNotExist:
|
||||||
msg = ('Invalid repository tag \'%s\' on repository \'%s/%s\'' %
|
msg = ('Invalid repository tag \'%s\' on repository \'%s/%s\'' %
|
||||||
(tag_name, namespace_name, repository_name))
|
(tag_name, namespace_name, repository_name))
|
||||||
raise DataModelException(msg)
|
raise DataModelException(msg)
|
||||||
|
|
||||||
found.lifetime_end_ts = int(time.time())
|
found.lifetime_end_ts = now_ts
|
||||||
found.save()
|
found.save()
|
||||||
|
|
||||||
|
|
||||||
def create_temporary_hidden_tag(repo, image, expiration_s):
|
def create_temporary_hidden_tag(repo, image, expiration_s):
|
||||||
""" Create a tag with a defined timeline, that will not appear in the UI or CLI. Returns the name
|
""" Create a tag with a defined timeline, that will not appear in the UI or CLI. Returns the name
|
||||||
of the temporary tag. """
|
of the temporary tag. """
|
||||||
now_ts = int(time.time())
|
now_ts = get_epoch_timestamp()
|
||||||
expire_ts = now_ts + expiration_s
|
expire_ts = now_ts + expiration_s
|
||||||
tag_name = str(uuid4())
|
tag_name = str(uuid4())
|
||||||
RepositoryTag.create(repository=repo, image=image, name=tag_name, lifetime_start_ts=now_ts,
|
RepositoryTag.create(repository=repo, image=image, name=tag_name, lifetime_start_ts=now_ts,
|
||||||
|
|
|
@ -103,7 +103,7 @@ def __create_subtree(repo, structure, creator_username, parent):
|
||||||
new_image.docker_image_id)
|
new_image.docker_image_id)
|
||||||
|
|
||||||
if tag_name[0] == '#':
|
if tag_name[0] == '#':
|
||||||
tag.lifetime_end_ts = int(time.time()) - 1
|
tag.lifetime_end_ts = get_epoch_timestamp() - 1
|
||||||
tag.save()
|
tag.save()
|
||||||
|
|
||||||
for subtree in subtrees:
|
for subtree in subtrees:
|
||||||
|
|
|
@ -14,7 +14,7 @@ up_mysql() {
|
||||||
|
|
||||||
down_mysql() {
|
down_mysql() {
|
||||||
docker kill mysql
|
docker kill mysql
|
||||||
docker rm mysql
|
docker rm -v mysql
|
||||||
}
|
}
|
||||||
|
|
||||||
up_postgres() {
|
up_postgres() {
|
||||||
|
@ -31,7 +31,7 @@ up_postgres() {
|
||||||
|
|
||||||
down_postgres() {
|
down_postgres() {
|
||||||
docker kill postgres
|
docker kill postgres
|
||||||
docker rm postgres
|
docker rm -v postgres
|
||||||
}
|
}
|
||||||
|
|
||||||
run_tests() {
|
run_tests() {
|
||||||
|
|
Reference in a new issue