Change repo filtering for users to use a user ID reference, rather than the username
While this means we need an additional query for initial lookup, it makes the *filtering* query (which is the heavy part) require far fewer joins, thus making it more efficient. Also adds a new unit test to verify that our filter filters to the correct set of repositories.
This commit is contained in:
parent
f2b9aa4527
commit
7604e9842b
7 changed files with 158 additions and 34 deletions
104
data/model/test/test_basequery.py
Normal file
104
data/model/test/test_basequery.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
import pytest
|
||||
|
||||
from peewee import JOIN_LEFT_OUTER
|
||||
from playhouse.test_utils import assert_query_count
|
||||
|
||||
from data.database import Repository, RepositoryPermission, TeamMember, Namespace
|
||||
from data.model._basequery import filter_to_repos_for_user
|
||||
from data.model.organization import get_admin_users
|
||||
from data.model.user import get_namespace_user
|
||||
from util.names import parse_robot_username
|
||||
|
||||
from test.fixtures import *
|
||||
|
||||
def _is_team_member(team, user):
|
||||
return user.id in [member.user_id for member in
|
||||
TeamMember.select().where(TeamMember.team == team)]
|
||||
|
||||
def _get_visible_repositories_for_user(user, repo_kind='image', include_public=False,
|
||||
namespace=None):
|
||||
""" Returns all repositories directly visible to the given user, by either repo permission,
|
||||
or the user being the admin of a namespace.
|
||||
"""
|
||||
for repo in Repository.select():
|
||||
if repo_kind is not None and repo.kind.name != repo_kind:
|
||||
continue
|
||||
|
||||
if namespace is not None and repo.namespace_user.username != namespace:
|
||||
continue
|
||||
|
||||
if include_public and repo.visibility.name == 'public':
|
||||
yield repo
|
||||
continue
|
||||
|
||||
# Direct repo permission.
|
||||
try:
|
||||
RepositoryPermission.get(repository=repo, user=user).get()
|
||||
yield repo
|
||||
continue
|
||||
except RepositoryPermission.DoesNotExist:
|
||||
pass
|
||||
|
||||
# Team permission.
|
||||
found_in_team = False
|
||||
for perm in RepositoryPermission.select().where(RepositoryPermission.repository == repo):
|
||||
if perm.team and _is_team_member(perm.team, user):
|
||||
found_in_team = True
|
||||
break
|
||||
|
||||
if found_in_team:
|
||||
yield repo
|
||||
continue
|
||||
|
||||
# Org namespace admin permission.
|
||||
if user in get_admin_users(repo.namespace_user):
|
||||
yield repo
|
||||
continue
|
||||
|
||||
|
||||
@pytest.mark.parametrize('username', [
|
||||
'devtable',
|
||||
'devtable+dtrobot',
|
||||
'public',
|
||||
'reader',
|
||||
])
|
||||
@pytest.mark.parametrize('include_public', [
|
||||
True,
|
||||
False
|
||||
])
|
||||
@pytest.mark.parametrize('filter_to_namespace', [
|
||||
True,
|
||||
False
|
||||
])
|
||||
@pytest.mark.parametrize('repo_kind', [
|
||||
None,
|
||||
'image',
|
||||
'application',
|
||||
])
|
||||
def test_filter_repositories(username, include_public, filter_to_namespace, repo_kind,
|
||||
initialized_db):
|
||||
namespace = username if filter_to_namespace else None
|
||||
if '+' in username and filter_to_namespace:
|
||||
namespace, _ = parse_robot_username(username)
|
||||
|
||||
user = get_namespace_user(username)
|
||||
query = (Repository
|
||||
.select()
|
||||
.distinct()
|
||||
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
|
||||
.switch(Repository)
|
||||
.join(RepositoryPermission, JOIN_LEFT_OUTER))
|
||||
|
||||
with assert_query_count(1):
|
||||
found = list(filter_to_repos_for_user(query, user.id,
|
||||
namespace=namespace,
|
||||
include_public=include_public,
|
||||
repo_kind=repo_kind))
|
||||
|
||||
expected = list(_get_visible_repositories_for_user(user,
|
||||
repo_kind=repo_kind,
|
||||
namespace=namespace,
|
||||
include_public=include_public))
|
||||
|
||||
assert len(found) == len(expected)
|
||||
assert {r.id for r in found} == {r.id for r in expected}
|
Reference in a new issue