Add ability for super users to take ownership of namespaces

Fixes #1395
This commit is contained in:
Joseph Schorr 2016-06-07 18:12:11 -04:00
parent f75949d533
commit 20816804e5
14 changed files with 280 additions and 94 deletions

View file

@ -24,7 +24,8 @@ from app import app, config_provider
from buildtrigger.basehandler import BuildTriggerHandler
from initdb import setup_database_for_testing, finished_database_for_testing
from data import database, model
from data.database import RepositoryActionCount, LogEntry, LogEntryKind
from data.database import RepositoryActionCount
from test.helpers import assert_action_logged
from endpoints.api.team import TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam
from endpoints.api.tag import RepositoryTagImages, RepositoryTag, RevertTag, ListRepositoryTags
@ -59,7 +60,7 @@ from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPe
RepositoryTeamPermissionList, RepositoryUserPermissionList)
from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement,
SuperUserServiceKeyManagement, SuperUserServiceKey,
SuperUserServiceKeyApproval)
SuperUserServiceKeyApproval, SuperUserTakeOwnership)
from endpoints.api.secscan import RepositoryImageSecurity
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
SuperUserCreateInitialSuperUser)
@ -3650,13 +3651,62 @@ class TestRepositoryImageSecurity(ApiTestCase):
self.assertEquals(1, response['data']['Layer']['IndexedByVersion'])
class TestSuperUserTakeOwnership(ApiTestCase):
def test_take_ownership_superuser(self):
self.login(ADMIN_ACCESS_USER)
# Should fail to take ownership of a superuser.
self.postResponse(SuperUserTakeOwnership, params=dict(namespace=ADMIN_ACCESS_USER),
expected_code=400)
def test_take_ownership_invalid_namespace(self):
self.login(ADMIN_ACCESS_USER)
self.postResponse(SuperUserTakeOwnership, params=dict(namespace='invalid'),
expected_code=404)
def test_take_ownership_non_superuser(self):
self.login(READ_ACCESS_USER)
self.postResponse(SuperUserTakeOwnership, params=dict(namespace='freshuser'),
expected_code=403)
def test_take_ownership_user(self):
self.login(ADMIN_ACCESS_USER)
with assert_action_logged('take_ownership'):
# Take ownership of the read user.
self.postResponse(SuperUserTakeOwnership, params=dict(namespace=READ_ACCESS_USER))
# Ensure that the read access user is now an org, with the superuser as the owner.
reader = model.user.get_user_or_org(READ_ACCESS_USER)
self.assertTrue(reader.organization)
usernames = [admin.username for admin in model.organization.get_admin_users(reader)]
self.assertIn(ADMIN_ACCESS_USER, usernames)
def test_take_ownership_org(self):
# Create a new org with another user as owner.
public_user = model.user.get_user(PUBLIC_USER)
org = model.organization.create_organization('someorg', 'some@example.com', public_user)
# Ensure that the admin is not yet owner of the org.
usernames = [admin.username for admin in model.organization.get_admin_users(org)]
self.assertNotIn(ADMIN_ACCESS_USER, usernames)
with assert_action_logged('take_ownership'):
# Take ownership.
self.login(ADMIN_ACCESS_USER)
self.postResponse(SuperUserTakeOwnership, params=dict(namespace='someorg'))
# Ensure now in the admin users.
usernames = [admin.username for admin in model.organization.get_admin_users(org)]
self.assertIn(ADMIN_ACCESS_USER, usernames)
class TestSuperUserKeyManagement(ApiTestCase):
def test_get_update_keys(self):
self.login(ADMIN_ACCESS_USER)
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_modify')
existing_modify = model.log.LogEntry.select().where(LogEntry.kind == kind).count()
json = self.getJsonResponse(SuperUserServiceKeyManagement)
key_count = len(json['keys'])
@ -3670,81 +3720,65 @@ class TestSuperUserKeyManagement(ApiTestCase):
self.assertTrue('approval' in key)
self.assertTrue('metadata' in key)
# Update the key's name.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(name='somenewname'))
with assert_action_logged('service_key_modify'):
# Update the key's name.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(name='somenewname'))
# Ensure the key's name has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertEquals('somenewname', json['name'])
# Ensure the key's name has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertEquals('somenewname', json['name'])
# Ensure a log was added for the modification.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_modify')
self.assertEquals(existing_modify + 1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
with assert_action_logged('service_key_modify'):
# Update the key's metadata.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(metadata=dict(foo='bar')))
# Update the key's metadata.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(metadata=dict(foo='bar')))
# Ensure the key's metadata has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertEquals('bar', json['metadata']['foo'])
# Ensure the key's metadata has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertEquals('bar', json['metadata']['foo'])
with assert_action_logged('service_key_extend'):
# Change the key's expiration.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(expiration=None))
# Ensure a log was added for the modification.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_modify')
self.assertEquals(existing_modify + 2, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
# Ensure the key's expiration has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertIsNone(json['expiration_date'])
# Change the key's expiration.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(expiration=None))
with assert_action_logged('service_key_delete'):
# Delete the key.
self.deleteResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
# Ensure the key's expiration has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertIsNone(json['expiration_date'])
# Ensure the key no longer exists.
self.getResponse(SuperUserServiceKey, params=dict(kid=key['kid']), expected_code=404)
# Ensure a log was added for the modification.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_extend')
self.assertEquals(1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
json = self.getJsonResponse(SuperUserServiceKeyManagement)
self.assertEquals(key_count - 1, len(json['keys']))
# Delete the key.
self.deleteResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
# Ensure the key no longer exists.
self.getResponse(SuperUserServiceKey, params=dict(kid=key['kid']), expected_code=404)
json = self.getJsonResponse(SuperUserServiceKeyManagement)
self.assertEquals(key_count - 1, len(json['keys']))
# Ensure a log was added for the deletion.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_delete')
self.assertEquals(1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
def test_approve_key(self):
self.login(ADMIN_ACCESS_USER)
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_approve')
existing_log_count = model.log.LogEntry.select().where(LogEntry.kind == kind).count()
# Ensure the key is not yet approved.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid='kid3'))
self.assertEquals('unapprovedkey', json['name'])
self.assertIsNone(json['approval'])
# Approve the key.
self.postResponse(SuperUserServiceKeyApproval, params=dict(kid='kid3'),
data=dict(notes='testapprove'), expected_code=201)
with assert_action_logged('service_key_approve'):
self.postResponse(SuperUserServiceKeyApproval, params=dict(kid='kid3'),
data=dict(notes='testapprove'), expected_code=201)
# Ensure the key is approved.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid='kid3'))
self.assertEquals('unapprovedkey', json['name'])
self.assertIsNotNone(json['approval'])
self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type'])
self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username'])
self.assertEquals('testapprove', json['approval']['notes'])
# Ensure the key is approved.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid='kid3'))
self.assertEquals('unapprovedkey', json['name'])
self.assertIsNotNone(json['approval'])
self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type'])
self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username'])
self.assertEquals('testapprove', json['approval']['notes'])
# Ensure the approval was logged.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_approve')
self.assertEquals(existing_log_count + 1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
def test_approve_preapproved(self):
self.login(ADMIN_ACCESS_USER)
@ -3766,9 +3800,6 @@ class TestSuperUserKeyManagement(ApiTestCase):
def test_create_key(self):
self.login(ADMIN_ACCESS_USER)
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_create')
existing_log_count = model.log.LogEntry.select().where(LogEntry.kind == kind).count()
new_key = {
'service': 'coolservice',
'name': 'mynewkey',
@ -3777,36 +3808,30 @@ class TestSuperUserKeyManagement(ApiTestCase):
'expiration': timegm((datetime.datetime.now() + datetime.timedelta(days=1)).utctimetuple()),
}
# Create the key.
json = self.postJsonResponse(SuperUserServiceKeyManagement, data=new_key)
self.assertEquals('mynewkey', json['name'])
self.assertTrue('kid' in json)
self.assertTrue('public_key' in json)
self.assertTrue('private_key' in json)
with assert_action_logged('service_key_create'):
# Create the key.
json = self.postJsonResponse(SuperUserServiceKeyManagement, data=new_key)
self.assertEquals('mynewkey', json['name'])
self.assertTrue('kid' in json)
self.assertTrue('public_key' in json)
self.assertTrue('private_key' in json)
# Verify the private key is a valid PEM.
serialization.load_pem_private_key(json['private_key'].encode('utf-8'), None, default_backend())
# Verify the private key is a valid PEM.
serialization.load_pem_private_key(json['private_key'].encode('utf-8'), None, default_backend())
# Verify the key.
kid = json['kid']
# Verify the key.
kid = json['kid']
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=kid))
self.assertEquals('mynewkey', json['name'])
self.assertEquals('coolservice', json['service'])
self.assertEquals('baz', json['metadata']['foo'])
self.assertEquals(kid, json['kid'])
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=kid))
self.assertEquals('mynewkey', json['name'])
self.assertEquals('coolservice', json['service'])
self.assertEquals('baz', json['metadata']['foo'])
self.assertEquals(kid, json['kid'])
self.assertIsNotNone(json['approval'])
self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type'])
self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username'])
self.assertEquals('whazzup!?', json['approval']['notes'])
# Ensure that there are logs for the creation and auto-approval.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_create')
self.assertEquals(existing_log_count + 1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_approve')
self.assertEquals(existing_log_count + 1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
self.assertIsNotNone(json['approval'])
self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type'])
self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username'])
self.assertEquals('whazzup!?', json['approval']['notes'])
class TestSuperUserManagement(ApiTestCase):