2683 lines
98 KiB
Python
2683 lines
98 KiB
Python
# coding=utf-8
|
|
|
|
import unittest
|
|
import json as py_json
|
|
|
|
from urllib import urlencode
|
|
from urlparse import urlparse, urlunparse, parse_qs
|
|
|
|
from endpoints.api import api_bp, api
|
|
from endpoints.webhooks import webhooks
|
|
from endpoints.trigger import BuildTrigger as BuildTriggerBase
|
|
from app import app
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from data import model, database
|
|
|
|
from endpoints.api.team import TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam
|
|
from endpoints.api.tag import RepositoryTagImages, RepositoryTag
|
|
from endpoints.api.search import FindRepositories, EntitySearch
|
|
from endpoints.api.image import RepositoryImage, RepositoryImageList
|
|
from endpoints.api.build import (RepositoryBuildStatus, RepositoryBuildLogs, RepositoryBuildList,
|
|
RepositoryBuildResource)
|
|
from endpoints.api.robot import (UserRobotList, OrgRobot, OrgRobotList, UserRobot,
|
|
RegenerateUserRobot, RegenerateOrgRobot)
|
|
from endpoints.api.trigger import (BuildTriggerActivate, BuildTriggerSources, BuildTriggerSubdirs,
|
|
TriggerBuildList, ActivateBuildTrigger, BuildTrigger,
|
|
BuildTriggerList, BuildTriggerAnalyze, BuildTriggerFieldValues)
|
|
from endpoints.api.repoemail import RepositoryAuthorizedEmail
|
|
from endpoints.api.repositorynotification import RepositoryNotification, RepositoryNotificationList
|
|
from endpoints.api.user import (PrivateRepositories, ConvertToOrganization, Signout, Signin, User,
|
|
UserAuthorizationList, UserAuthorization, UserNotification,
|
|
UserNotificationList)
|
|
|
|
from endpoints.api.repotoken import RepositoryToken, RepositoryTokenList
|
|
from endpoints.api.prototype import PermissionPrototype, PermissionPrototypeList
|
|
from endpoints.api.logs import UserLogs, OrgLogs
|
|
from endpoints.api.billing import (UserCard, UserPlan, ListPlans, OrganizationCard,
|
|
OrganizationPlan)
|
|
from endpoints.api.discovery import DiscoveryResource
|
|
from endpoints.api.organization import (OrganizationList, OrganizationMember,
|
|
OrgPrivateRepositories, OrgnaizationMemberList,
|
|
Organization, ApplicationInformation,
|
|
OrganizationApplications, OrganizationApplicationResource,
|
|
OrganizationApplicationResetClientSecret)
|
|
from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repository
|
|
from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission,
|
|
RepositoryTeamPermissionList, RepositoryUserPermissionList)
|
|
from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement,
|
|
UsageInformation)
|
|
|
|
# Attach the API blueprint under /api. A ValueError from the registration is
# tolerated because it signals the blueprint is already attached to the app.
try:
    app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
    # This blueprint was already registered
    pass

# The webhooks blueprint is registered unconditionally.
app.register_blueprint(webhooks, url_prefix='/webhooks')
|
|
|
|
# Account names the tests log in as or refer to.
NO_ACCESS_USER = 'freshuser'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
PUBLIC_USER = 'public'

# E-mail address used in place of ADMIN_ACCESS_USER when converting via e-mail.
ADMIN_ACCESS_EMAIL = 'jschorr@devtable.com'

# Repository and organization names referenced by the tests.
ORG_REPO = 'orgrepo'
ORGANIZATION = 'buynlarge'

# Payload posted to the User resource when creating a brand new account.
NEW_USER_DETAILS = dict(
    username='bobby',
    password='password',
    email='bobby@tables.com',
)

FAKE_APPLICATION_CLIENT_ID = 'deadbeef'

# Session key and value of the CSRF token every mutating request carries.
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
|
|
|
|
class ApiTestCase(unittest.TestCase):
    """Base class for the API endpoint tests.

    Each test gets a Flask test client whose session holds a known CSRF token,
    plus helpers that issue a request, check the response status code and
    return the (optionally JSON-decoded) response body.

    NOTE: the `params={}` / `data={}` defaults are shared dict objects; the
    helpers only read them, never mutate them.
    """

    maxDiff = None

    @staticmethod
    def _add_csrf(without_csrf):
        """Return `without_csrf` with the CSRF token added to its query string.

        Query parameters already present on the URL (with non-blank values)
        are kept as they were.
        """
        parts = urlparse(without_csrf)
        query = parse_qs(parts[4])
        query[CSRF_TOKEN_KEY] = [CSRF_TOKEN]
        # parse_qs maps every key to a *list* of values. Without doseq=True,
        # urlencode would serialize the list's repr (a=%5B%27b%27%5D) instead
        # of the original a=b pair.
        encoded = urlencode(query, doseq=True)
        return urlunparse(list(parts[0:4]) + [encoded] + list(parts[5:]))

    def url_for(self, resource_name, params={}):
        """Return the URL of `resource_name` for `params`, CSRF token included."""
        return ApiTestCase._add_csrf(api.url_for(resource_name, **params))

    def setUp(self):
        setup_database_for_testing(self)
        self.app = app.test_client()
        # Keep a request context open for the duration of the test; it is
        # closed again in tearDown.
        self.ctx = app.test_request_context()
        self.ctx.__enter__()
        self.setCsrfToken(CSRF_TOKEN)

    def tearDown(self):
        finished_database_for_testing(self)
        self.ctx.__exit__(True, None, None)

    def setCsrfToken(self, token):
        """Store `token` as the CSRF token of the test client's session."""
        with self.app.session_transaction() as sess:
            sess[CSRF_TOKEN_KEY] = token

    def _assert_status(self, verb, resource_name, rv, expected_code):
        """Fail unless `rv` carries `expected_code`; print the body on mismatch."""
        if rv.status_code != expected_code:
            # Single-argument print(...) emits the same text under the
            # Python 2 print statement.
            print('Mismatch data for resource %s %s: %s' % (verb, resource_name, rv.data))

        self.assertEqual(rv.status_code, expected_code)

    def _send_json(self, send, resource_name, params, data):
        """Call the client method `send` with `data` serialized as a JSON body."""
        return send(self.url_for(resource_name, params),
                    data=py_json.dumps(data),
                    headers={"Content-Type": "application/json"})

    def getJsonResponse(self, resource_name, params={}, expected_code=200):
        """GET the resource and return its body decoded from JSON."""
        return py_json.loads(self.getResponse(resource_name, params, expected_code))

    def postResponse(self, resource_name, params={}, data={}, expected_code=200):
        """POST `data` as JSON and return the raw response body."""
        rv = self._send_json(self.app.post, resource_name, params, data)
        self._assert_status('POST', resource_name, rv, expected_code)
        return rv.data

    def getResponse(self, resource_name, params={}, expected_code=200):
        """GET the resource (no CSRF token is added) and return the raw body."""
        rv = self.app.get(api.url_for(resource_name, **params))
        self._assert_status('GET', resource_name, rv, expected_code)
        return rv.data

    def putResponse(self, resource_name, params={}, data={}, expected_code=200):
        """PUT `data` as JSON and return the raw response body."""
        rv = self._send_json(self.app.put, resource_name, params, data)
        self._assert_status('PUT', resource_name, rv, expected_code)
        return rv.data

    def deleteResponse(self, resource_name, params={}, expected_code=204):
        """DELETE the resource and return the raw response body."""
        rv = self.app.delete(self.url_for(resource_name, params))
        self._assert_status('DELETE', resource_name, rv, expected_code)
        return rv.data

    def postJsonResponse(self, resource_name, params={}, data={},
                         expected_code=200):
        """POST `data` as JSON and return the body decoded from JSON."""
        return py_json.loads(self.postResponse(resource_name, params, data, expected_code))

    def putJsonResponse(self, resource_name, params={}, data={},
                        expected_code=200):
        """PUT `data` as JSON and return the body decoded from JSON."""
        return py_json.loads(self.putResponse(resource_name, params, data, expected_code))

    def assertInTeam(self, data, membername):
        """Fail unless an entry of data['members'] is named `membername`."""
        if any(member['name'] == membername for member in data['members']):
            return

        self.fail(membername + ' not found in team: ' + py_json.dumps(data))

    def login(self, username, password='password'):
        """Sign in through the Signin resource and return the decoded response."""
        return self.postJsonResponse(Signin, data=dict(username=username, password=password))
|
|
|
|
|
|
class TestCSRFFailure(ApiTestCase):
    """Checks that a mutating call is refused once the session CSRF token changes."""

    def test_csrf_failure(self):
        password_change = {'password': 'newpasswordiscool'}
        self.login(READ_ACCESS_USER)

        # While the session token matches the one sent in the URL, the call works.
        self.putJsonResponse(User, data=password_change)

        # Replace the session's token so the two no longer agree.
        self.setCsrfToken('someinvalidtoken')

        # The identical call must now be rejected.
        self.putJsonResponse(User, data=password_change, expected_code=403)
|
|
|
|
|
|
class TestDiscovery(ApiTestCase):
    """Checks that the discovery resource returns an 'apis' entry."""

    def test_discovery(self):
        listing = self.getJsonResponse(DiscoveryResource)
        self.assertIn('apis', listing)
|
|
|
|
|
|
class TestPlans(ApiTestCase):
    """Checks that the plan listing contains the 'free' plan."""

    def test_plans(self):
        plans = self.getJsonResponse(ListPlans)['plans']
        stripe_ids = set(plan['stripeId'] for plan in plans)
        self.assertIn('free', stripe_ids)
|
|
|
|
|
|
class TestLoggedInUser(ApiTestCase):
    """Tests the User resource with and without a signed-in session."""

    def test_guest(self):
        # Without a login the resource answers 401.
        self.getJsonResponse(User, expected_code=401)

    def test_user(self):
        self.login(READ_ACCESS_USER)
        current = self.getJsonResponse(User)
        self.assertEqual(False, current['anonymous'])
        self.assertEqual(READ_ACCESS_USER, current['username'])
|
|
|
|
|
|
class TestUserNotification(ApiTestCase):
    """Tests listing, fetching and dismissing the signed-in user's notifications."""

    def test_get(self):
        self.login(ADMIN_ACCESS_USER)
        notifications = self.getJsonResponse(UserNotificationList)['notifications']

        # The rest of the test needs at least one notification to work with.
        self.assertTrue(notifications)

        # Make sure each notification can be retrieved.
        for notification in notifications:
            fetched = self.getJsonResponse(UserNotification,
                                           params={'uuid': notification['id']})
            self.assertEqual(notification['id'], fetched['id'])

        # Update a notification. The notification that is checked for not
        # being dismissed is the same one that gets dismissed (rather than
        # whichever one the loop above happened to end on).
        target = notifications[0]
        self.assertFalse(target['dismissed'])

        updated = self.putJsonResponse(UserNotification,
                                       params={'uuid': target['id']},
                                       data={'dismissed': True})

        self.assertEqual(True, updated['dismissed'])
|
|
|
|
|
|
class TestGetUserPrivateAllowed(ApiTestCase):
    """Tests the private-repository allowance reported for a user."""

    def test_nonallowed(self):
        self.login(READ_ACCESS_USER)
        info = self.getJsonResponse(PrivateRepositories)
        self.assertEqual(0, info['privateCount'])
        self.assertFalse(info['privateAllowed'])

    def test_allowed(self):
        self.login(ADMIN_ACCESS_USER)
        info = self.getJsonResponse(PrivateRepositories)
        self.assertGreaterEqual(info['privateCount'], 6)
        self.assertTrue(info['privateAllowed'])
|
|
|
|
|
|
class TestConvertToOrganization(ApiTestCase):
    """Tests converting the READ_ACCESS_USER account into an organization."""

    def _request_conversion(self, admin_user, admin_password, expected_code=200):
        """Sign in as READ_ACCESS_USER, request the conversion, return the reply."""
        self.login(READ_ACCESS_USER)
        payload = {
            'adminUser': admin_user,
            'adminPassword': admin_password,
            'plan': 'free',
        }
        return self.postJsonResponse(ConvertToOrganization, data=payload,
                                     expected_code=expected_code)

    def _check_converted(self, reply):
        """Assert the conversion succeeded and ADMIN_ACCESS_USER administers the org."""
        self.assertEqual(True, reply['success'])

        # Verify the organization exists.
        self.assertIsNotNone(model.get_organization(READ_ACCESS_USER))

        # Verify the admin user is the org's admin.
        self.login(ADMIN_ACCESS_USER)
        org = self.getJsonResponse(Organization, params={'orgname': READ_ACCESS_USER})

        self.assertEqual(READ_ACCESS_USER, org['name'])
        self.assertEqual(True, org['is_admin'])

    def test_sameadminuser(self):
        reply = self._request_conversion(READ_ACCESS_USER, 'password', expected_code=400)
        self.assertEqual('The admin user is not valid', reply['message'])

    def test_invalidadminuser(self):
        reply = self._request_conversion('unknownuser', 'password', expected_code=400)
        self.assertEqual('The admin user credentials are not valid', reply['message'])

    def test_invalidadminpassword(self):
        reply = self._request_conversion(ADMIN_ACCESS_USER, 'invalidpass', expected_code=400)
        self.assertEqual('The admin user credentials are not valid', reply['message'])

    def test_convert(self):
        self._check_converted(self._request_conversion(ADMIN_ACCESS_USER, 'password'))

    def test_convert_via_email(self):
        self._check_converted(self._request_conversion(ADMIN_ACCESS_EMAIL, 'password'))
|
|
|
|
|
|
class TestChangeUserDetails(ApiTestCase):
    """Tests updating the signed-in user's password, e-mail and invoice setting."""

    def test_changepassword(self):
        self.login(READ_ACCESS_USER)
        self.putJsonResponse(User, data={'password': 'newpasswordiscool'})
        # Signing in again only works if the new password took effect.
        self.login(READ_ACCESS_USER, password='newpasswordiscool')

    def test_changepassword_unicode(self):
        self.login(READ_ACCESS_USER)
        self.putJsonResponse(User, data={'password': u'someunicode北京市pass'})
        self.login(READ_ACCESS_USER, password=u'someunicode北京市pass')

    def test_changeeemail(self):
        self.login(READ_ACCESS_USER)
        self.putJsonResponse(User, data={'email': 'test+foo@devtable.com'})

    def test_changeinvoiceemail(self):
        self.login(READ_ACCESS_USER)

        # Switch the flag on, then off again, checking the echoed value each time.
        enabled = self.putJsonResponse(User, data={'invoice_email': True})
        self.assertEqual(True, enabled['invoice_email'])

        disabled = self.putJsonResponse(User, data={'invoice_email': False})
        self.assertEqual(False, disabled['invoice_email'])
|
|
|
|
|
|
class TestCreateNewUser(ApiTestCase):
    """Tests account creation through a POST to the User resource."""

    def _rejected_signup(self, username):
        """POST a signup for `username`, expect a 400 and return the reply."""
        payload = {
            'username': username,
            'password': 'password',
            'email': 'test@example.com',
        }
        return self.postJsonResponse(User, data=payload, expected_code=400)

    def test_existingusername(self):
        reply = self._rejected_signup(READ_ACCESS_USER)
        self.assertEqual('The username already exists', reply['message'])

    def test_trycreatetooshort(self):
        reply = self._rejected_signup('a')
        self.assertEqual(
            'Invalid username a: Username must be between 4 and 30 characters in length',
            reply['error_description'])

    def test_trycreateregexmismatch(self):
        reply = self._rejected_signup('auserName')
        self.assertEqual(
            'Invalid username auserName: Username must match expression [a-z0-9_]+',
            reply['error_description'])

    def test_createuser(self):
        created = self.postJsonResponse(User, data=NEW_USER_DETAILS, expected_code=200)
        self.assertEqual(True, created['awaiting_verification'])

    def test_createuser_withteaminvite(self):
        # Invite an e-mail address to the owners team before the account exists.
        inviter = model.get_user(ADMIN_ACCESS_USER)
        team = model.get_organization_team(ORGANIZATION, 'owners')
        invite = model.add_or_invite_to_team(inviter, team, None, 'foo@example.com')

        # Sign up with the invite code included in the new user's details.
        details = {'invite_code': invite.invite_token}
        details.update(NEW_USER_DETAILS)

        created = self.postJsonResponse(User, data=details, expected_code=200)
        self.assertEqual(True, created['awaiting_verification'])

        # Make sure the user was added to the team.
        self.login(ADMIN_ACCESS_USER)
        team_info = self.getJsonResponse(
            TeamMemberList, params={'orgname': ORGANIZATION, 'teamname': 'owners'})
        self.assertInTeam(team_info, NEW_USER_DETAILS['username'])
|
|
|
|
|
|
class TestSignout(ApiTestCase):
    """Checks that signing out ends the session."""

    def test_signout(self):
        self.login(READ_ACCESS_USER)

        current = self.getJsonResponse(User)
        self.assertEqual(READ_ACCESS_USER, current['username'])

        self.postResponse(Signout)

        # Make sure we're now signed out.
        self.getJsonResponse(User, expected_code=401)
|
|
|
|
|
|
class TestGetMatchingEntities(ApiTestCase):
    """Tests the entity search within the ORGANIZATION namespace."""

    def _result_names(self, **query):
        """Run an entity search with `query` and return the set of result names."""
        found = self.getJsonResponse(EntitySearch, params=query)
        return set(entry['name'] for entry in found['results'])

    def test_notinorg(self):
        self.login(NO_ACCESS_USER)

        names = self._result_names(prefix='o', namespace=ORGANIZATION, includeTeams='true')
        self.assertIn('outsideorg', names)
        self.assertNotIn('owners', names)

    def test_inorg(self):
        self.login(ADMIN_ACCESS_USER)

        names = self._result_names(prefix='o', namespace=ORGANIZATION, includeTeams='true')
        self.assertIn('outsideorg', names)
        self.assertIn('owners', names)

    def test_inorg_withorgs(self):
        self.login(ADMIN_ACCESS_USER)

        names = self._result_names(prefix=ORGANIZATION[0], namespace=ORGANIZATION,
                                   includeOrgs='true')
        self.assertIn(ORGANIZATION, names)
|
|
|
|
|
|
class TestCreateOrganization(ApiTestCase):
    """Tests creating organizations through the OrganizationList resource."""

    def _rejected_create(self, name):
        """Try to create an organization called `name`; expect 400, return the reply."""
        return self.postJsonResponse(OrganizationList,
                                     data={'name': name, 'email': 'testorg@example.com'},
                                     expected_code=400)

    def test_existinguser(self):
        self.login(ADMIN_ACCESS_USER)

        reply = self._rejected_create(ADMIN_ACCESS_USER)
        self.assertEqual('A user or organization with this name already exists',
                         reply['message'])

    def test_existingorg(self):
        self.login(ADMIN_ACCESS_USER)

        reply = self._rejected_create(ORGANIZATION)
        self.assertEqual('A user or organization with this name already exists',
                         reply['message'])

    def test_createorg(self):
        self.login(ADMIN_ACCESS_USER)

        raw = self.postResponse(OrganizationList,
                                data={'name': 'neworg', 'email': 'testorg@example.com'},
                                expected_code=201)
        self.assertEqual('"Created"', raw)

        # Ensure the org was created.
        self.assertIsNotNone(model.get_organization('neworg'))

        # Verify the admin user is the org's admin.
        org = self.getJsonResponse(Organization, params={'orgname': 'neworg'})
        self.assertEqual('neworg', org['name'])
        self.assertEqual(True, org['is_admin'])
|
|
|
|
|
|
class TestGetOrganization(ApiTestCase):
    """Tests reading the Organization resource as different users."""

    def test_unknownorg(self):
        self.login(ADMIN_ACCESS_USER)
        self.getResponse(Organization, params={'orgname': 'notvalid'}, expected_code=403)

    def test_cannotaccess(self):
        self.login(NO_ACCESS_USER)
        self.getResponse(Organization, params={'orgname': ORGANIZATION}, expected_code=403)

    def test_getorganization(self):
        self.login(READ_ACCESS_USER)
        org = self.getJsonResponse(Organization, params={'orgname': ORGANIZATION})

        self.assertEqual(ORGANIZATION, org['name'])
        self.assertEqual(False, org['is_admin'])

    def test_getorganization_asadmin(self):
        self.login(ADMIN_ACCESS_USER)
        org = self.getJsonResponse(Organization, params={'orgname': ORGANIZATION})

        self.assertEqual(ORGANIZATION, org['name'])
        self.assertEqual(True, org['is_admin'])
|
|
|
|
|
|
class TestChangeOrganizationDetails(ApiTestCase):
    """Tests updating ORGANIZATION's invoice setting and e-mail address."""

    def _update_org(self, **changes):
        """PUT `changes` to the ORGANIZATION resource and return the decoded reply."""
        return self.putJsonResponse(Organization,
                                    params={'orgname': ORGANIZATION},
                                    data=changes)

    def test_changeinvoiceemail(self):
        self.login(ADMIN_ACCESS_USER)

        enabled = self._update_org(invoice_email=True)
        self.assertEqual(True, enabled['invoice_email'])

        disabled = self._update_org(invoice_email=False)
        self.assertEqual(False, disabled['invoice_email'])

    def test_changemail(self):
        self.login(ADMIN_ACCESS_USER)

        updated = self._update_org(email='newemail@example.com')
        self.assertEqual('newemail@example.com', updated['email'])
|
|
|
|
|
|
class TestGetOrganizationPrototypes(ApiTestCase):
    """Checks that ORGANIZATION lists at least one permission prototype."""

    def test_getprototypes(self):
        self.login(ADMIN_ACCESS_USER)
        listing = self.getJsonResponse(PermissionPrototypeList,
                                       params={'orgname': ORGANIZATION})

        self.assertGreater(len(listing['prototypes']), 0)
|
|
|
|
|
|
class TestCreateOrganizationPrototypes(ApiTestCase):
    """Tests creating permission prototypes for ORGANIZATION."""

    def test_invaliduser(self):
        self.login(ADMIN_ACCESS_USER)

        payload = {
            'activating_user': {'name': 'unknownuser'},
            'role': 'read',
            'delegate': {'kind': 'team', 'name': 'owners'},
        }
        reply = self.postJsonResponse(PermissionPrototypeList,
                                      params={'orgname': ORGANIZATION},
                                      data=payload,
                                      expected_code=400)

        self.assertEqual('Unknown activating user', reply['message'])

    def test_missingdelegate(self):
        self.login(ADMIN_ACCESS_USER)

        # A payload without a delegate is answered with a 400.
        self.postJsonResponse(PermissionPrototypeList,
                              params={'orgname': ORGANIZATION},
                              data={'role': 'read'},
                              expected_code=400)

    def test_createprototype(self):
        self.login(ADMIN_ACCESS_USER)
        org_params = {'orgname': ORGANIZATION}

        payload = {'role': 'read', 'delegate': {'kind': 'team', 'name': 'readers'}}
        created = self.postJsonResponse(PermissionPrototypeList, params=org_params,
                                        data=payload)

        self.assertEqual('read', created['role'])
        new_id = created['id']

        # Verify the prototype exists.
        listing = self.getJsonResponse(PermissionPrototypeList, params=org_params)
        listed_ids = set(proto['id'] for proto in listing['prototypes'])
        self.assertIn(new_id, listed_ids)
|
|
|
|
|
|
class TestDeleteOrganizationPrototypes(ApiTestCase):
    """Tests deleting one of ORGANIZATION's permission prototypes."""

    def _prototype_ids(self):
        """Return the ids of ORGANIZATION's prototypes in listing order."""
        listing = self.getJsonResponse(PermissionPrototypeList,
                                       params={'orgname': ORGANIZATION})
        return [proto['id'] for proto in listing['prototypes']]

    def test_deleteprototype(self):
        self.login(ADMIN_ACCESS_USER)

        # Pick the first existing prototype.
        doomed_id = self._prototype_ids()[0]

        # Delete a prototype.
        self.deleteResponse(PermissionPrototype,
                            params={'orgname': ORGANIZATION, 'prototypeid': doomed_id})

        # Verify the prototype no longer exists.
        self.assertNotIn(doomed_id, self._prototype_ids())
|
|
|
|
|
|
class TestUpdateOrganizationPrototypes(ApiTestCase):
    """Tests changing the role of one of ORGANIZATION's permission prototypes."""

    def test_updateprototype(self):
        self.login(ADMIN_ACCESS_USER)

        # Get the existing prototypes and take the first one's id.
        listing = self.getJsonResponse(PermissionPrototypeList,
                                       params={'orgname': ORGANIZATION})
        first_id = [proto['id'] for proto in listing['prototypes']][0]

        # Update a prototype.
        updated = self.putJsonResponse(PermissionPrototype,
                                       params={'orgname': ORGANIZATION,
                                               'prototypeid': first_id},
                                       data={'role': 'admin'})

        self.assertEqual('admin', updated['role'])
|
|
|
|
|
|
class TestGetOrganiaztionMembers(ApiTestCase):
    """Tests listing ORGANIZATION's members and fetching a single member."""

    def test_getmembers(self):
        self.login(ADMIN_ACCESS_USER)

        listing = self.getJsonResponse(OrgnaizationMemberList,
                                       params={'orgname': ORGANIZATION})
        members = listing['members']

        self.assertIn(ADMIN_ACCESS_USER, members)
        self.assertIn(READ_ACCESS_USER, members)
        self.assertNotIn(NO_ACCESS_USER, members)

    def test_getspecificmember(self):
        self.login(ADMIN_ACCESS_USER)

        reply = self.getJsonResponse(OrganizationMember,
                                     params={'orgname': ORGANIZATION,
                                             'membername': ADMIN_ACCESS_USER})
        member = reply['member']

        self.assertEqual(ADMIN_ACCESS_USER, member['name'])
        self.assertEqual('user', member['kind'])
        self.assertIn('owners', member['teams'])
|
|
|
|
|
|
class TestGetOrganizationPrivateAllowed(ApiTestCase):
    """Tests the private-repository allowance reported for organizations."""

    def test_existingorg(self):
        self.login(ADMIN_ACCESS_USER)

        info = self.getJsonResponse(OrgPrivateRepositories,
                                    params={'orgname': ORGANIZATION})

        self.assertEqual(True, info['privateAllowed'])
        self.assertNotIn('reposAllowed', info)

    def test_neworg(self):
        self.login(ADMIN_ACCESS_USER)

        # Create a fresh organization; only the 201 status matters here.
        self.postResponse(OrganizationList,
                          data={'name': 'neworg', 'email': 'test@example.com'},
                          expected_code=201)

        info = self.getJsonResponse(OrgPrivateRepositories, params={'orgname': 'neworg'})

        self.assertEqual(False, info['privateAllowed'])
|
|
|
|
|
|
class TestUpdateOrganizationTeam(ApiTestCase):
    """Tests creating and updating teams of ORGANIZATION via PUT."""

    def test_updateexisting(self):
        self.login(ADMIN_ACCESS_USER)

        team = self.putJsonResponse(OrganizationTeam,
                                    params={'orgname': ORGANIZATION, 'teamname': 'readers'},
                                    data={'description': 'My cool team', 'role': 'creator'})

        self.assertEqual('My cool team', team['description'])
        self.assertEqual('creator', team['role'])

    def test_attemptchangeroleonowners(self):
        self.login(ADMIN_ACCESS_USER)

        # Changing the role of the owners team is answered with a 400.
        self.putJsonResponse(OrganizationTeam,
                             params={'orgname': ORGANIZATION, 'teamname': 'owners'},
                             data={'role': 'creator'},
                             expected_code=400)

    def test_createnewteam(self):
        self.login(ADMIN_ACCESS_USER)

        team = self.putJsonResponse(OrganizationTeam,
                                    params={'orgname': ORGANIZATION, 'teamname': 'newteam'},
                                    data={'description': 'My cool team', 'role': 'member'})

        self.assertEqual('My cool team', team['description'])
        self.assertEqual('member', team['role'])

        # Verify the team was created.
        org = self.getJsonResponse(Organization, params={'orgname': ORGANIZATION})
        self.assertIn('newteam', org['teams'])
|
|
|
|
|
|
class TestDeleteOrganizationTeam(ApiTestCase):
    """Tests deleting teams of ORGANIZATION."""

    def test_deleteteam(self):
        self.login(ADMIN_ACCESS_USER)

        self.deleteResponse(OrganizationTeam,
                            params={'orgname': ORGANIZATION, 'teamname': 'readers'})

        # Make sure the team was deleted
        org = self.getJsonResponse(Organization, params={'orgname': ORGANIZATION})
        self.assertNotIn('readers', org['teams'])

    def test_attemptdeleteowners(self):
        self.login(ADMIN_ACCESS_USER)

        # Deleting the owners team is answered with a 400.
        self.deleteResponse(OrganizationTeam,
                            params={'orgname': ORGANIZATION, 'teamname': 'owners'},
                            expected_code=400)
|
|
|
|
|
|
class TestGetOrganizationTeamMembers(ApiTestCase):
    """Tests listing the members of a team of ORGANIZATION."""

    def test_invalidteam(self):
        self.login(ADMIN_ACCESS_USER)

        self.getResponse(TeamMemberList,
                         params={'orgname': ORGANIZATION, 'teamname': 'notvalid'},
                         expected_code=404)

    def test_getmembers(self):
        self.login(ADMIN_ACCESS_USER)

        listing = self.getJsonResponse(TeamMemberList,
                                       params={'orgname': ORGANIZATION,
                                               'teamname': 'readers'})

        # NOTE(review): this relies on READ_ACCESS_USER being the second entry
        # of the listing - confirm the endpoint guarantees that ordering.
        second_member = listing['members'][1]
        self.assertEqual(READ_ACCESS_USER, second_member['name'])
|
|
|
|
|
|
class TestUpdateOrganizationTeamMember(ApiTestCase):
    """Tests adding users and robots to teams of ORGANIZATION."""

    def _team_listing(self, teamname):
        """Return the decoded member listing of ORGANIZATION's team `teamname`."""
        return self.getJsonResponse(TeamMemberList,
                                    params={'orgname': ORGANIZATION, 'teamname': teamname})

    def test_addmember_alreadyteammember(self):
        self.login(ADMIN_ACCESS_USER)

        # READ_ACCESS_USER is re-added to 'readers'; a 400 is expected.
        self.putResponse(TeamMember,
                         params={'orgname': ORGANIZATION, 'teamname': 'readers',
                                 'membername': READ_ACCESS_USER},
                         expected_code=400)

    def test_addmember_orgmember(self):
        self.login(ADMIN_ACCESS_USER)

        membername = READ_ACCESS_USER
        self.putJsonResponse(TeamMember,
                             params={'orgname': ORGANIZATION, 'teamname': 'owners',
                                     'membername': membername})

        # Verify the user was added to the team.
        self.assertInTeam(self._team_listing('owners'), membername)

    def test_addmember_robot(self):
        self.login(ADMIN_ACCESS_USER)

        membername = ORGANIZATION + '+coolrobot'
        self.putJsonResponse(TeamMember,
                             params={'orgname': ORGANIZATION, 'teamname': 'readers',
                                     'membername': membername})

        # Verify the robot was added to the team.
        self.assertInTeam(self._team_listing('readers'), membername)

    def test_addmember_invalidrobot(self):
        self.login(ADMIN_ACCESS_USER)

        # A robot named under a different namespace than ORGANIZATION; a 400
        # is expected.
        self.putResponse(TeamMember,
                         params={'orgname': ORGANIZATION, 'teamname': 'readers',
                                 'membername': 'freshuser+anotherrobot'},
                         expected_code=400)

    def test_addmember_nonorgmember(self):
        self.login(ADMIN_ACCESS_USER)

        membername = NO_ACCESS_USER
        invited_team = 'owners'
        response = self.putJsonResponse(TeamMember,
                                        params={'orgname': ORGANIZATION,
                                                'teamname': invited_team,
                                                'membername': membername})

        self.assertEqual(True, response['invited'])

        # Make sure the user is not (yet) part of the team. The listing must
        # be that of the team the invite was sent for; checking any other
        # team would pass regardless of what the invite did.
        for member in self._team_listing(invited_team)['members']:
            self.assertNotEqual(membername, member['name'])
|
|
|
|
|
|
class TestAcceptTeamMemberInvite(ApiTestCase):
    """Tests accepting a team invite as the invited user.

    The team-membership check uses ApiTestCase.assertInTeam. A local copy of
    that method used to live here and referenced an undefined `json` module
    in its failure message, turning a failed check into a NameError.
    """

    def test_accept(self):
        self.login(ADMIN_ACCESS_USER)

        # Create the invite.
        membername = NO_ACCESS_USER
        response = self.putJsonResponse(TeamMember,
                                        params={'orgname': ORGANIZATION,
                                                'teamname': 'owners',
                                                'membername': membername})

        self.assertEqual(True, response['invited'])

        # Login as the user.
        self.login(membername)

        # Accept the invite.
        user = model.get_user(membername)
        invites = list(model.lookup_team_invites(user))
        self.assertEqual(1, len(invites))
        invite_code = invites[0].invite_token

        self.putJsonResponse(TeamMemberInvite, params={'code': invite_code})

        # Verify the user is now on the team.
        team_info = self.getJsonResponse(TeamMemberList,
                                         params={'orgname': ORGANIZATION,
                                                 'teamname': 'owners'})

        self.assertInTeam(team_info, membername)

        # Verify the accept now fails.
        self.putResponse(TeamMemberInvite,
                         params={'code': invite_code},
                         expected_code=400)
|
|
|
|
|
|
|
|
class TestDeclineTeamMemberInvite(ApiTestCase):
    """Tests declining (deleting) a team invite by its invite code."""

    def _invite_to_owners(self, membername):
        """As ADMIN_ACCESS_USER, invite `membername` to ORGANIZATION's owners team."""
        self.login(ADMIN_ACCESS_USER)
        response = self.putJsonResponse(TeamMember,
                                        params={'orgname': ORGANIZATION,
                                                'teamname': 'owners',
                                                'membername': membername})
        self.assertEqual(True, response['invited'])

    def _single_invite_code(self, membername):
        """Assert `membername` has exactly one invite and return its token."""
        user = model.get_user(membername)
        invites = list(model.lookup_team_invites(user))
        self.assertEqual(1, len(invites))
        return invites[0].invite_token

    def test_decline_wronguser(self):
        # Create the invite.
        membername = NO_ACCESS_USER
        self._invite_to_owners(membername)

        # Try to decline the invite while still signed in as the inviter.
        code = self._single_invite_code(membername)
        self.deleteResponse(TeamMemberInvite, params={'code': code}, expected_code=400)

    def test_decline(self):
        # Create the invite.
        membername = NO_ACCESS_USER
        self._invite_to_owners(membername)

        # Login as the user.
        self.login(membername)

        # Decline the invite.
        code = self._single_invite_code(membername)
        self.deleteResponse(TeamMemberInvite, params={'code': code})

        # Make sure the invite was deleted.
        self.deleteResponse(TeamMemberInvite, params={'code': code}, expected_code=400)
|
|
|
|
|
|
class TestDeleteOrganizationTeamMember(ApiTestCase):
    """Tests removing members and pending invites from the 'readers' team."""

    def _readers_member_count(self, **extra):
        """Return how many entries the 'readers' listing has for the given query."""
        query = {'orgname': ORGANIZATION, 'teamname': 'readers'}
        query.update(extra)
        return len(self.getJsonResponse(TeamMemberList, params=query)['members'])

    def test_deletememberinvite(self):
        self.login(ADMIN_ACCESS_USER)

        membername = NO_ACCESS_USER
        member_params = {'orgname': ORGANIZATION, 'teamname': 'readers',
                         'membername': membername}

        response = self.putJsonResponse(TeamMember, params=member_params)
        self.assertEqual(True, response['invited'])

        # Verify the invite was added.
        self.assertEqual(3, self._readers_member_count(includePending=True))

        # Delete the invite.
        self.deleteResponse(TeamMember, params=member_params)

        # Verify the user was removed from the team.
        self.assertEqual(2, self._readers_member_count(includePending=True))

    def test_deletemember(self):
        self.login(ADMIN_ACCESS_USER)

        self.deleteResponse(TeamMember,
                            params={'orgname': ORGANIZATION, 'teamname': 'readers',
                                    'membername': READ_ACCESS_USER})

        # Verify the user was removed from the team.
        self.assertEqual(1, self._readers_member_count())
|
|
|
|
|
|
class TestCreateRepo(ApiTestCase):
    """Tests creating repositories through the RepositoryList resource."""

    def test_duplicaterepo(self):
        self.login(ADMIN_ACCESS_USER)

        payload = {'repository': 'simple', 'visibility': 'public', 'description': ''}
        reply = self.postJsonResponse(RepositoryList, data=payload, expected_code=400)

        self.assertEqual('Repository already exists', reply['message'])

    def test_createrepo(self):
        self.login(ADMIN_ACCESS_USER)

        payload = {'repository': 'newrepo', 'visibility': 'public', 'description': ''}
        created = self.postJsonResponse(RepositoryList, data=payload, expected_code=201)

        # Without an explicit namespace the reply names the signed-in user's.
        self.assertEqual(ADMIN_ACCESS_USER, created['namespace'])
        self.assertEqual('newrepo', created['name'])

    def test_createrepo_underorg(self):
        self.login(ADMIN_ACCESS_USER)

        payload = {
            'namespace': ORGANIZATION,
            'repository': 'newrepo',
            'visibility': 'private',
            'description': '',
        }
        created = self.postJsonResponse(RepositoryList, data=payload, expected_code=201)

        self.assertEqual(ORGANIZATION, created['namespace'])
        self.assertEqual('newrepo', created['name'])
|
|
|
|
|
|
class TestFindRepos(ApiTestCase):
    """Searching repositories with the query 'p' as different kinds of caller."""

    def _search(self):
        """Run the FindRepositories query 'p' and return the list of matches."""
        found = self.getJsonResponse(FindRepositories, params={'query': 'p'})
        return found['repositories']

    def _assert_only_publicrepo(self, matches):
        """Check that the single match is public/publicrepo."""
        self.assertEquals(len(matches), 1)

        only_match = matches[0]
        self.assertEquals(only_match['namespace'], 'public')
        self.assertEquals(only_match['name'], 'publicrepo')

    def test_findrepos_asguest(self):
        """Without logging in, exactly one repository is found."""
        self._assert_only_publicrepo(self._search())

    def test_findrepos_asuser(self):
        """NO_ACCESS_USER finds the same single repository as a guest."""
        self.login(NO_ACCESS_USER)

        self._assert_only_publicrepo(self._search())

    def test_findrepos_orgmember(self):
        """READ_ACCESS_USER finds more than one repository."""
        self.login(READ_ACCESS_USER)

        self.assertGreater(len(self._search()), 1)
|
|
|
|
|
|
class TestListRepos(ApiTestCase):
    """Listing repositories through RepositoryList with various query parameters."""

    def test_listrepos_asguest(self):
        """A guest asking for public repositories gets exactly one."""
        listing = self.getJsonResponse(RepositoryList, params={'public': True})
        self.assertEquals(len(listing['repositories']), 1)

    def test_listrepos_orgmember(self):
        """READ_ACCESS_USER asking for public repositories gets more than one."""
        self.login(READ_ACCESS_USER)
        listing = self.getJsonResponse(RepositoryList, params={'public': True})
        self.assertGreater(len(listing['repositories']), 1)

    def test_listrepos_filter(self):
        """Filtering by namespace only returns repositories in that namespace."""
        self.login(READ_ACCESS_USER)
        query = {'namespace': ORGANIZATION, 'public': False}
        listing = self.getJsonResponse(RepositoryList, params=query)

        for entry in listing['repositories']:
            self.assertEquals(ORGANIZATION, entry['namespace'])

    def test_listrepos_limit(self):
        """A limit of 2 yields exactly two repositories."""
        self.login(READ_ACCESS_USER)
        listing = self.getJsonResponse(RepositoryList, params={'limit': 2})

        self.assertEquals(len(listing['repositories']), 2)
|
|
|
|
|
|
class TestUpdateRepo(ApiTestCase):
    """Updating repository metadata through a PUT to Repository."""

    SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'

    def test_updatedescription(self):
        """Set a description on the simple repo and read it back."""
        self.login(ADMIN_ACCESS_USER)

        new_description = {'description': 'Some cool repo'}
        self.putJsonResponse(
            Repository,
            params={'repository': self.SIMPLE_REPO},
            data=new_description)

        # Verify the repo description was updated.
        fetched = self.getJsonResponse(Repository, params={'repository': self.SIMPLE_REPO})

        self.assertEquals('Some cool repo', fetched['description'])
|
|
|
|
|
|
class TestChangeRepoVisibility(ApiTestCase):
    """Toggling a repository between public and private via RepositoryVisibility."""

    SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'

    def _set_visibility(self, visibility):
        """POST the requested visibility for the simple repo."""
        self.postJsonResponse(
            RepositoryVisibility,
            params={'repository': self.SIMPLE_REPO},
            data={'visibility': visibility})

    def _fetch_is_public(self):
        """Return the 'is_public' field reported for the simple repo."""
        fetched = self.getJsonResponse(Repository, params={'repository': self.SIMPLE_REPO})
        return fetched['is_public']

    def test_changevisibility(self):
        """Make the repo public, verify, then make it private again and verify."""
        self.login(ADMIN_ACCESS_USER)

        # Make public, then verify the visibility.
        self._set_visibility('public')
        self.assertEquals(True, self._fetch_is_public())

        # Make private, then verify the visibility.
        self._set_visibility('private')
        self.assertEquals(False, self._fetch_is_public())
|
|
|
|
|
|
class TestDeleteRepository(ApiTestCase):
    """Deleting repositories, including one populated with related records."""

    SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
    COMPLEX_REPO = ADMIN_ACCESS_USER + '/complex'

    def _delete_and_expect_gone(self, repo_path):
        """DELETE the repository, then confirm a GET on it answers 404."""
        self.deleteResponse(Repository, params={'repository': repo_path})

        # Verify the repo was deleted.
        self.getResponse(Repository, params={'repository': repo_path}, expected_code=404)

    def test_deleterepo(self):
        """The simple repo can be deleted."""
        self.login(ADMIN_ACCESS_USER)

        self._delete_and_expect_gone(self.SIMPLE_REPO)

    def test_deleterepo2(self):
        """The complex repo can be deleted."""
        self.login(ADMIN_ACCESS_USER)

        self._delete_and_expect_gone(self.COMPLEX_REPO)

    def test_populate_and_delete_repo(self):
        """Attach tokens, builds, notifications, logs, triggers and emails, then delete."""
        self.login(ADMIN_ACCESS_USER)

        # Make sure the repository has some images and tags.
        existing_images = list(model.get_repository_images(ADMIN_ACCESS_USER, 'complex'))
        self.assertTrue(len(existing_images) > 0)
        existing_tags = list(model.list_repository_tags(ADMIN_ACCESS_USER, 'complex'))
        self.assertTrue(len(existing_tags) > 0)

        # Add some data for the repository, in addition to its already existing images and tags.
        repository = model.get_repository(ADMIN_ACCESS_USER, 'complex')

        # Create some access tokens.
        access_token = model.create_access_token(repository, 'read')
        model.create_access_token(repository, 'write')

        delegate_token = model.create_delegate_token(
            ADMIN_ACCESS_USER, 'complex', 'sometoken', 'read')
        model.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken', 'write')

        # Create some repository builds, one per token kind created above.
        model.create_repository_build(repository, access_token, {}, 'someid', 'foobar')
        model.create_repository_build(repository, delegate_token, {}, 'someid2', 'foobar2')

        # Create some notifications.
        model.create_repo_notification(repository, 'repo_push', 'hipchat', {})
        model.create_repo_notification(repository, 'build_queued', 'slack', {})

        # Create some logs.
        model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
        model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)

        # Create some build triggers.
        user = model.get_user(ADMIN_ACCESS_USER)
        model.create_build_trigger(repository, 'github', 'sometoken', user)
        model.create_build_trigger(repository, 'github', 'anothertoken', user)

        # Create some email authorizations.
        model.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'a@b.com')
        model.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'b@c.com')

        # Delete the repository and verify it is gone.
        self._delete_and_expect_gone(self.COMPLEX_REPO)
|
|
|
|
|
|
class TestGetRepository(ApiTestCase):
    """Fetching repository details as guest, owner, org reader and org admin."""

    PUBLIC_REPO = PUBLIC_USER + '/publicrepo'

    def test_getrepo_badnames(self):
        """Repos with awkward names can be created and then fetched by name."""
        self.login(ADMIN_ACCESS_USER)

        bad_names = ['logs', 'build', 'tokens', 'foo.bar', 'foo-bar', 'foo_bar']

        # For each bad name, create the repo.
        for bad_name in bad_names:
            # Only the 201 status matters here; the creation response body is not inspected.
            self.postJsonResponse(
                RepositoryList,
                expected_code=201,
                data=dict(repository=bad_name, visibility='public', description=''))

            # Make sure we can retrieve its information.
            json = self.getJsonResponse(
                Repository,
                params=dict(repository=ADMIN_ACCESS_USER + '/' + bad_name))

            self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
            self.assertEquals(bad_name, json['name'])
            self.assertEquals(True, json['is_public'])

    def test_getrepo_public_asguest(self):
        """A guest sees the public repo as read-only and tagged 'latest'."""
        json = self.getJsonResponse(Repository, params=dict(repository=self.PUBLIC_REPO))

        self.assertEquals(PUBLIC_USER, json['namespace'])
        self.assertEquals('publicrepo', json['name'])

        self.assertEquals(True, json['is_public'])
        self.assertEquals(False, json['is_organization'])
        self.assertEquals(False, json['is_building'])

        self.assertEquals(False, json['can_write'])
        self.assertEquals(False, json['can_admin'])

        self.assertIn('latest', json['tags'])

    def test_getrepo_public_asowner(self):
        """PUBLIC_USER has write and admin rights on the public repo."""
        self.login(PUBLIC_USER)

        json = self.getJsonResponse(Repository, params=dict(repository=self.PUBLIC_REPO))

        self.assertEquals(False, json['is_organization'])
        self.assertEquals(True, json['can_write'])
        self.assertEquals(True, json['can_admin'])

    def test_getrepo_building(self):
        """The 'building' repo reports is_building for its admin."""
        self.login(ADMIN_ACCESS_USER)

        json = self.getJsonResponse(
            Repository,
            params=dict(repository=ADMIN_ACCESS_USER + '/building'))

        self.assertEquals(True, json['can_write'])
        self.assertEquals(True, json['can_admin'])
        self.assertEquals(True, json['is_building'])
        self.assertEquals(False, json['is_organization'])

    def test_getrepo_org_asnonmember(self):
        """Without logging in, the organization repo answers 401."""
        self.getResponse(
            Repository,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO),
            expected_code=401)

    def test_getrepo_org_asreader(self):
        """READ_ACCESS_USER can see the organization repo but cannot write or admin."""
        self.login(READ_ACCESS_USER)

        json = self.getJsonResponse(
            Repository,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO))

        self.assertEquals(ORGANIZATION, json['namespace'])
        self.assertEquals(ORG_REPO, json['name'])

        self.assertEquals(False, json['can_write'])
        self.assertEquals(False, json['can_admin'])

        self.assertEquals(True, json['is_organization'])

    def test_getrepo_org_asadmin(self):
        """ADMIN_ACCESS_USER can write and admin the organization repo."""
        self.login(ADMIN_ACCESS_USER)

        json = self.getJsonResponse(
            Repository,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO))

        self.assertEquals(True, json['can_write'])
        self.assertEquals(True, json['can_admin'])

        self.assertEquals(True, json['is_organization'])
|
|
|
|
|
|
|
|
class TestRepositoryBuildResource(ApiTestCase):
    """Cancelling builds via DELETE on RepositoryBuildResource in different build states."""

    def _request_fake_build(self):
        """Queue a (fake) build on the simple repo, check it is listed, and return its uuid.

        The caller must already be logged in.
        """
        json = self.postJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
            data=dict(file_id='foobarbaz'),
            expected_code=201)

        build_uuid = json['id']

        # Check for the build.
        json = self.getJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        self.assertEquals(1, len(json['builds']))
        self.assertEquals(build_uuid, json['builds'][0]['id'])
        return build_uuid

    def _cancel_build(self, build_uuid, expected_code):
        """Issue the cancel (DELETE) for a build on the simple repo and check the status code."""
        self.deleteResponse(
            RepositoryBuildResource,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=build_uuid),
            expected_code=expected_code)

    def test_cancel_invalidbuild(self):
        """Cancelling an unknown build uuid answers 404."""
        self.login(ADMIN_ACCESS_USER)

        self._cancel_build('invalid', expected_code=404)

    def test_cancel_waitingbuild(self):
        """A build still waiting in the queue can be cancelled; its queue item is removed."""
        self.login(ADMIN_ACCESS_USER)

        # Request a (fake) build and check that it is listed.
        uuid = self._request_fake_build()

        # Find the build's queue item.
        build_ref = database.RepositoryBuild.get(uuid=uuid)
        queue_item = database.QueueItem.get(id=build_ref.queue_id)

        self.assertTrue(queue_item.available)
        self.assertTrue(queue_item.retries_remaining > 0)

        # Cancel the build.
        self._cancel_build(uuid, expected_code=201)

        # Check for the build.
        json = self.getJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        self.assertEquals(0, len(json['builds']))

        # Check for the build's queue item: looking it up must now raise DoesNotExist.
        try:
            database.QueueItem.get(id=build_ref.queue_id)
        except database.QueueItem.DoesNotExist:
            pass
        else:
            self.fail('QueueItem still exists for build')

    def test_attemptcancel_scheduledbuild(self):
        """A build whose queue item is no longer available cannot be cancelled (400)."""
        self.login(ADMIN_ACCESS_USER)

        # Request a (fake) build and check that it is listed.
        uuid = self._request_fake_build()

        # Set queue item to be picked up.
        build_ref = database.RepositoryBuild.get(uuid=uuid)
        qi = database.QueueItem.get(id=build_ref.queue_id)
        qi.available = False
        qi.save()

        # Try to cancel the build.
        self._cancel_build(uuid, expected_code=400)

    def test_attemptcancel_workingbuild(self):
        """A build already in the BUILDING phase cannot be cancelled (400)."""
        self.login(ADMIN_ACCESS_USER)

        # Request a (fake) build and check that it is listed.
        uuid = self._request_fake_build()

        # Set the build to a different phase.
        rb = database.RepositoryBuild.get(uuid=uuid)
        rb.phase = database.BUILD_PHASE.BUILDING
        rb.save()

        # Try to cancel the build.
        self._cancel_build(uuid, expected_code=400)
|
|
|
|
|
|
class TestRepoBuilds(ApiTestCase):
    """Listing builds for a repository and cross-checking the build status endpoint."""

    def test_getrepo_nobuilds(self):
        """The simple repo lists no builds."""
        self.login(ADMIN_ACCESS_USER)

        json = self.getJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        self.assertEquals(0, len(json['builds']))

    def test_getrepobuilds(self):
        """The building repo lists builds, and the status endpoint agrees with the listing."""
        self.login(ADMIN_ACCESS_USER)

        json = self.getJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/building'))

        self.assertGreater(len(json['builds']), 0)
        build = json['builds'][-1]

        self.assertIn('id', build)
        self.assertIn('status', build)

        # Check the status endpoint.
        status_json = self.getJsonResponse(
            RepositoryBuildStatus,
            params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id']))

        self.assertEquals(status_json['id'], build['id'])
        self.assertEquals(status_json['resource_key'], build['resource_key'])
        self.assertEquals(status_json['trigger'], build['trigger'])
|
|
|
|
class TestRequestRepoBuild(ApiTestCase):
    """Requesting builds on the simple repo, with and without a pull robot."""

    def test_requestrepobuild(self):
        """A requested build shows up in the build list of the repo it was requested on."""
        self.login(ADMIN_ACCESS_USER)

        # Ensure we are not yet building.
        json = self.getJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        self.assertEquals(0, len(json['builds']))

        # Request a (fake) build.
        self.postResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
            data=dict(file_id='foobarbaz'),
            expected_code=201)

        # Check for the build on the same repo the build was requested for. Checking a
        # different repository would not verify that this request had any effect.
        json = self.getJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        self.assertGreater(len(json['builds']), 0)

    def test_requestrepobuild_with_robot(self):
        """A build requested with the user's own robot shows up in the repo's build list."""
        self.login(ADMIN_ACCESS_USER)

        # Ensure we are not yet building.
        json = self.getJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        self.assertEquals(0, len(json['builds']))

        # Request a (fake) build.
        pull_robot = ADMIN_ACCESS_USER + '+dtrobot'
        self.postResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
            data=dict(file_id='foobarbaz', pull_robot=pull_robot),
            expected_code=201)

        # Check for the build on the same repo the build was requested for.
        json = self.getJsonResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        self.assertGreater(len(json['builds']), 0)

    def test_requestrepobuild_with_invalid_robot(self):
        """Requesting a build with the robot name '+invalidrobot' answers 404."""
        self.login(ADMIN_ACCESS_USER)

        # Request a (fake) build.
        pull_robot = ADMIN_ACCESS_USER + '+invalidrobot'
        self.postResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
            data=dict(file_id='foobarbaz', pull_robot=pull_robot),
            expected_code=404)

    def test_requestrepobuild_with_unauthorized_robot(self):
        """Requesting a build with a robot under another user's namespace answers 403."""
        self.login(ADMIN_ACCESS_USER)

        # Request a (fake) build.
        pull_robot = 'freshuser+anotherrobot'
        self.postResponse(
            RepositoryBuildList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
            data=dict(file_id='foobarbaz', pull_robot=pull_robot),
            expected_code=403)
|
|
|
|
|
|
class TestRepositoryEmail(ApiTestCase):
    """Looking up and requesting e-mail authorizations on the simple repo."""

    def test_emailnotauthorized(self):
        """An e-mail address with no authorization record answers 404."""
        self.login(ADMIN_ACCESS_USER)

        # Verify the e-mail address is not authorized. Only the status code is checked,
        # so the raw response is not kept.
        self.getResponse(
            RepositoryAuthorizedEmail,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='test@example.com'),
            expected_code=404)

    def test_emailnotauthorized_butsent(self):
        """An address with an unconfirmed authorization is reported as not confirmed."""
        self.login(ADMIN_ACCESS_USER)

        # Verify the e-mail address is not authorized.
        json = self.getJsonResponse(
            RepositoryAuthorizedEmail,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                        email='jschorr+other@devtable.com'))

        self.assertEquals(False, json['confirmed'])
        self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
        self.assertEquals('simple', json['repository'])

    def test_emailauthorized(self):
        """A confirmed address is reported as confirmed."""
        self.login(ADMIN_ACCESS_USER)

        # Verify the e-mail address is authorized.
        json = self.getJsonResponse(
            RepositoryAuthorizedEmail,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                        email='jschorr@devtable.com'))

        self.assertEquals(True, json['confirmed'])
        self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
        self.assertEquals('simple', json['repository'])

    def test_send_email_authorization(self):
        """POSTing for a new address returns an unconfirmed authorization."""
        self.login(ADMIN_ACCESS_USER)

        # Send the email.
        json = self.postJsonResponse(
            RepositoryAuthorizedEmail,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                        email='jschorr+foo@devtable.com'))

        self.assertEquals(False, json['confirmed'])
        self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
        self.assertEquals('simple', json['repository'])
|
|
|
|
|
|
class TestRepositoryNotifications(ApiTestCase):
    """Create, fetch, list and delete a webhook notification on the simple repo."""

    def test_webhooks(self):
        """Walk a webhook notification through its whole lifecycle."""
        self.login(ADMIN_ACCESS_USER)

        # Add a notification.
        json = self.postJsonResponse(
            RepositoryNotificationList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
            data=dict(config={'url': 'http://example.com'}, event='repo_push',
                      method='webhook'),
            expected_code=201)

        self.assertEquals('repo_push', json['event'])
        self.assertEquals('webhook', json['method'])
        self.assertEquals('http://example.com', json['config']['url'])
        wid = json['uuid']

        # Get the notification.
        json = self.getJsonResponse(
            RepositoryNotification,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid))

        self.assertEquals(wid, json['uuid'])
        self.assertEquals('repo_push', json['event'])
        self.assertEquals('webhook', json['method'])

        # Verify the notification is listed.
        json = self.getJsonResponse(
            RepositoryNotificationList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        ids = [w['uuid'] for w in json['notifications']]
        self.assertIn(wid, ids)

        # Delete the notification.
        self.deleteResponse(
            RepositoryNotification,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid),
            expected_code=204)

        # Verify the notification is gone.
        self.getResponse(
            RepositoryNotification,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid),
            expected_code=404)
|
|
|
|
|
|
class TestListAndGetImage(ApiTestCase):
    """Listing the images of the simple repo and fetching each one individually."""

    # Keys every entry of the image listing is required to carry.
    REQUIRED_IMAGE_FIELDS = ('id', 'tags', 'created', 'comment', 'command', 'ancestors', 'size')

    def test_listandgetimages(self):
        """Every listed image has the required keys and can be fetched by its id."""
        self.login(ADMIN_ACCESS_USER)

        json = self.getJsonResponse(
            RepositoryImageList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))

        self.assertGreater(len(json['images']), 0)

        for image in json['images']:
            for field in self.REQUIRED_IMAGE_FIELDS:
                self.assertIn(field, image)

            ijson = self.getJsonResponse(
                RepositoryImage,
                params=dict(repository=ADMIN_ACCESS_USER + '/simple', image_id=image['id']))

            self.assertEquals(image['id'], ijson['id'])
|
|
|
|
|
|
class TestGetImageChanges(ApiTestCase):
    """Placeholder coverage for image changes; the actual lookup is currently disabled."""

    def test_getimagechanges(self):
        """Pick the first image listed for the simple repo (the changes call is commented out)."""
        self.login(ADMIN_ACCESS_USER)

        # Find an image to check.
        repo_params = {'repository': ADMIN_ACCESS_USER + '/simple'}
        listing = self.getJsonResponse(RepositoryImageList, params=repo_params)

        first_image = listing['images'][0]
        image_id = first_image['id']

        # Lookup the image's changes.
        # TODO: Fix me once we can get fake changes into the test data
        #self.getJsonResponse(RepositoryImageChanges,
        #                     params=dict(repository=ADMIN_ACCESS_USER + '/simple',
        #                                 image_id=image_id))
|
|
|
|
|
|
class TestListAndDeleteTag(ApiTestCase):
    """Listing, deleting, creating and moving tags on the complex repo."""

    def test_listdeletecreateandmovetag(self):
        """Delete 'prod', then create 'sometag' on a staging image and move it."""
        self.login(ADMIN_ACCESS_USER)

        # List the images for prod.
        json = self.getJsonResponse(
            RepositoryTagImages,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))

        prod_images = json['images']
        self.assertGreater(len(prod_images), 0)

        # List the images for staging.
        json = self.getJsonResponse(
            RepositoryTagImages,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'))

        staging_images = json['images']
        # prod is expected to list exactly one more image than staging.
        self.assertEquals(len(staging_images) + 1, len(prod_images))

        # Delete prod.
        self.deleteResponse(
            RepositoryTag,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
            expected_code=204)

        # Make sure the tag is gone.
        self.getResponse(
            RepositoryTagImages,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
            expected_code=404)

        # Make sure the staging images are still there.
        json = self.getJsonResponse(
            RepositoryTagImages,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'))

        self.assertEquals(staging_images, json['images'])

        # Add a new tag to the staging image.
        self.putResponse(
            RepositoryTag,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'),
            data=dict(image=staging_images[0]['id']),
            expected_code=201)

        # Make sure the tag is present.
        json = self.getJsonResponse(
            RepositoryTagImages,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'))

        sometag_images = json['images']
        self.assertEquals(sometag_images, staging_images)

        # Move the tag.
        self.putResponse(
            RepositoryTag,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'),
            data=dict(image=staging_images[-1]['id']),
            expected_code=201)

        # Make sure the tag has moved.
        json = self.getJsonResponse(
            RepositoryTagImages,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'))

        sometag_new_images = json['images']
        self.assertEquals(1, len(sometag_new_images))
        self.assertEquals(staging_images[-1], sometag_new_images[0])

    def test_deletesubtag(self):
        """Deleting 'staging' leaves the image list of 'prod' unchanged."""
        self.login(ADMIN_ACCESS_USER)

        # List the images for prod.
        json = self.getJsonResponse(
            RepositoryTagImages,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))

        prod_images = json['images']
        self.assertGreater(len(prod_images), 0)

        # Delete staging.
        self.deleteResponse(
            RepositoryTag,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'),
            expected_code=204)

        # Make sure the prod images are still around.
        json = self.getJsonResponse(
            RepositoryTagImages,
            params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))

        self.assertEquals(prod_images, json['images'])
|
|
|
|
|
|
class TestRepoPermissions(ApiTestCase):
    """Adding, changing and removing user and team permissions on repositories."""

    def listUserPermissions(self, namespace=ADMIN_ACCESS_USER, repo='simple'):
        """Return the 'permissions' mapping of user permissions for namespace/repo."""
        return self.getJsonResponse(
            RepositoryUserPermissionList,
            params=dict(repository=namespace + '/' + repo))['permissions']

    def listTeamPermissions(self):
        """Return the 'permissions' mapping of team permissions for the organization repo."""
        return self.getJsonResponse(
            RepositoryTeamPermissionList,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions']

    def test_userpermissions_underorg(self):
        """User permissions on an org repo report whether each user is an org member."""
        self.login(ADMIN_ACCESS_USER)

        permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO)

        self.assertEquals(1, len(permissions))
        self.assertIn('outsideorg', permissions)
        self.assertEquals('read', permissions['outsideorg']['role'])
        self.assertEquals(False, permissions['outsideorg']['is_org_member'])

        # Add another user.
        self.putJsonResponse(
            RepositoryUserPermission,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO, username=ADMIN_ACCESS_USER),
            data=dict(role='admin'))

        # Verify the user is present.
        permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO)

        self.assertEquals(2, len(permissions))
        self.assertIn(ADMIN_ACCESS_USER, permissions)
        self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role'])
        self.assertEquals(True, permissions[ADMIN_ACCESS_USER]['is_org_member'])

    def test_userpermissions(self):
        """Add, read, change and delete a user permission on the simple repo."""
        self.login(ADMIN_ACCESS_USER)

        # The repo should start with just the admin as a user perm.
        permissions = self.listUserPermissions()

        self.assertEquals(1, len(permissions))
        self.assertIn(ADMIN_ACCESS_USER, permissions)
        self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role'])
        self.assertNotIn('is_org_member', permissions[ADMIN_ACCESS_USER])

        # Add another user.
        self.putJsonResponse(
            RepositoryUserPermission,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER),
            data=dict(role='read'))

        # Verify the user is present.
        permissions = self.listUserPermissions()

        self.assertEquals(2, len(permissions))
        self.assertIn(NO_ACCESS_USER, permissions)
        self.assertEquals('read', permissions[NO_ACCESS_USER]['role'])
        self.assertNotIn('is_org_member', permissions[NO_ACCESS_USER])

        json = self.getJsonResponse(
            RepositoryUserPermission,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER))
        self.assertEquals('read', json['role'])

        # Change the user's permissions.
        self.putJsonResponse(
            RepositoryUserPermission,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER),
            data=dict(role='admin'))

        # Verify.
        permissions = self.listUserPermissions()

        self.assertEquals(2, len(permissions))
        self.assertIn(NO_ACCESS_USER, permissions)
        self.assertEquals('admin', permissions[NO_ACCESS_USER]['role'])

        # Delete the user's permission.
        self.deleteResponse(
            RepositoryUserPermission,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER))

        # Verify.
        permissions = self.listUserPermissions()

        self.assertEquals(1, len(permissions))
        self.assertNotIn(NO_ACCESS_USER, permissions)

    def test_teampermissions(self):
        """Add, read, change and delete a team permission on the organization repo."""
        self.login(ADMIN_ACCESS_USER)

        # The repo should start with just the readers as a team perm.
        permissions = self.listTeamPermissions()

        self.assertEquals(1, len(permissions))
        self.assertIn('readers', permissions)
        self.assertEquals('read', permissions['readers']['role'])

        # Add another team.
        self.putJsonResponse(
            RepositoryTeamPermission,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
            data=dict(role='write'))

        # Verify the team is present.
        permissions = self.listTeamPermissions()

        self.assertEquals(2, len(permissions))
        self.assertIn('owners', permissions)
        self.assertEquals('write', permissions['owners']['role'])

        json = self.getJsonResponse(
            RepositoryTeamPermission,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))
        self.assertEquals('write', json['role'])

        # Change the team's permissions.
        self.putJsonResponse(
            RepositoryTeamPermission,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
            data=dict(role='admin'))

        # Verify.
        permissions = self.listTeamPermissions()

        self.assertEquals(2, len(permissions))
        self.assertIn('owners', permissions)
        self.assertEquals('admin', permissions['owners']['role'])

        # Delete the team's permission.
        self.deleteResponse(
            RepositoryTeamPermission,
            params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))

        # Verify.
        permissions = self.listTeamPermissions()

        self.assertEquals(1, len(permissions))
        self.assertNotIn('owners', permissions)
|
|
|
|
|
|
class TestApiTokens(ApiTestCase):
    """Create, list, fetch, update and delete an access token on the simple repo."""

    def listTokens(self):
        """Return the 'tokens' mapping listed for the simple repo."""
        return self.getJsonResponse(
            RepositoryTokenList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens']

    def test_tokens(self):
        """Walk a repository token through its whole lifecycle."""
        self.login(ADMIN_ACCESS_USER)

        # Create a new token.
        json = self.postJsonResponse(
            RepositoryTokenList,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
            data=dict(role='read', friendlyName='mytoken'),
            expected_code=201)

        self.assertEquals('mytoken', json['friendlyName'])
        self.assertEquals('read', json['role'])
        token_code = json['code']

        # Verify.
        tokens = self.listTokens()
        self.assertIn(token_code, tokens)
        self.assertEquals('mytoken', tokens[token_code]['friendlyName'])

        # The single-token view must match the entry in the listing.
        json = self.getJsonResponse(
            RepositoryToken,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
        self.assertEquals(tokens[token_code], json)

        # Change the token's permission.
        self.putJsonResponse(
            RepositoryToken,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
            data=dict(role='write'))

        # Verify.
        json = self.getJsonResponse(
            RepositoryToken,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
        self.assertEquals('write', json['role'])

        # Delete the token.
        self.deleteResponse(
            RepositoryToken,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))

        # Verify.
        self.getResponse(
            RepositoryToken,
            params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
            expected_code=404)
|
|
|
|
|
|
class TestUserCard(ApiTestCase):
    """Reading and setting the billing card of the logged-in user."""

    def test_getusercard(self):
        """The user's card reports last4 '4242' and type 'Visa'."""
        self.login(ADMIN_ACCESS_USER)
        json = self.getJsonResponse(UserCard)

        self.assertEquals('4242', json['card']['last4'])
        self.assertEquals('Visa', json['card']['type'])

    def test_setusercard_error(self):
        """Setting a card with the token 'sometoken' answers 402 with a 'carderror' key."""
        self.login(ADMIN_ACCESS_USER)
        json = self.postJsonResponse(
            UserCard,
            data=dict(token='sometoken'),
            expected_code=402)
        self.assertIn('carderror', json)
|
|
|
|
|
|
class TestOrgCard(ApiTestCase):
    """Reading the billing card of the organization."""

    def test_getorgcard(self):
        """The organization's card reports last4 '4242' and type 'Visa'."""
        self.login(ADMIN_ACCESS_USER)
        response = self.getJsonResponse(OrganizationCard, params={'orgname': ORGANIZATION})

        card = response['card']
        self.assertEquals('4242', card['last4'])
        self.assertEquals('Visa', response['card']['type'])
|
|
|
|
|
|
class TestUserSubscription(ApiTestCase):
    """Changing the logged-in user's plan through UserPlan."""

    def getSubscription(self):
        """Return the JSON body of a GET on UserPlan."""
        return self.getJsonResponse(UserPlan)

    def test_updateplan(self):
        """Switch to 'free' and then to 'bus-large', verifying after each change."""
        self.login(ADMIN_ACCESS_USER)

        for plan_id in ('free', 'bus-large'):
            # Change the plan.
            self.putJsonResponse(UserPlan, data={'plan': plan_id})

            # Verify
            current = self.getSubscription()
            self.assertEquals(plan_id, current['plan'])
|
|
|
|
|
|
class TestOrgSubscription(ApiTestCase):
    """Changing the organization's plan through OrganizationPlan."""

    def getSubscription(self):
        """Return the JSON body of a GET on OrganizationPlan for the organization."""
        org_params = {'orgname': ORGANIZATION}
        return self.getJsonResponse(OrganizationPlan, params=org_params)

    def test_updateplan(self):
        """Switch to 'free' and then to 'bus-large', verifying after each change."""
        self.login(ADMIN_ACCESS_USER)

        for plan_id in ('free', 'bus-large'):
            # Change the plan.
            self.putJsonResponse(
                OrganizationPlan,
                params={'orgname': ORGANIZATION},
                data={'plan': plan_id})

            # Verify
            current = self.getSubscription()
            self.assertEquals(plan_id, current['plan'])
|
|
|
|
|
|
class TestUserRobots(ApiTestCase):
    """Tests for creating, listing, deleting and regenerating robots via the user endpoints."""

    def getRobotNames(self):
        """Return the 'name' of every entry under 'robots' in the UserRobotList response."""
        listing = self.getJsonResponse(UserRobotList)
        return [robot['name'] for robot in listing['robots']]

    def test_robots(self):
        self.login(NO_ACCESS_USER)
        robot_name = NO_ACCESS_USER + '+bender'

        # Create a robot.
        created = self.putJsonResponse(UserRobot,
                                       params=dict(robot_shortname='bender'),
                                       expected_code=201)
        self.assertEquals(robot_name, created['name'])

        # Verify the robot is listed. assertIn/assertNotIn replace the bare asserts so the
        # checks survive `python -O` and print the robot list on failure.
        self.assertIn(robot_name, self.getRobotNames())

        # Delete the robot.
        self.deleteResponse(UserRobot,
                            params=dict(robot_shortname='bender'))

        # Verify the robot is no longer listed.
        self.assertNotIn(robot_name, self.getRobotNames())

    def test_regenerate(self):
        self.login(NO_ACCESS_USER)

        # Create a robot and remember its initial token.
        created = self.putJsonResponse(UserRobot,
                                       params=dict(robot_shortname='bender'),
                                       expected_code=201)
        original_token = created['token']

        # Regenerate the robot.
        regenerated = self.postJsonResponse(RegenerateUserRobot,
                                            params=dict(robot_shortname='bender'),
                                            expected_code=200)

        # Verify the token changed.
        self.assertNotEquals(original_token, regenerated['token'])

        # A fresh lookup must return the regenerated token.
        fetched = self.getJsonResponse(UserRobot,
                                       params=dict(robot_shortname='bender'),
                                       expected_code=200)
        self.assertEquals(regenerated['token'], fetched['token'])
|
|
|
|
|
|
class TestOrgRobots(ApiTestCase):
    """Tests for creating, using, deleting and regenerating robots via the org endpoints."""

    def getRobotNames(self):
        """Return the 'name' of every entry under 'robots' in the OrgRobotList response."""
        listing = self.getJsonResponse(OrgRobotList,
                                       params=dict(orgname=ORGANIZATION))
        return [robot['name'] for robot in listing['robots']]

    def test_delete_robot_after_use(self):
        self.login(ADMIN_ACCESS_USER)

        # Create the robot.
        self.putJsonResponse(OrgRobot,
                             params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
                             expected_code=201)

        # Add the robot to a team.
        membername = ORGANIZATION + '+bender'
        self.putJsonResponse(TeamMember,
                             params=dict(orgname=ORGANIZATION, teamname='readers',
                                         membername=membername))

        # Add a repository permission.
        self.putJsonResponse(RepositoryUserPermission,
                             params=dict(repository=ORGANIZATION + '/' + ORG_REPO,
                                         username=membername),
                             data=dict(role='read'))

        # Add a permission prototype with the robot as the activating user.
        self.postJsonResponse(PermissionPrototypeList,
                              params=dict(orgname=ORGANIZATION),
                              data=dict(role='read',
                                        activating_user={'name': membername},
                                        delegate={'kind': 'user',
                                                  'name': membername}))

        # Add a permission prototype with the robot as the delegate only.
        self.postJsonResponse(PermissionPrototypeList,
                              params=dict(orgname=ORGANIZATION),
                              data=dict(role='read',
                                        delegate={'kind': 'user',
                                                  'name': membername}))

        # Add a build trigger with the robot as the pull robot.
        database.BuildTriggerService.create(name='fakeservice')

        # Add a new fake trigger.
        repo = model.get_repository(ORGANIZATION, ORG_REPO)
        user = model.get_user(ADMIN_ACCESS_USER)
        pull_robot = model.get_user(membername)
        model.create_build_trigger(repo, 'fakeservice', 'sometoken', user, pull_robot=pull_robot)

        # Add some log entries for the robot.
        model.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo)

        # Delete the robot and verify it works.
        self.deleteResponse(OrgRobot,
                            params=dict(orgname=ORGANIZATION, robot_shortname='bender'))

        # All the above records should now be deleted, along with the robot. We verify a few
        # of the critical ones below.

        # Check the team.
        team = model.get_organization_team(ORGANIZATION, 'readers')
        members = [member.username for member in model.get_organization_team_members(team.id)]
        self.assertNotIn(membername, members)

        # Check the robot itself.
        self.assertIsNone(model.get_user(membername))

    def test_robots(self):
        self.login(ADMIN_ACCESS_USER)
        robot_name = ORGANIZATION + '+bender'

        # Create a robot.
        created = self.putJsonResponse(OrgRobot,
                                       params=dict(orgname=ORGANIZATION,
                                                   robot_shortname='bender'),
                                       expected_code=201)
        self.assertEquals(robot_name, created['name'])

        # Verify the robot is listed. assertIn/assertNotIn replace the bare asserts so the
        # checks survive `python -O` and print the robot list on failure.
        self.assertIn(robot_name, self.getRobotNames())

        # Delete the robot.
        self.deleteResponse(OrgRobot,
                            params=dict(orgname=ORGANIZATION, robot_shortname='bender'))

        # Verify the robot is no longer listed.
        self.assertNotIn(robot_name, self.getRobotNames())

    def test_regenerate(self):
        self.login(ADMIN_ACCESS_USER)

        # Create a robot and remember its initial token.
        created = self.putJsonResponse(OrgRobot,
                                       params=dict(orgname=ORGANIZATION,
                                                   robot_shortname='bender'),
                                       expected_code=201)
        original_token = created['token']

        # Regenerate the robot.
        regenerated = self.postJsonResponse(RegenerateOrgRobot,
                                            params=dict(orgname=ORGANIZATION,
                                                        robot_shortname='bender'),
                                            expected_code=200)

        # Verify the token changed.
        self.assertNotEquals(original_token, regenerated['token'])

        # A fresh lookup must return the regenerated token.
        fetched = self.getJsonResponse(OrgRobot,
                                       params=dict(orgname=ORGANIZATION,
                                                   robot_shortname='bender'),
                                       expected_code=200)
        self.assertEquals(regenerated['token'], fetched['token'])
|
|
|
|
|
|
class TestLogs(ApiTestCase):
    """Tests for the UserLogs and OrgLogs endpoints."""

    # Keys every logs response is expected to contain.
    _EXPECTED_KEYS = ('logs', 'start_time', 'end_time')

    def test_user_logs(self):
        self.login(ADMIN_ACCESS_USER)

        response = self.getJsonResponse(UserLogs)

        # assertIn instead of bare asserts: not stripped under `python -O`, and the failure
        # message names the missing key.
        for key in self._EXPECTED_KEYS:
            self.assertIn(key, response)

    def test_org_logs(self):
        self.login(ADMIN_ACCESS_USER)

        response = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION))
        for key in self._EXPECTED_KEYS:
            self.assertIn(key, response)

    def test_performer(self):
        self.login(ADMIN_ACCESS_USER)

        # Unfiltered logs for the organization.
        all_logs = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION))['logs']

        # Logs restricted to a single performer.
        filtered_logs = self.getJsonResponse(OrgLogs,
                                             params=dict(performer=READ_ACCESS_USER,
                                                         orgname=ORGANIZATION))['logs']

        # The filter must remove at least one entry and leave only the requested performer.
        self.assertLess(len(filtered_logs), len(all_logs))
        for entry in filtered_logs:
            self.assertEquals(READ_ACCESS_USER, entry['performer']['name'])
|
|
|
|
|
|
class TestApplicationInformation(ApiTestCase):
    """Tests for the ApplicationInformation endpoint (called here without logging in)."""

    def test_get_info(self):
        info = self.getJsonResponse(ApplicationInformation,
                                    params=dict(client_id=FAKE_APPLICATION_CLIENT_ID))

        # assertIn instead of bare asserts: not stripped under `python -O`, and the failure
        # message names the missing key.
        for key in ('name', 'uri', 'organization'):
            self.assertIn(key, info)

    def test_get_invalid_info(self):
        # An unknown client id is expected to produce a 404.
        self.getJsonResponse(ApplicationInformation,
                             params=dict(client_id='invalid-code'),
                             expected_code=404)
|
|
|
|
|
|
class TestOrganizationApplications(ApiTestCase):
    """Tests for listing and creating applications via OrganizationApplications."""

    def test_list_create_applications(self):
        self.login(ADMIN_ACCESS_USER)

        # The organization starts with two applications, one of which is the fake one.
        initial = self.getJsonResponse(OrganizationApplications,
                                       params=dict(orgname=ORGANIZATION))
        self.assertEquals(2, len(initial['applications']))

        has_fake_app = any(entry['client_id'] == FAKE_APPLICATION_CLIENT_ID
                           for entry in initial['applications'])
        self.assertTrue(has_fake_app)

        # Add a new application.
        created = self.postJsonResponse(OrganizationApplications,
                                        params=dict(orgname=ORGANIZATION),
                                        data=dict(name="Some cool app", description="foo"))

        self.assertEquals("Some cool app", created['name'])
        self.assertEquals("foo", created['description'])

        # Retrieve the apps list again; it should have grown by one.
        updated = self.getJsonResponse(OrganizationApplications,
                                       params=dict(orgname=ORGANIZATION))
        self.assertEquals(3, len(updated['applications']))
|
|
|
|
|
|
class TestOrganizationApplicationResource(ApiTestCase):
    """Tests for reading, editing and deleting one application via its resource endpoint."""

    def test_get_edit_delete_application(self):
        self.login(ADMIN_ACCESS_USER)

        # Retrieve the application.
        fetched = self.getJsonResponse(OrganizationApplicationResource,
                                       params=dict(orgname=ORGANIZATION,
                                                   client_id=FAKE_APPLICATION_CLIENT_ID))
        self.assertEquals(FAKE_APPLICATION_CLIENT_ID, fetched['client_id'])

        # Edit the application. The new values are kept as pairs so the same data drives
        # both the request body and the checks on the response.
        new_values = (
            ('name', "Some App"),
            ('description', "foo"),
            ('application_uri', "bar"),
            ('redirect_uri', "baz"),
            ('avatar_email', "meh"),
        )
        edit_json = self.putJsonResponse(OrganizationApplicationResource,
                                         params=dict(orgname=ORGANIZATION,
                                                     client_id=FAKE_APPLICATION_CLIENT_ID),
                                         data=dict(new_values))

        self.assertEquals(FAKE_APPLICATION_CLIENT_ID, edit_json['client_id'])
        for field, expected in new_values:
            self.assertEquals(expected, edit_json[field])

        # Retrieve the application again; it must match what the edit returned.
        refetched = self.getJsonResponse(OrganizationApplicationResource,
                                         params=dict(orgname=ORGANIZATION,
                                                     client_id=FAKE_APPLICATION_CLIENT_ID))
        self.assertEquals(refetched, edit_json)

        # Delete the application.
        self.deleteResponse(OrganizationApplicationResource,
                            params=dict(orgname=ORGANIZATION,
                                        client_id=FAKE_APPLICATION_CLIENT_ID))

        # Make sure the application is gone.
        self.getJsonResponse(OrganizationApplicationResource,
                             params=dict(orgname=ORGANIZATION,
                                         client_id=FAKE_APPLICATION_CLIENT_ID),
                             expected_code=404)
|
|
|
|
|
|
class TestOrganizationApplicationResetClientSecret(ApiTestCase):
    """Tests for resetting an application's client secret."""

    def test_reset_client_secret(self):
        self.login(ADMIN_ACCESS_USER)

        # Retrieve the application as it is before the reset.
        before = self.getJsonResponse(OrganizationApplicationResource,
                                      params=dict(orgname=ORGANIZATION,
                                                  client_id=FAKE_APPLICATION_CLIENT_ID))
        self.assertEquals(FAKE_APPLICATION_CLIENT_ID, before['client_id'])

        # Reset the client secret.
        reset_json = self.postJsonResponse(OrganizationApplicationResetClientSecret,
                                           params=dict(orgname=ORGANIZATION,
                                                       client_id=FAKE_APPLICATION_CLIENT_ID))

        self.assertEquals(FAKE_APPLICATION_CLIENT_ID, reset_json['client_id'])
        self.assertNotEquals(reset_json['client_secret'], before['client_secret'])

        # Verify a fresh read returns the new secret.
        after = self.getJsonResponse(OrganizationApplicationResource,
                                     params=dict(orgname=ORGANIZATION,
                                                 client_id=FAKE_APPLICATION_CLIENT_ID))
        self.assertEquals(reset_json['client_secret'], after['client_secret'])
|
|
|
|
|
|
|
|
class FakeBuildTrigger(BuildTriggerBase):
    """Build trigger stub registered as 'fakeservice' that returns canned values."""

    @classmethod
    def service_name(cls):
        """Return the service name under which this fake trigger is known."""
        return 'fakeservice'

    def list_build_sources(self, auth_token):
        """Return two canned sources; the second echoes the auth token."""
        first_source = {'first': 'source'}
        second_source = {'second': auth_token}
        return [first_source, second_source]

    def list_build_subdirs(self, auth_token, config):
        """Return the auth token, two fixed names and config['somevalue']."""
        return [auth_token, 'foo', 'bar', config['somevalue']]

    def handle_trigger_request(self, request, auth_token, config):
        """Return a fixed five-element result tuple, ignoring all arguments."""
        return ('foo', ['bar'], 'build-name', 'subdir', {'foo': 'bar'})

    def is_active(self, config):
        """Return config['active'] when that key is present, otherwise False."""
        if 'active' not in config:
            return False

        return config['active']

    def activate(self, trigger_uuid, standard_webhook_url, auth_token, config):
        """Set config['active'] to True in place and return the same config."""
        config['active'] = True
        return config

    def deactivate(self, auth_token, config):
        """Set config['active'] to False in place and return the same config."""
        config['active'] = False
        return config

    def manual_start(self, auth_token, config, run_parameters=None):
        """Return the same fixed result tuple as handle_trigger_request."""
        return ('foo', ['bar'], 'build-name', 'subdir', {'foo': 'bar'})

    def dockerfile_url(self, auth_token, config):
        """Return a fixed placeholder URL."""
        return 'http://some/url'

    def load_dockerfile_contents(self, auth_token, config):
        """Return config['dockerfile'] when that key is present, otherwise None."""
        if 'dockerfile' in config:
            return config['dockerfile']

        return None

    def list_field_values(self, auth_token, config, field_name):
        """Return [1, 2, 3] for 'test_field' and None for any other field name."""
        return [1, 2, 3] if field_name == 'test_field' else None
|
|
|
|
|
|
class TestBuildTriggers(ApiTestCase):
    """Tests for the build trigger endpoints, mostly driven through a 'fakeservice' trigger."""

    def _create_fake_trigger(self):
        """Register the 'fakeservice' trigger service and return a new trigger on <admin>/simple."""
        database.BuildTriggerService.create(name='fakeservice')

        repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
        user = model.get_user(ADMIN_ACCESS_USER)
        return model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)

    def _analyze(self, trigger, trigger_config):
        """POST trigger_config to BuildTriggerAnalyze for the trigger and return the JSON body."""
        return self.postJsonResponse(BuildTriggerAnalyze,
                                     params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                                 trigger_uuid=trigger.uuid),
                                     data={'config': trigger_config})

    def test_list_build_triggers(self):
        self.login(ADMIN_ACCESS_USER)

        # Check a repo with no known triggers.
        listing = self.getJsonResponse(BuildTriggerList,
                                       params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
        self.assertEquals(0, len(listing['triggers']))

        # Check a repo with one known trigger.
        listing = self.getJsonResponse(BuildTriggerList,
                                       params=dict(repository=ADMIN_ACCESS_USER + '/building'))
        self.assertEquals(1, len(listing['triggers']))

        trigger = listing['triggers'][0]

        # assertIn instead of bare asserts: not stripped under `python -O`, and the failure
        # message names the missing key.
        for key in ('id', 'is_active', 'config', 'service'):
            self.assertIn(key, trigger)

        # Verify the get trigger method.
        trigger_json = self.getJsonResponse(BuildTrigger,
                                            params=dict(repository=ADMIN_ACCESS_USER + '/building',
                                                        trigger_uuid=trigger['id']))
        self.assertEquals(trigger, trigger_json)

        # Check the recent builds for the trigger.
        builds_json = self.getJsonResponse(TriggerBuildList,
                                           params=dict(repository=ADMIN_ACCESS_USER + '/building',
                                                       trigger_uuid=trigger['id']))
        self.assertIn('builds', builds_json)

    def test_delete_build_trigger(self):
        self.login(ADMIN_ACCESS_USER)

        listing = self.getJsonResponse(BuildTriggerList,
                                       params=dict(repository=ADMIN_ACCESS_USER + '/building'))
        self.assertEquals(1, len(listing['triggers']))
        trigger = listing['triggers'][0]

        # Delete the trigger.
        self.deleteResponse(BuildTrigger,
                            params=dict(repository=ADMIN_ACCESS_USER + '/building',
                                        trigger_uuid=trigger['id']))

        # Verify it was deleted.
        listing = self.getJsonResponse(BuildTriggerList,
                                       params=dict(repository=ADMIN_ACCESS_USER + '/building'))
        self.assertEquals(0, len(listing['triggers']))

    def test_analyze_fake_trigger(self):
        self.login(ADMIN_ACCESS_USER)

        # Add a new fake trigger.
        trigger = self._create_fake_trigger()

        # Analyze the trigger's dockerfile: First, no dockerfile.
        analyze_json = self._analyze(trigger, {})
        self.assertEquals('error', analyze_json['status'])
        self.assertEquals('Could not read the Dockerfile for the trigger',
                          analyze_json['message'])

        # Analyze the trigger's dockerfile: Second, missing FROM in dockerfile.
        analyze_json = self._analyze(trigger, {'dockerfile': 'MAINTAINER me'})
        self.assertEquals('warning', analyze_json['status'])
        self.assertEquals('No FROM line found in the Dockerfile', analyze_json['message'])

        # Analyze the trigger's dockerfile: Third, dockerfile with public repo.
        analyze_json = self._analyze(trigger, {'dockerfile': 'FROM somerepo'})
        self.assertEquals('publicbase', analyze_json['status'])

        # Analyze the trigger's dockerfile: Fourth, dockerfile with private repo with an
        # invalid path.
        analyze_json = self._analyze(trigger, {'dockerfile': 'FROM localhost:5000/somepath'})
        self.assertEquals('warning', analyze_json['status'])
        self.assertEquals('"localhost:5000/somepath" is not a valid Quay repository path',
                          analyze_json['message'])

        # Analyze the trigger's dockerfile: Fifth, dockerfile with private repo that does
        # not exist.
        analyze_json = self._analyze(trigger,
                                     {'dockerfile': 'FROM localhost:5000/nothere/randomrepo'})
        self.assertEquals('error', analyze_json['status'])
        self.assertEquals('Repository "localhost:5000/nothere/randomrepo" referenced by the Dockerfile was not found',
                          analyze_json['message'])

        # Analyze the trigger's dockerfile: Sixth, dockerfile with private repo that the
        # user cannot see.
        analyze_json = self._analyze(trigger,
                                     {'dockerfile': 'FROM localhost:5000/randomuser/randomrepo'})
        self.assertEquals('error', analyze_json['status'])
        self.assertEquals('Repository "localhost:5000/randomuser/randomrepo" referenced by the Dockerfile was not found',
                          analyze_json['message'])

        # Analyze the trigger's dockerfile: Seventh, dockerfile with private repo that the
        # user can see.
        analyze_json = self._analyze(trigger,
                                     {'dockerfile': 'FROM localhost:5000/devtable/complex'})
        self.assertEquals('analyzed', analyze_json['status'])
        self.assertEquals('devtable', analyze_json['namespace'])
        self.assertEquals('complex', analyze_json['name'])
        self.assertEquals(False, analyze_json['is_public'])
        self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', analyze_json['robots'][0]['name'])

    def test_fake_trigger(self):
        self.login(ADMIN_ACCESS_USER)

        # Add a new fake trigger.
        trigger = self._create_fake_trigger()

        # Verify the trigger.
        listing = self.getJsonResponse(BuildTriggerList,
                                       params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
        self.assertEquals(1, len(listing['triggers']))
        self.assertEquals(trigger.uuid, listing['triggers'][0]['id'])
        self.assertEquals(trigger.service.name, listing['triggers'][0]['service'])
        self.assertEquals(False, listing['triggers'][0]['is_active'])

        # List the trigger's sources.
        source_json = self.getJsonResponse(BuildTriggerSources,
                                           params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                                       trigger_uuid=trigger.uuid))
        self.assertEquals([{'first': 'source'}, {'second': 'sometoken'}], source_json['sources'])

        # List the trigger's subdirs.
        subdir_json = self.postJsonResponse(BuildTriggerSubdirs,
                                            params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                                        trigger_uuid=trigger.uuid),
                                            data={'somevalue': 'meh'})
        self.assertEquals({'status': 'success', 'subdir': ['sometoken', 'foo', 'bar', 'meh']},
                          subdir_json)

        # Activate the trigger.
        trigger_config = {}
        activate_json = self.postJsonResponse(BuildTriggerActivate,
                                              params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                                          trigger_uuid=trigger.uuid),
                                              data={'config': trigger_config})
        self.assertEquals(True, activate_json['is_active'])

        # Make sure the trigger has a write token.
        trigger = model.get_build_trigger(trigger.uuid)
        self.assertNotEquals(None, trigger.write_token)
        self.assertEquals(True, py_json.loads(trigger.config)['active'])

        # Make sure we cannot activate again.
        self.postResponse(BuildTriggerActivate,
                          params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                      trigger_uuid=trigger.uuid),
                          data={'config': trigger_config},
                          expected_code=400)

        # Retrieve values for a field.
        result = self.postJsonResponse(BuildTriggerFieldValues,
                                       params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                                   trigger_uuid=trigger.uuid,
                                                   field_name="test_field"))
        self.assertEquals(result['values'], [1, 2, 3])

        # A field the fake trigger does not know about is expected to produce a 404.
        self.postResponse(BuildTriggerFieldValues,
                          params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                      trigger_uuid=trigger.uuid,
                                      field_name="another_field"),
                          expected_code=404)

        # Start a manual build.
        start_json = self.postJsonResponse(ActivateBuildTrigger,
                                           params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                                       trigger_uuid=trigger.uuid),
                                           data=dict(),
                                           expected_code=201)

        self.assertIn('id', start_json)
        self.assertEquals("build-name", start_json['display_name'])
        self.assertEquals(['bar'], start_json['job_config']['docker_tags'])

        # Verify the metadata was added.
        build_obj = database.RepositoryBuild.get(database.RepositoryBuild.uuid == start_json['id'])
        self.assertEquals('bar', py_json.loads(build_obj.job_config)['trigger_metadata']['foo'])

    def test_invalid_robot_account(self):
        self.login(ADMIN_ACCESS_USER)

        # Add a new fake trigger.
        trigger = self._create_fake_trigger()

        # Try to activate it with an invalid robot account. Only the status code matters, so
        # the response body is not kept.
        self.postJsonResponse(BuildTriggerActivate,
                              params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                          trigger_uuid=trigger.uuid),
                              data={'config': {}, 'pull_robot': 'someinvalidrobot'},
                              expected_code=404)

    def test_unauthorized_robot_account(self):
        self.login(ADMIN_ACCESS_USER)

        # Add a new fake trigger.
        trigger = self._create_fake_trigger()

        # Try to activate it with a robot account in the wrong namespace. Only the status
        # code matters, so the response body is not kept.
        self.postJsonResponse(BuildTriggerActivate,
                              params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                          trigger_uuid=trigger.uuid),
                              data={'config': {}, 'pull_robot': 'freshuser+anotherrobot'},
                              expected_code=403)

    def test_robot_account(self):
        self.login(ADMIN_ACCESS_USER)

        # Add a new fake trigger.
        trigger = self._create_fake_trigger()

        # Try to activate it with a robot account.
        activate_json = self.postJsonResponse(BuildTriggerActivate,
                                              params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                                          trigger_uuid=trigger.uuid),
                                              data={'config': {},
                                                    'pull_robot': ADMIN_ACCESS_USER + '+dtrobot'})

        # Verify that the robot was saved.
        self.assertEquals(True, activate_json['is_active'])
        self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', activate_json['pull_robot']['name'])

        # Start a manual build.
        start_json = self.postJsonResponse(ActivateBuildTrigger,
                                           params=dict(repository=ADMIN_ACCESS_USER + '/simple',
                                                       trigger_uuid=trigger.uuid),
                                           data=dict(),
                                           expected_code=201)

        self.assertIn('id', start_json)
        self.assertEquals("build-name", start_json['display_name'])
        self.assertEquals(['bar'], start_json['job_config']['docker_tags'])
|
|
|
|
|
|
class TestUserAuthorizations(ApiTestCase):
    """Tests for listing, fetching and deleting the logged-in user's authorizations."""

    def test_list_get_delete_user_authorizations(self):
        self.login(ADMIN_ACCESS_USER)

        listing = self.getJsonResponse(UserAuthorizationList)
        self.assertEquals(1, len(listing['authorizations']))

        authorization = listing['authorizations'][0]

        # assertIn instead of bare asserts: not stripped under `python -O`, and the failure
        # message names the missing key.
        for key in ('uuid', 'scopes', 'application'):
            self.assertIn(key, authorization)

        # Retrieve the authorization.
        get_json = self.getJsonResponse(UserAuthorization,
                                        params=dict(access_token_uuid=authorization['uuid']))
        self.assertEquals(authorization, get_json)

        # Delete the authorization.
        self.deleteResponse(UserAuthorization,
                            params=dict(access_token_uuid=authorization['uuid']))

        # Verify it has been deleted.
        self.getJsonResponse(UserAuthorization,
                             params=dict(access_token_uuid=authorization['uuid']),
                             expected_code=404)
|
|
|
|
|
|
class TestSuperUserLogs(ApiTestCase):
    """Tests for the SuperUserLogs endpoint."""

    def test_get_logs(self):
        self.login(ADMIN_ACCESS_USER)

        response = self.getJsonResponse(SuperUserLogs)

        # unittest assertions instead of bare asserts: not stripped under `python -O`, and
        # failures report the actual values.
        self.assertIn('logs', response)
        self.assertGreater(len(response['logs']), 0)
|
|
|
|
|
|
class TestSuperUserList(ApiTestCase):
    """Tests for the SuperUserList endpoint."""

    def test_get_users(self):
        self.login(ADMIN_ACCESS_USER)

        response = self.getJsonResponse(SuperUserList)

        # unittest assertions instead of bare asserts: not stripped under `python -O`, and
        # failures report the actual values.
        self.assertIn('users', response)
        self.assertGreater(len(response['users']), 0)
|
|
|
|
|
|
class TestUsageInformation(ApiTestCase):
    """Tests for the UsageInformation endpoint."""

    def test_get_usage(self):
        self.login(ADMIN_ACCESS_USER)
        response = self.getJsonResponse(UsageInformation)

        # assertIn instead of bare asserts: not stripped under `python -O`, and the failure
        # message names the missing key.
        self.assertIn('usage', response)
        self.assertIn('allowed', response)
|
|
|
|
|
|
class TestSuperUserManagement(ApiTestCase):
    """Tests for reading, deleting and updating a user via SuperUserManagement."""

    def test_get_user(self):
        self.login(ADMIN_ACCESS_USER)

        details = self.getJsonResponse(SuperUserManagement,
                                       params=dict(username='freshuser'))

        self.assertEquals('freshuser', details['username'])
        self.assertEquals('jschorr+test@devtable.com', details['email'])
        self.assertEquals(False, details['super_user'])

    def test_delete_user(self):
        self.login(ADMIN_ACCESS_USER)

        # Verify the user exists.
        details = self.getJsonResponse(SuperUserManagement,
                                       params=dict(username='freshuser'))
        self.assertEquals('freshuser', details['username'])

        # Delete the user.
        self.deleteResponse(SuperUserManagement,
                            params=dict(username='freshuser'),
                            expected_code=204)

        # Verify the user no longer exists.
        self.getResponse(SuperUserManagement,
                         params=dict(username='freshuser'),
                         expected_code=404)

    def test_update_user(self):
        self.login(ADMIN_ACCESS_USER)

        # Verify the user exists with the original email address.
        before = self.getJsonResponse(SuperUserManagement,
                                      params=dict(username='freshuser'))
        self.assertEquals('freshuser', before['username'])
        self.assertEquals('jschorr+test@devtable.com', before['email'])

        # Update the user.
        self.putJsonResponse(SuperUserManagement,
                             params=dict(username='freshuser'),
                             data=dict(email='foo@bar.com'))

        # Verify the user was updated.
        after = self.getJsonResponse(SuperUserManagement,
                                     params=dict(username='freshuser'))
        self.assertEquals('freshuser', after['username'])
        self.assertEquals('foo@bar.com', after['email'])
|
|
|
|
|
|
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|