Make namespace deletion asynchronous

Instead of deleting a namespace synchronously as before, we now mark the namespace for deletion, disable it, and rename it. A worker then comes along and deletes the namespace in the background. This results in a *significantly* better user experience, as the namespace deletion operation now "completes" in under a second, where before it could take tens of minutes at worst.

Fixes https://jira.coreos.com/browse/QUAY-838
This commit is contained in:
Joseph Schorr 2018-02-23 16:45:16 -05:00
parent d9015a1863
commit 8bc55a5676
21 changed files with 244 additions and 129 deletions

7
app.py
View file

@ -228,8 +228,13 @@ secscan_notification_queue = WorkQueue(app.config['SECSCAN_NOTIFICATION_QUEUE_NA
has_namespace=False,
metric_queue=metric_queue)
# Note: We set `has_namespace` to `False` here, as we explicitly want this queue to not be emptied
# when a namespace is marked for deletion.
namespace_gc_queue = WorkQueue(app.config['NAMESPACE_GC_QUEUE_NAME'], tf, has_namespace=False,
metric_queue=metric_queue)
all_queues = [image_replication_queue, dockerfile_build_queue, notification_queue,
secscan_notification_queue, chunk_cleanup_queue]
secscan_notification_queue, chunk_cleanup_queue, namespace_gc_queue]
secscan_api = SecurityScannerAPI(app, app.config, storage)
tuf_metadata_api = TUFMetadataAPI(app, app.config)

View file

@ -0,0 +1,4 @@
#!/bin/sh
# Start the logger: replace this shell with `logger`, which tags each line it
# logs with `namespacegcworker` (-t) and includes its process ID (-i).
exec logger -i -t namespacegcworker

View file

@ -0,0 +1,9 @@
#! /bin/bash
# Launch the namespace GC worker.
#   QUAYPATH - value placed on PYTHONPATH for the worker (default: ".").
#   QUAYDIR  - directory to run from; `venv/bin/python` is resolved relative
#              to it (default: "/").

echo 'Starting Namespace GC worker'

QUAYPATH=${QUAYPATH:-"."}

# Quote the expansion so that a directory containing whitespace or glob
# characters reaches `cd` as a single argument instead of being word-split.
cd "${QUAYDIR:-/}"
PYTHONPATH=$QUAYPATH venv/bin/python -m workers.namespacegcworker 2>&1

echo 'Namespace GC exited'

View file

@ -171,6 +171,7 @@ class DefaultConfig(ImmutableConfig):
REPLICATION_QUEUE_NAME = 'imagestoragereplication'
SECSCAN_NOTIFICATION_QUEUE_NAME = 'security_notification'
CHUNK_CLEANUP_QUEUE_NAME = 'chunk_cleanup'
NAMESPACE_GC_QUEUE_NAME = 'namespacegc'
# Super user config. Note: This MUST BE an empty list for the default config.
SUPER_USERS = []

View file

@ -470,13 +470,21 @@ class User(BaseModel):
RepositoryNotification, OAuthAuthorizationCode,
RepositoryActionCount, TagManifestLabel, Tag,
ManifestLabel, BlobUploading, TeamSync,
RepositorySearchScore} | beta_classes
RepositorySearchScore, DeletedNamespace} | beta_classes
delete_instance_filtered(self, User, delete_nullable, skip_transitive_deletes)
Namespace = User.alias()
class DeletedNamespace(BaseModel):
    """ Marker row recording that a namespace has been marked for deletion. """
    # The user/organization row being deleted. Unique, so a namespace can have at most one marker;
    # robot accounts are not allowed.
    namespace = QuayUserField(index=True, allows_robots=False, unique=True)
    # Timestamp of the marking; defaults to the time the row is created.
    marked = DateTimeField(default=datetime.now)
    # The username and e-mail the namespace had when it was marked.
    original_username = CharField(index=True)
    original_email = CharField(index=True)
    # Identifier of the associated queue item; nullable.
    queue_id = CharField(null=True, index=True)
class UserPromptTypes(object):
CONFIRM_USERNAME = 'confirm_username'
ENTER_NAME = 'enter_name'

View file

@ -0,0 +1,39 @@
"""Add deleted namespace table
Revision ID: b4c2d45bc132
Revises: 152edccba18c
Create Date: 2018-02-27 11:43:02.329941
"""
# revision identifiers, used by Alembic.
revision = 'b4c2d45bc132'
down_revision = '152edccba18c'
from alembic import op
import sqlalchemy as sa
def upgrade(tables):
    """ Creates the `deletednamespace` table together with its four indexes.

    `tables` is accepted for the migration interface but is not used here.
    """
    table_name = 'deletednamespace'

    # Column definitions; `namespace_id` points at the owning row in `user`.
    columns = [
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('namespace_id', sa.Integer(), nullable=False),
        sa.Column('marked', sa.DateTime(), nullable=False),
        sa.Column('original_username', sa.String(length=255), nullable=False),
        sa.Column('original_email', sa.String(length=255), nullable=False),
        sa.Column('queue_id', sa.String(length=255), nullable=True),
    ]
    constraints = [
        sa.ForeignKeyConstraint(['namespace_id'], ['user.id'],
                                name=op.f('fk_deletednamespace_namespace_id_user')),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_deletednamespace')),
    ]
    op.create_table(table_name, *(columns + constraints))

    # (index name, indexed columns, unique?) -- created in the order listed. Only the
    # namespace index is unique.
    indexes = [
        ('deletednamespace_namespace_id', ['namespace_id'], True),
        ('deletednamespace_original_email', ['original_email'], False),
        ('deletednamespace_original_username', ['original_username'], False),
        ('deletednamespace_queue_id', ['queue_id'], False),
    ]
    for index_name, indexed_columns, is_unique in indexes:
        op.create_index(index_name, table_name, indexed_columns, unique=is_unique)
def downgrade(tables):
    """ Reverts this migration by dropping the `deletednamespace` table.

    `tables` is accepted for the migration interface but is not used here.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('deletednamespace')
    # ### end Alembic commands ###

View file

@ -4,9 +4,11 @@ import pytest
from mock import patch
from data.database import EmailConfirmation
from data.database import EmailConfirmation, User, DeletedNamespace
from data.model.user import create_user_noverify, validate_reset_code, get_active_users
from data.model.user import mark_namespace_for_deletion, delete_namespace_via_marker
from util.timedeltastring import convert_to_timedelta
from data.queue import WorkQueue
from test.fixtures import *
def test_create_user_with_expiration(initialized_db):
@ -38,3 +40,46 @@ def test_get_active_users(disabled, initialized_db):
for user in users:
if not disabled:
assert user.enabled
def test_mark_namespace_for_deletion(initialized_db):
    """ Marking a namespace for deletion renames it but keeps its row in the database. """
    # Create a user, then mark it for deletion using a throwaway GC queue.
    original = create_user_noverify('foobar', 'foo@example.com', email_required=False)
    gc_queue = WorkQueue('testgcnamespace', lambda db: db.transaction())
    mark_namespace_for_deletion(original, [], gc_queue)

    # The marked user's row must still exist, but no longer under the old username.
    assert User.get(id=original.id).username != 'foobar'

    # The same username and e-mail can now be used to create a brand new user.
    replacement = create_user_noverify('foobar', 'foo@example.com', email_required=False)
    assert replacement.id != original.id

    # Creating the replacement must not have restored the old username on the marked row.
    assert User.get(id=original.id).username != 'foobar'
def test_delete_namespace_via_marker(initialized_db):
    """ Deleting via a marker removes both the user row and the marker row. """
    # Create a user and mark it for deletion, keeping hold of the marker's ID.
    doomed = create_user_noverify('foobar', 'foo@example.com', email_required=False)
    gc_queue = WorkQueue('testgcnamespace', lambda db: db.transaction())
    marker_id = mark_namespace_for_deletion(doomed, [], gc_queue)

    # Perform the actual deletion through the marker.
    delete_namespace_via_marker(marker_id, [])

    # Neither the user nor its deletion marker may remain.
    with pytest.raises(User.DoesNotExist):
        User.get(id=doomed.id)

    with pytest.raises(DeletedNamespace.DoesNotExist):
        DeletedNamespace.get(id=marker_id)

View file

@ -14,7 +14,7 @@ from data.database import (User, LoginService, FederatedLogin, RepositoryPermiss
EmailConfirmation, Role, db_for_update, random_string_generator,
UserRegion, ImageStorageLocation,
ServiceKeyApproval, OAuthApplication, RepositoryBuildTrigger,
UserPromptKind, UserPrompt, UserPromptTypes)
UserPromptKind, UserPrompt, UserPromptTypes, DeletedNamespace)
from data.model import (DataModelException, InvalidPasswordException, InvalidRobotException,
InvalidUsernameException, InvalidEmailAddressException,
TooManyLoginAttemptsException, db_transaction,
@ -835,7 +835,14 @@ def get_solely_admined_organizations(user_obj):
return solely_admined
def delete_user(user, queues, force=False):
def mark_namespace_for_deletion(user, queues, namespace_gc_queue, force=False):
""" Marks a namespace (as referenced by the given user) for deletion. A queue item will be added
to delete the namespace's repositories and storage, while the namespace itself will be
renamed, disabled, and delinked from other tables.
"""
if not user.enabled:
return None
if not force and not user.organization:
# Ensure that the user is not the sole admin for any organizations. If so, then the user
# cannot be deleted before those organizations are deleted or reassigned.
@ -854,10 +861,66 @@ def delete_user(user, queues, force=False):
for queue in queues:
queue.delete_namespaced_items(user.username)
# Delete non-repository related items. This operation is very quick, so we can do so here.
_delete_user_linked_data(user)
with db_transaction():
original_username = user.username
user = db_for_update(User.select().where(User.id == user.id)).get()
# Mark the namespace as deleted and ready for GC.
try:
marker = DeletedNamespace.create(namespace=user,
original_username=original_username,
original_email=user.email)
except IntegrityError:
return
# Disable the namespace itself, and replace its various unique fields with UUIDs.
user.enabled = False
user.username = str(uuid4())
user.email = str(uuid4())
user.save()
# Add a queueitem to delete the namespace.
marker.queue_id = namespace_gc_queue.put([str(user.id)], json.dumps({
'marker_id': marker.id,
'original_username': original_username,
}))
marker.save()
return marker.id
def delete_namespace_via_marker(marker_id, queues):
    """ Deletes the namespace that the DeletedNamespace marker with the given ID refers to.

    Does nothing if no marker with that ID exists. `queues` is passed through to delete_user.
    """
    try:
        found_marker = DeletedNamespace.get(id=marker_id)
    except DeletedNamespace.DoesNotExist:
        found_marker = None

    if found_marker is not None:
        delete_user(found_marker.namespace, queues)
def delete_user(user, queues):
    """ Fully deletes a user/organization/robot, including its repositories and linked data.

    This must *not* be invoked from any user-facing API: those should call
    mark_namespace_for_deletion, leaving this method to be called by the queue.
    """
    # Remove every queue item filed under the user's namespace.
    for work_queue in queues:
        work_queue.delete_namespaced_items(user.username)

    # Purge each repository in the namespace. The query result is materialized into a list
    # before any purge runs.
    owned_repos = list(Repository.select().where(Repository.namespace_user == user))
    for owned in owned_repos:
        repository.purge_repository(user.username, owned.name)

    # Remove the non-repository data linked to the user, then the user row itself.
    _delete_user_linked_data(user)
    user.delete_instance(recursive=True, delete_nullable=True)
def _delete_user_linked_data(user):
if user.organization:
# Delete the organization's teams.
for team in Team.select().where(Team.organization == user):
@ -879,9 +942,6 @@ def delete_user(user, queues, force=False):
# falling and only occurs if a superuser is being deleted.
ServiceKeyApproval.update(approver=None).where(ServiceKeyApproval.approver == user).execute()
# Delete the user itself.
user.delete_instance(recursive=True, delete_nullable=True)
def get_pull_credentials(robotname):
try:

View file

@ -6,7 +6,7 @@ from flask import request
import features
from app import billing as stripe, avatar, all_queues, authentication
from app import billing as stripe, avatar, all_queues, authentication, namespace_gc_queue
from endpoints.api import (resource, nickname, ApiResource, validate_json_request, request_error,
related_user_resource, internal_only, require_user_admin, log_action,
show_if, path_param, require_scope, require_fresh_login)
@ -217,7 +217,7 @@ class Organization(ApiResource):
except model.InvalidOrganizationException:
raise NotFound()
model.user.delete_user(org, all_queues)
model.user.mark_namespace_for_deletion(org, all_queues, namespace_gc_queue)
return '', 204
raise Unauthorized()

View file

@ -412,7 +412,7 @@ class SuperUserManagement(ApiResource):
if superusers.is_superuser(username):
raise InvalidRequest('Cannot delete a superuser')
pre_oci_model.delete_user(username)
pre_oci_model.mark_user_for_deletion(username)
return '', 204
raise Unauthorized()
@ -540,7 +540,7 @@ class SuperUserOrganizationManagement(ApiResource):
def delete(self, name):
""" Deletes the specified organization. """
if SuperUserPermission().can():
pre_oci_model.delete_organization(name)
pre_oci_model.mark_organization_for_deletion(name)
return '', 204
raise Unauthorized()

View file

@ -353,7 +353,7 @@ class SuperuserDataInterface(object):
"""
@abstractmethod
def delete_user(self, username):
def mark_user_for_deletion(self, username):
"""
Returns None
"""
@ -383,7 +383,7 @@ class SuperuserDataInterface(object):
"""
@abstractmethod
def delete_organization(self, name):
def mark_organization_for_deletion(self, name):
"""
Returns None
"""

View file

@ -2,7 +2,7 @@ import features
from flask import request
from app import all_queues, userfiles
from app import all_queues, userfiles, namespace_gc_queue
from auth.permissions import ReadRepositoryPermission, ModifyRepositoryPermission, AdministerRepositoryPermission
from data import model, database
from endpoints.api.build import get_job_config, _get_build_status
@ -141,9 +141,9 @@ class PreOCIModel(SuperuserDataInterface):
return Organization(org.username, org.email)
def delete_organization(self, name):
def mark_organization_for_deletion(self, name):
org = model.organization.get_organization(name)
model.user.delete_user(org, all_queues)
model.user.mark_namespace_for_deletion(org, all_queues, namespace_gc_queue, force=True)
def take_ownership(self, namespace, authed_user):
entity = model.user.get_user_or_org(namespace)
@ -172,9 +172,9 @@ class PreOCIModel(SuperuserDataInterface):
user = model.user.get_nonrobot_user(username)
model.user.change_password(user, password)
def delete_user(self, username):
def mark_user_for_deletion(self, username):
user = model.user.get_nonrobot_user(username)
model.user.delete_user(user, all_queues, force=True)
model.user.mark_namespace_for_deletion(user, all_queues, namespace_gc_queue, force=True)
def create_reset_password_email_code(self, email):
code = model.user.create_reset_password_email_code(email)

View file

@ -12,7 +12,7 @@ from peewee import IntegrityError
import features
from app import (app, billing as stripe, authentication, avatar, user_analytics, all_queues,
oauth_login)
oauth_login, namespace_gc_queue)
from auth import scopes
from auth.auth_context import get_authenticated_user
@ -485,7 +485,7 @@ class User(ApiResource):
if app.config['AUTHENTICATION_TYPE'] != 'Database':
abort(404)
model.user.delete_user(get_authenticated_user(), all_queues)
model.user.mark_namespace_for_deletion(get_authenticated_user(), all_queues, namespace_gc_queue)
return '', 204

View file

@ -20,7 +20,8 @@ from data.database import (db, all_models, beta_classes, Role, TeamRole, Visibil
ExternalNotificationEvent, ExternalNotificationMethod, NotificationKind,
QuayRegion, QuayService, UserRegion, OAuthAuthorizationCode,
ServiceKeyApprovalType, MediaType, LabelSourceType, UserPromptKind,
RepositoryKind, TagKind, BlobPlacementLocation, User)
RepositoryKind, TagKind, BlobPlacementLocation, User,
DeletedNamespace)
from data import model
from data.queue import WorkQueue
from app import app, storage as store, tf
@ -893,8 +894,11 @@ def populate_database(minimal=False, with_storage=False):
model.repositoryactioncount.update_repository_score(to_count)
WHITELISTED_EMPTY_MODELS = ['DeletedNamespace']
def find_models_missing_data():
# As a sanity check we are going to make sure that all db tables have some data
# As a sanity check we are going to make sure that all db tables have some data, unless explicitly
# whitelisted.
models_missing_data = set()
for one_model in all_models:
if one_model in beta_classes:
@ -903,6 +907,7 @@ def find_models_missing_data():
try:
one_model.select().get()
except one_model.DoesNotExist:
if one_model.__name__ not in WHITELISTED_EMPTY_MODELS:
models_missing_data.add(one_model.__name__)
return models_missing_data

View file

@ -189,9 +189,6 @@ function(ApiService, CookieService, $rootScope, Config, $location, $timeout) {
}
var errorDisplay = ApiService.errorDisplay('Could not delete namespace', callback);
var deleteNamespaceItself = function() {
info.progress = 1;
info.progressMessage = 'Deleting namespace...';
var cb = function(resp) {
userService.load(function(currentUser) {
callback(true);
@ -209,41 +206,6 @@ function(ApiService, CookieService, $rootScope, Config, $location, $timeout) {
}
};
var repoIndex = 0;
var repositories = null;
var deleteAllRepos = function() {
if (repoIndex >= repositories.length) {
deleteNamespaceItself();
return;
}
var repoParams = {
'repository': namespace + '/' + repositories[repoIndex]['name']
};
info.progress = repoIndex / repositories.length;
info.progressMessage = 'Deleting repository ' + repoParams['repository'] + '...';
ApiService.deleteRepository(null, repoParams).then(function() {
repoIndex++;
deleteAllRepos();
}, errorDisplay);
};
// First delete each repo for the namespace, updating the info so it can show a progress bar.
// This is not strictly necessary (as the namespace delete call will do it as well), but it is
// a better user experience.
var params = {
'namespace': namespace,
'public': false
};
ApiService.listRepos(null, params).then(function(resp) {
repositories = resp['repositories'];
deleteAllRepos();
}, errorDisplay);
};
userService.currentUser = function() {
return userResponse;
};

Binary file not shown.

View file

@ -4013,7 +4013,7 @@ class TestSuperUserCreateInitialSuperUser(ApiTestCase):
# Delete all users in the DB.
for user in list(database.User.select()):
model.user.delete_user(user, all_queues, force=True)
model.user.delete_user(user, all_queues)
# Create the superuser.
self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data)

View file

@ -95,7 +95,7 @@ class TestSuperUserCreateInitialSuperUser(ApiTestCase):
# Delete all the users in the DB.
for user in list(User.select()):
model.user.delete_user(user, all_queues, force=True)
model.user.delete_user(user, all_queues)
# This method should now succeed.
data = dict(username='cooluser', password='password', email='fake@example.com')

View file

@ -1,52 +0,0 @@
import logging
import sys
from app import app
from data import model
from data.database import Repository, User
logger = logging.getLogger(__name__)
def delete_summary(username):
    """ Returns a dict of model class name -> number of rows referencing the given user.

    Only non-nullable dependencies are searched, and models with no referencing rows are
    omitted. Raises if no user with the given username exists.
    """
    target = User.get(User.username == username)

    summary = {}
    for expression, field in target.dependencies(search_nullable=False):
        referencing_model = field.model_class
        row_count = referencing_model.select().where(expression).count()
        if row_count <= 0:
            continue

        summary[referencing_model.__name__] = row_count

    return summary
def delete_username(username):
    """ Purges every repository owned by the named user and then deletes the user itself,
        printing a progress line for each step.
    """
    target = User.get(User.username == username)

    # Repositories go first; the list is built before any of them is purged.
    owned_repos = list(Repository.select().where(Repository.namespace_user == target))
    for repo in owned_repos:
        print('Deleting repository: {0}/{1}'.format(target.username, repo.name))
        model.repository.purge_repository(target.username, repo.name)

    # The recursive delete then removes the user together with the rows that reference it.
    print('Deleting user: {0}'.format(username))
    target.delete_instance(recursive=True)
if __name__ == '__main__':
    # Usage: the username to delete is the first command-line argument.
    logging.basicConfig(level=logging.DEBUG)

    target_username = sys.argv[1]

    # Show what references the user before asking for confirmation.
    references = delete_summary(target_username).items()
    if not references:
        print('Nothing references user.')
    else:
        print('Will delete:')
        for model_name, count in references:
            print('{0}: {1}'.format(model_name, count))

    # Only an explicit "y" (any case, surrounding whitespace ignored) proceeds.
    print('Delete? [y/n]')
    answer = raw_input().lower().strip()
    if answer != 'y':
        print('Aborted!')
    else:
        delete_username(target_username)

View file

@ -15,6 +15,7 @@ INTERNAL_ONLY_PROPERTIES = {
'SECSCAN_NOTIFICATION_QUEUE_NAME',
'SECURITY_SCANNER_ISSUER_NAME',
'NOTIFICATION_QUEUE_NAME',
'NAMESPACE_GC_QUEUE_NAME',
'FEATURE_BILLING',
'FEATURE_SUPPORT_CHAT',

View file

@ -0,0 +1,28 @@
import logging
import time
from app import app, namespace_gc_queue, all_queues
from data import model
from workers.queueworker import QueueWorker, JobException
from util.log import logfile_path
logger = logging.getLogger(__name__)
POLL_PERIOD_SECONDS = 60
class NamespaceGCWorker(QueueWorker):
    """ Queue worker that deletes namespaces which were enqueued for garbage collection. """

    def process_queue_item(self, job_details):
        """ Deletes the namespace referenced by the `marker_id` entry of the queue item. """
        logger.debug('Got namespace GC queue item: %s', job_details)
        model.user.delete_namespace_via_marker(job_details['marker_id'], all_queues)
if __name__ == "__main__":
    # `import logging` alone does not load the `logging.config` submodule. Import it explicitly
    # so the `fileConfig` call below does not rely on another module having imported it first.
    import logging.config

    # Configure logging from the non-debug logfile configuration, keeping existing loggers
    # (such as this module's `logger`) enabled.
    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    logger.debug('Starting namespace GC worker')
    worker = NamespaceGCWorker(namespace_gc_queue, poll_period_seconds=POLL_PERIOD_SECONDS)
    worker.start()