Add feature flag to enable team syncing setup when not a superuser

This commit is contained in:
Joseph Schorr 2017-07-21 11:06:21 -04:00
parent d7b094f65c
commit 8a96647d6e
4 changed files with 51 additions and 6 deletions

View file

@ -462,6 +462,9 @@ class DefaultConfig(ImmutableConfig):
TEAM_RESYNC_STALE_TIME = '30m'
TEAM_SYNC_WORKER_FREQUENCY = 60 # seconds
# Feature Flag: If enabled, non-superusers can setup team syncing.
FEATURE_NONSUPERUSER_TEAM_SYNCING_SETUP = False
# The default configurable tag expiration time for time machine.
DEFAULT_TAG_EXPIRATION = '2w'

View file

@ -209,6 +209,14 @@ class OrganizationTeam(ApiResource):
raise Unauthorized()
def _syncing_setup_allowed(orgname):
""" Returns whether syncing setup is allowed for the current user over the matching org. """
if not features.NONSUPERUSER_TEAM_SYNCING_SETUP and not SuperUserPermission().can():
return False
return AdministerOrganizationPermission(orgname).can()
@resource('/v1/organization/<orgname>/team/<teamname>/syncing')
@path_param('orgname', 'The name of the organization')
@path_param('teamname', 'The name of the team')
@ -221,8 +229,7 @@ class OrganizationTeamSyncing(ApiResource):
@verify_not_prod
@require_fresh_login
def post(self, orgname, teamname):
# User must be both the org admin AND a superuser.
if SuperUserPermission().can() and AdministerOrganizationPermission(orgname).can():
if _syncing_setup_allowed(orgname):
try:
team = model.team.get_organization_team(orgname, teamname)
except model.InvalidTeamException:
@ -248,8 +255,7 @@ class OrganizationTeamSyncing(ApiResource):
@verify_not_prod
@require_fresh_login
def delete(self, orgname, teamname):
# User must be both the org admin AND a superuser.
if SuperUserPermission().can() and AdministerOrganizationPermission(orgname).can():
if _syncing_setup_allowed(orgname):
try:
team = model.team.get_organization_team(orgname, teamname)
except model.InvalidTeamException:
@ -296,7 +302,7 @@ class TeamMemberList(ApiResource):
}
if features.TEAM_SYNCING and authentication.federated_service:
if SuperUserPermission().can() and AdministerOrganizationPermission(orgname).can():
if _syncing_setup_allowed(orgname):
data['can_sync'] = {
'service': authentication.federated_service,
}

View file

@ -1,3 +1,5 @@
from mock import patch
import pytest
from flask_principal import AnonymousIdentity
@ -10,7 +12,7 @@ from endpoints.api.signing import RepositorySignatures
from endpoints.api.search import ConductRepositorySearch
from endpoints.api.superuser import SuperUserRepositoryBuildLogs, SuperUserRepositoryBuildResource
from endpoints.api.superuser import SuperUserRepositoryBuildStatus
from endpoints.test.shared import client_with_identity
from endpoints.test.shared import client_with_identity, toggle_feature
from test.fixtures import *
@ -69,3 +71,27 @@ NOTIFICATION_PARAMS = {'namespace': 'devtable', 'repository': 'devtable/simple',
def test_api_security(resource, method, params, body, identity, expected, client):
with client_with_identity(identity, client) as cl:
conduct_api_call(cl, resource, method, params, body, expected)
@pytest.mark.parametrize('is_superuser', [
(True),
(False),
])
@pytest.mark.parametrize('allow_nonsuperuser', [
(True),
(False),
])
@pytest.mark.parametrize('method, expected', [
('POST', 400),
('DELETE', 200),
])
def test_team_sync_security(is_superuser, allow_nonsuperuser, method, expected, client):
def is_superuser_method(_):
return is_superuser
with patch('auth.permissions.superusers.is_superuser', is_superuser_method):
with toggle_feature('NONSUPERUSER_TEAM_SYNCING_SETUP', allow_nonsuperuser):
with client_with_identity('devtable', client) as cl:
expect_success = is_superuser or allow_nonsuperuser
expected_status = expected if expect_success else 403
conduct_api_call(cl, OrganizationTeamSyncing, method, TEAM_PARAMS, {}, expected_status)

View file

@ -30,6 +30,16 @@ def client_with_identity(auth_username, client):
sess[CSRF_TOKEN_KEY] = None
@contextmanager
def toggle_feature(name, enabled):
""" Context manager which temporarily toggles a feature. """
import features
previous_value = getattr(features, name)
setattr(features, name, enabled)
yield
setattr(features, name, previous_value)
def add_csrf_param(params):
""" Returns a params dict with the CSRF parameter added. """
params = params or {}