Add API usage tests

This commit is contained in:
Joseph Schorr 2016-04-05 15:27:45 -04:00 committed by Jimmy Zelinskie
parent 11ff3e9b59
commit fb1dca4e94
4 changed files with 194 additions and 20 deletions

View file

@ -6,12 +6,15 @@ import logging
import re
import json as py_json
from calendar import timegm
from StringIO import StringIO
from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
from playhouse.test_utils import assert_query_count, _QueryLogHandler
from httmock import urlmatch, HTTMock
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from endpoints.api import api_bp, api
from endpoints.building import PreparedBuild
@ -20,7 +23,7 @@ from app import app, config_provider
from buildtrigger.basehandler import BuildTriggerHandler
from initdb import setup_database_for_testing, finished_database_for_testing
from data import database, model
from data.database import RepositoryActionCount
from data.database import RepositoryActionCount, LogEntry, LogEntryKind
from endpoints.api.team import TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam
from endpoints.api.tag import RepositoryTagImages, RepositoryTag, RevertTag, ListRepositoryTags
@ -53,7 +56,9 @@ from endpoints.api.organization import (OrganizationList, OrganizationMember,
from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repository
from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission,
RepositoryTeamPermissionList, RepositoryUserPermissionList)
from endpoints.api.superuser import SuperUserLogs, SuperUserList, SuperUserManagement
from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement,
SuperUserServiceKeyManagement, SuperUserServiceKey,
SuperUserServiceKeyApproval)
from endpoints.api.secscan import RepositoryImageSecurity
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
SuperUserCreateInitialSuperUser)
@ -3554,6 +3559,119 @@ class TestRepositoryImageSecurity(ApiTestCase):
self.assertEquals(1, response['data']['Layer']['IndexedByVersion'])
class TestSuperUserKeyManagement(ApiTestCase):
def test_get_update_keys(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserServiceKeyManagement)
self.assertEquals(3, len(json['keys']))
key = json['keys'][0]
self.assertTrue('name' in key)
self.assertTrue('service' in key)
self.assertTrue('kid' in key)
self.assertTrue('created_date' in key)
self.assertTrue('expiration_date' in key)
self.assertTrue('jwk' in key)
self.assertTrue('approval' in key)
self.assertTrue('metadata' in key)
# Update the key's name.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(name='somenewname'))
# Ensure the key's name has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertEquals('somenewname', json['name'])
# Ensure a log was added for the modification.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_modify')
self.assertEquals(1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
# Update the key's metadata.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(metadata=dict(foo='bar')))
# Ensure the key's metadata has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertEquals('bar', json['metadata']['foo'])
# Ensure a log was added for the modification.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_modify')
self.assertEquals(2, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
# Change the key's expiration.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(expiration=None))
# Ensure the key's expiration has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertIsNone(json['expiration_date'])
# Ensure a log was added for the modification.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_extend')
self.assertEquals(1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
# Delete the key.
self.deleteResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
# Ensure the key no longer exists.
self.getResponse(SuperUserServiceKey, params=dict(kid=key['kid']), expected_code=404)
json = self.getJsonResponse(SuperUserServiceKeyManagement)
self.assertEquals(2, len(json['keys']))
# Ensure a log was added for the deletion.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_delete')
self.assertEquals(1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
def test_create_key(self):
self.login(ADMIN_ACCESS_USER)
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_create')
existing_log_count = model.log.LogEntry.select().where(LogEntry.kind == kind).count()
new_key = {
'service': 'coolservice',
'name': 'mynewkey',
'metadata': dict(foo='baz'),
'notes': 'whazzup!?',
'expiration': timegm((datetime.datetime.now() + datetime.timedelta(days=1)).utctimetuple()),
}
# Create the key.
json = self.postJsonResponse(SuperUserServiceKeyManagement, data=new_key)
self.assertEquals('mynewkey', json['name'])
self.assertTrue('kid' in json)
self.assertTrue('public_key' in json)
self.assertTrue('private_key' in json)
# Verify the private key is a valid PEM.
serialization.load_pem_private_key(json['private_key'].encode('utf-8'), None, default_backend())
# Verify the key.
kid = json['kid']
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=kid))
self.assertEquals('mynewkey', json['name'])
self.assertEquals('coolservice', json['service'])
self.assertEquals('baz', json['metadata']['foo'])
self.assertEquals(kid, json['kid'])
self.assertIsNotNone(json['approval'])
self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type'])
self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username'])
self.assertEquals('whazzup!?', json['approval']['notes'])
# Ensure that there are logs for the creation and auto-approval.
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_create')
self.assertEquals(existing_log_count + 1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
kind = LogEntryKind.get(LogEntryKind.name == 'service_key_approve')
self.assertEquals(existing_log_count + 1, model.log.LogEntry.select().where(LogEntry.kind == kind).count())
class TestSuperUserManagement(ApiTestCase):
def test_get_user(self):
self.login(ADMIN_ACCESS_USER)