1508 lines
53 KiB
Python
1508 lines
53 KiB
Python
import unittest
|
|
import json as py_json
|
|
|
|
from flask import url_for
|
|
from endpoints.api import api
|
|
from endpoints.webhooks import webhooks
|
|
from endpoints.trigger import BuildTrigger
|
|
from app import app
|
|
from initdb import setup_database_for_testing, finished_database_for_testing
|
|
from data import model, database
|
|
|
|
app.register_blueprint(api, url_prefix='/api')
|
|
app.register_blueprint(webhooks, url_prefix='/webhooks')
|
|
|
|
NO_ACCESS_USER = 'freshuser'
|
|
READ_ACCESS_USER = 'reader'
|
|
ADMIN_ACCESS_USER = 'devtable'
|
|
PUBLIC_USER = 'public'
|
|
|
|
ORG_REPO = 'orgrepo'
|
|
|
|
ORGANIZATION = 'buynlarge'
|
|
|
|
NEW_USER_DETAILS = {
|
|
'username': 'bobby',
|
|
'password': 'password',
|
|
'email': 'bobby@tables.com',
|
|
}
|
|
|
|
|
|
class ApiTestCase(unittest.TestCase):
|
|
def setUp(self):
|
|
setup_database_for_testing(self)
|
|
self.app = app.test_client()
|
|
self.ctx = app.test_request_context()
|
|
self.ctx.__enter__()
|
|
|
|
def tearDown(self):
|
|
finished_database_for_testing(self)
|
|
self.ctx.__exit__(True, None, None)
|
|
|
|
def getJsonResponse(self, method_name, params={}):
|
|
rv = self.app.get(url_for(method_name, **params))
|
|
self.assertEquals(200, rv.status_code)
|
|
data = rv.data
|
|
parsed = py_json.loads(data)
|
|
return parsed
|
|
|
|
def postResponse(self, method_name, params={}, data={}, expected_code=200):
|
|
rv = self.app.post(url_for(method_name, **params),
|
|
data=py_json.dumps(data),
|
|
headers={"Content-Type": "application/json"})
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def getResponse(self, method_name, params={}, expected_code=200):
|
|
rv = self.app.get(url_for(method_name, **params))
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def deleteResponse(self, method_name, params={}, expected_code=204):
|
|
rv = self.app.delete(url_for(method_name, **params))
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
return rv.data
|
|
|
|
def postJsonResponse(self, method_name, params={}, data={},
|
|
expected_code=200):
|
|
rv = self.app.post(url_for(method_name, **params),
|
|
data=py_json.dumps(data),
|
|
headers={"Content-Type": "application/json"})
|
|
|
|
if rv.status_code != expected_code:
|
|
print 'Mismatch data for method %s: %s' % (method_name, rv.data)
|
|
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
data = rv.data
|
|
parsed = py_json.loads(data)
|
|
return parsed
|
|
|
|
def putJsonResponse(self, method_name, params={}, data={},
|
|
expected_code=200):
|
|
rv = self.app.put(url_for(method_name, **params), data=py_json.dumps(data),
|
|
headers={"Content-Type": "application/json"})
|
|
|
|
if rv.status_code != expected_code:
|
|
print 'Mismatch data for method %s: %s' % (method_name, rv.data)
|
|
|
|
self.assertEquals(rv.status_code, expected_code)
|
|
data = rv.data
|
|
parsed = py_json.loads(data)
|
|
return parsed
|
|
|
|
def login(self, username, password='password'):
|
|
return self.postJsonResponse('api.signin_user',
|
|
data=dict(username=username,
|
|
password=password))
|
|
|
|
|
|
class TestDiscovery(ApiTestCase):
|
|
def test_discovery(self):
|
|
json = self.getJsonResponse('api.discovery')
|
|
found = set([])
|
|
for method_info in json['endpoints']:
|
|
found.add(method_info['name'])
|
|
|
|
assert 'discovery' in found
|
|
|
|
|
|
class TestPlans(ApiTestCase):
|
|
def test_plans(self):
|
|
json = self.getJsonResponse('api.list_plans')
|
|
found = set([])
|
|
for method_info in json['plans']:
|
|
found.add(method_info['stripeId'])
|
|
|
|
assert 'free' in found
|
|
|
|
|
|
class TestLoggedInUser(ApiTestCase):
|
|
def test_guest(self):
|
|
json = self.getJsonResponse('api.get_logged_in_user')
|
|
assert json['anonymous'] == True
|
|
|
|
def test_user(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_logged_in_user')
|
|
assert json['anonymous'] == False
|
|
assert json['username'] == READ_ACCESS_USER
|
|
|
|
|
|
class TestGetUserPrivateAllowed(ApiTestCase):
|
|
def test_nonallowed(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_user_private_allowed')
|
|
assert json['privateCount'] == 0
|
|
assert not json['privateAllowed']
|
|
|
|
def test_allowed(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_user_private_allowed')
|
|
assert json['privateCount'] == 6
|
|
assert json['privateAllowed']
|
|
|
|
|
|
class TestConvertToOrganization(ApiTestCase):
|
|
def test_sameadminuser(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.postJsonResponse('api.convert_user_to_organization',
|
|
data={'adminUser': READ_ACCESS_USER,
|
|
'adminPassword': 'password'},
|
|
expected_code=400)
|
|
|
|
self.assertEqual('The admin user is not valid', json['message'])
|
|
|
|
def test_invalidadminuser(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.postJsonResponse('api.convert_user_to_organization',
|
|
data={'adminUser': 'unknownuser',
|
|
'adminPassword': 'password'},
|
|
expected_code=400)
|
|
|
|
self.assertEqual('The admin user credentials are not valid',
|
|
json['message'])
|
|
|
|
def test_invalidadminpassword(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.postJsonResponse('api.convert_user_to_organization',
|
|
data={'adminUser': ADMIN_ACCESS_USER,
|
|
'adminPassword': 'invalidpass'},
|
|
expected_code=400)
|
|
|
|
self.assertEqual('The admin user credentials are not valid',
|
|
json['message'])
|
|
|
|
def test_convert(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.postJsonResponse('api.convert_user_to_organization',
|
|
data={'adminUser': ADMIN_ACCESS_USER,
|
|
'adminPassword': 'password',
|
|
'plan': 'free'})
|
|
|
|
self.assertEqual(True, json['success'])
|
|
|
|
# Verify the organization exists.
|
|
organization = model.get_organization(READ_ACCESS_USER)
|
|
assert organization is not None
|
|
|
|
# Verify the admin user is the org's admin.
|
|
self.login(ADMIN_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_organization',
|
|
params=dict(orgname=READ_ACCESS_USER))
|
|
|
|
self.assertEquals(READ_ACCESS_USER, json['name'])
|
|
self.assertEquals(True, json['is_admin'])
|
|
|
|
|
|
class TestChangeUserDetails(ApiTestCase):
|
|
def test_changepassword(self):
|
|
self.login(READ_ACCESS_USER)
|
|
self.putJsonResponse('api.change_user_details',
|
|
data=dict(password='newpasswordiscool'))
|
|
self.login(READ_ACCESS_USER, password='newpasswordiscool')
|
|
|
|
def test_changeinvoiceemail(self):
|
|
self.login(READ_ACCESS_USER)
|
|
|
|
json = self.putJsonResponse('api.change_user_details',
|
|
data=dict(invoice_email=True))
|
|
self.assertEquals(True, json['invoice_email'])
|
|
|
|
json = self.putJsonResponse('api.change_user_details',
|
|
data=dict(invoice_email=False))
|
|
self.assertEquals(False, json['invoice_email'])
|
|
|
|
|
|
class TestCreateNewUser(ApiTestCase):
|
|
def test_existingusername(self):
|
|
json = self.postJsonResponse('api.create_new_user',
|
|
data=dict(username=READ_ACCESS_USER,
|
|
password='password',
|
|
email='test@example.com'),
|
|
expected_code=400)
|
|
|
|
self.assertEquals('The username already exists', json['message'])
|
|
|
|
def test_createuser(self):
|
|
data = self.postResponse('api.create_new_user',
|
|
data=NEW_USER_DETAILS,
|
|
expected_code=201)
|
|
self.assertEquals('Created', data)
|
|
|
|
|
|
class TestSignout(ApiTestCase):
|
|
def test_signout(self):
|
|
self.login(READ_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_logged_in_user')
|
|
assert json['username'] == READ_ACCESS_USER
|
|
|
|
self.postResponse('api.logout')
|
|
|
|
json = self.getJsonResponse('api.get_logged_in_user')
|
|
assert json['anonymous'] == True
|
|
|
|
|
|
class TestGetMatchingEntities(ApiTestCase):
|
|
def test_notinorg(self):
|
|
self.login(NO_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_matching_entities',
|
|
params=dict(prefix='o', namespace=ORGANIZATION,
|
|
includeTeams='true'))
|
|
|
|
names = set([r['name'] for r in json['results']])
|
|
assert 'outsideorg' in names
|
|
assert not 'owners' in names
|
|
|
|
def test_inorg(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_matching_entities',
|
|
params=dict(prefix='o', namespace=ORGANIZATION,
|
|
includeTeams='true'))
|
|
|
|
names = set([r['name'] for r in json['results']])
|
|
assert 'outsideorg' in names
|
|
assert 'owners' in names
|
|
|
|
|
|
class TestCreateOrganization(ApiTestCase):
|
|
def test_existinguser(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.postJsonResponse('api.create_organization',
|
|
data=dict(name=ADMIN_ACCESS_USER),
|
|
expected_code=400)
|
|
|
|
self.assertEquals('A user or organization with this name already exists',
|
|
json['message'])
|
|
|
|
def test_existingorg(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.postJsonResponse('api.create_organization',
|
|
data=dict(name=ORGANIZATION),
|
|
expected_code=400)
|
|
|
|
self.assertEquals('A user or organization with this name already exists',
|
|
json['message'])
|
|
|
|
def test_createorg(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
data = self.postResponse('api.create_organization',
|
|
data=dict(name='neworg',
|
|
email='test@example.com'),
|
|
expected_code=201)
|
|
|
|
self.assertEquals('Created', data)
|
|
|
|
# Ensure the org was created.
|
|
organization = model.get_organization('neworg')
|
|
assert organization is not None
|
|
|
|
# Verify the admin user is the org's admin.
|
|
json = self.getJsonResponse('api.get_organization',
|
|
params=dict(orgname='neworg'))
|
|
self.assertEquals('neworg', json['name'])
|
|
self.assertEquals(True, json['is_admin'])
|
|
|
|
|
|
class TestGetOrganization(ApiTestCase):
|
|
def test_unknownorg(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
self.getResponse('api.get_organization', params=dict(orgname='notvalid'),
|
|
expected_code=403)
|
|
|
|
def test_cannotaccess(self):
|
|
self.login(NO_ACCESS_USER)
|
|
self.getResponse('api.get_organization', params=dict(orgname=ORGANIZATION),
|
|
expected_code=403)
|
|
|
|
def test_getorganization(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_organization',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
self.assertEquals(ORGANIZATION, json['name'])
|
|
self.assertEquals(False, json['is_admin'])
|
|
|
|
def test_getorganization_asadmin(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_organization',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
self.assertEquals(ORGANIZATION, json['name'])
|
|
self.assertEquals(True, json['is_admin'])
|
|
|
|
|
|
class TestChangeOrganizationDetails(ApiTestCase):
|
|
def test_changeinvoiceemail(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.putJsonResponse('api.change_organization_details',
|
|
params=dict(orgname=ORGANIZATION),
|
|
data=dict(invoice_email=True))
|
|
|
|
self.assertEquals(True, json['invoice_email'])
|
|
|
|
json = self.putJsonResponse('api.change_organization_details',
|
|
params=dict(orgname=ORGANIZATION),
|
|
data=dict(invoice_email=False))
|
|
self.assertEquals(False, json['invoice_email'])
|
|
|
|
|
|
def test_changemail(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.putJsonResponse('api.change_organization_details',
|
|
params=dict(orgname=ORGANIZATION),
|
|
data=dict(email='newemail@example.com'))
|
|
|
|
self.assertEquals('newemail@example.com', json['email'])
|
|
|
|
|
|
class TestGetOrganizationPrototypes(ApiTestCase):
|
|
def test_getprototypes(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_organization_prototype_permissions',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
assert len(json['prototypes']) > 0
|
|
|
|
|
|
class TestCreateOrganizationPrototypes(ApiTestCase):
|
|
def test_invaliduser(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.postJsonResponse('api.create_organization_prototype_permission',
|
|
params=dict(orgname=ORGANIZATION),
|
|
data=dict(activating_user={'name': 'unknownuser'},
|
|
role='read',
|
|
delegate={'kind': 'team', 'name': 'owners'}),
|
|
expected_code=400)
|
|
|
|
self.assertEquals('Unknown activating user', json['message'])
|
|
|
|
|
|
def test_missingdelegate(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.postJsonResponse('api.create_organization_prototype_permission',
|
|
params=dict(orgname=ORGANIZATION),
|
|
data=dict(role='read'),
|
|
expected_code=400)
|
|
|
|
self.assertEquals('Missing delegate user or team', json['message'])
|
|
|
|
def test_createprototype(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.postJsonResponse('api.create_organization_prototype_permission',
|
|
params=dict(orgname=ORGANIZATION),
|
|
data=dict(role='read',
|
|
delegate={'kind': 'team',
|
|
'name': 'readers'}))
|
|
|
|
self.assertEquals('read', json['role'])
|
|
pid = json['id']
|
|
|
|
# Verify the prototype exists.
|
|
json = self.getJsonResponse('api.get_organization_prototype_permissions',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
ids = set([p['id'] for p in json['prototypes']])
|
|
assert pid in ids
|
|
|
|
|
|
class TestDeleteOrganizationPrototypes(ApiTestCase):
|
|
def test_deleteprototype(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Get the existing prototypes
|
|
json = self.getJsonResponse('api.get_organization_prototype_permissions',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
ids = [p['id'] for p in json['prototypes']]
|
|
pid = ids[0]
|
|
|
|
# Delete a prototype.
|
|
self.deleteResponse('api.delete_organization_prototype_permission',
|
|
params=dict(orgname=ORGANIZATION, prototypeid=pid))
|
|
|
|
# Verify the prototype no longer exists.
|
|
json = self.getJsonResponse('api.get_organization_prototype_permissions',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
newids = [p['id'] for p in json['prototypes']]
|
|
assert not pid in newids
|
|
|
|
|
|
class TestUpdateOrganizationPrototypes(ApiTestCase):
|
|
def test_updateprototype(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Get the existing prototypes
|
|
json = self.getJsonResponse('api.get_organization_prototype_permissions',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
ids = [p['id'] for p in json['prototypes']]
|
|
pid = ids[0]
|
|
|
|
# Update a prototype.
|
|
json = self.putJsonResponse('api.delete_organization_prototype_permission',
|
|
params=dict(orgname=ORGANIZATION,
|
|
prototypeid=pid), data=dict(role='admin'))
|
|
|
|
self.assertEquals('admin', json['role'])
|
|
|
|
|
|
class TestGetOrganiaztionMembers(ApiTestCase):
|
|
def test_getmembers(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_organization_members',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
assert ADMIN_ACCESS_USER in json['members']
|
|
assert READ_ACCESS_USER in json['members']
|
|
assert not NO_ACCESS_USER in json['members']
|
|
|
|
def test_getspecificmember(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_organization_member',
|
|
params=dict(orgname=ORGANIZATION,
|
|
membername=ADMIN_ACCESS_USER))
|
|
|
|
self.assertEquals(ADMIN_ACCESS_USER, json['member']['name'])
|
|
self.assertEquals('user', json['member']['kind'])
|
|
|
|
assert 'owners' in json['member']['teams']
|
|
|
|
|
|
class TestGetOrganizationPrivateAllowed(ApiTestCase):
|
|
def test_existingorg(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_organization_private_allowed',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
self.assertEquals(True, json['privateAllowed'])
|
|
assert not 'reposAllowed' in json
|
|
|
|
|
|
def test_neworg(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
data = self.postResponse('api.create_organization',
|
|
data=dict(name='neworg',
|
|
email='test@example.com'),
|
|
expected_code=201)
|
|
|
|
json = self.getJsonResponse('api.get_organization_private_allowed',
|
|
params=dict(orgname='neworg'))
|
|
|
|
self.assertEquals(False, json['privateAllowed'])
|
|
|
|
|
|
class TestUpdateOrganizationTeam(ApiTestCase):
|
|
def test_updateexisting(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
data = self.postJsonResponse('api.update_organization_team',
|
|
params=dict(orgname=ORGANIZATION,
|
|
teamname='readers'),
|
|
data=dict(description='My cool team',
|
|
role='creator'))
|
|
|
|
self.assertEquals('My cool team', data['description'])
|
|
self.assertEquals('creator', data['role'])
|
|
|
|
def test_attemptchangeroleonowners(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
self.postResponse('api.update_organization_team',
|
|
params=dict(orgname=ORGANIZATION, teamname='owners'),
|
|
data=dict(role = 'creator'),
|
|
expected_code=400)
|
|
|
|
def test_createnewteam(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
data = self.putJsonResponse('api.update_organization_team',
|
|
params=dict(orgname=ORGANIZATION,
|
|
teamname='newteam'),
|
|
data=dict(description='My cool team',
|
|
role='member'),
|
|
expected_code=201)
|
|
|
|
self.assertEquals('My cool team', data['description'])
|
|
self.assertEquals('member', data['role'])
|
|
|
|
# Verify the team was created.
|
|
json = self.getJsonResponse('api.get_organization',
|
|
params=dict(orgname=ORGANIZATION))
|
|
assert 'newteam' in json['teams']
|
|
|
|
|
|
class TestDeleteOrganizationTeam(ApiTestCase):
|
|
def test_deleteteam(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
self.deleteResponse('api.delete_organization_team',
|
|
params=dict(orgname=ORGANIZATION, teamname='readers'))
|
|
|
|
# Make sure the team was deleted
|
|
json = self.getJsonResponse('api.get_organization',
|
|
params=dict(orgname=ORGANIZATION))
|
|
assert not 'readers' in json['teams']
|
|
|
|
def test_attemptdeleteowners(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
self.deleteResponse('api.delete_organization_team',
|
|
params=dict(orgname=ORGANIZATION, teamname='owners'),
|
|
expected_code=400)
|
|
|
|
|
|
class TestGetOrganizationTeamMembers(ApiTestCase):
|
|
def test_invalidteam(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
self.getResponse('api.get_organization_team_members',
|
|
params=dict(orgname=ORGANIZATION, teamname='notvalid'),
|
|
expected_code=404)
|
|
|
|
def test_getmembers(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_organization_team_members',
|
|
params=dict(orgname=ORGANIZATION,
|
|
teamname='readers'))
|
|
|
|
assert READ_ACCESS_USER in json['members']
|
|
|
|
|
|
class TestUpdateOrganizationTeamMember(ApiTestCase):
|
|
def test_addmember(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
self.postJsonResponse('api.update_organization_team_member',
|
|
params=dict(orgname=ORGANIZATION, teamname='readers',
|
|
membername=NO_ACCESS_USER))
|
|
|
|
|
|
# Verify the user was added to the team.
|
|
json = self.getJsonResponse('api.get_organization_team_members',
|
|
params=dict(orgname=ORGANIZATION,
|
|
teamname='readers'))
|
|
|
|
assert NO_ACCESS_USER in json['members']
|
|
|
|
|
|
class TestDeleteOrganizationTeamMember(ApiTestCase):
|
|
def test_deletemember(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
self.deleteResponse('api.delete_organization_team_member',
|
|
params=dict(orgname=ORGANIZATION, teamname='readers',
|
|
membername=READ_ACCESS_USER))
|
|
|
|
|
|
# Verify the user was removed from the team.
|
|
json = self.getJsonResponse('api.get_organization_team_members',
|
|
params=dict(orgname=ORGANIZATION,
|
|
teamname='readers'))
|
|
|
|
assert not READ_ACCESS_USER in json['members']
|
|
|
|
|
|
class TestCreateRepo(ApiTestCase):
|
|
def test_duplicaterepo(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.postJsonResponse('api.create_repo',
|
|
data=dict(repository='simple',
|
|
visibility='public'),
|
|
expected_code=400)
|
|
|
|
self.assertEquals('Repository already exists', json['message'])
|
|
|
|
|
|
def test_createrepo(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.postJsonResponse('api.create_repo',
|
|
data=dict(repository='newrepo',
|
|
visibility='public',
|
|
description=''))
|
|
|
|
|
|
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
|
|
self.assertEquals('newrepo', json['name'])
|
|
|
|
|
|
def test_createrepo_underorg(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.postJsonResponse('api.create_repo',
|
|
data=dict(namespace=ORGANIZATION,
|
|
repository='newrepo',
|
|
visibility='private',
|
|
description=''))
|
|
|
|
self.assertEquals(ORGANIZATION, json['namespace'])
|
|
self.assertEquals('newrepo', json['name'])
|
|
|
|
|
|
class TestFindRepos(ApiTestCase):
|
|
def test_findrepos_asguest(self):
|
|
json = self.getJsonResponse('api.find_repos', params=dict(query='p'))
|
|
self.assertEquals(len(json['repositories']), 1)
|
|
|
|
self.assertEquals(json['repositories'][0]['namespace'], 'public')
|
|
self.assertEquals(json['repositories'][0]['name'], 'publicrepo')
|
|
|
|
def test_findrepos_asuser(self):
|
|
self.login(NO_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.find_repos', params=dict(query='p'))
|
|
self.assertEquals(len(json['repositories']), 1)
|
|
|
|
self.assertEquals(json['repositories'][0]['namespace'], 'public')
|
|
self.assertEquals(json['repositories'][0]['name'], 'publicrepo')
|
|
|
|
def test_findrepos_orgmember(self):
|
|
self.login(READ_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.find_repos', params=dict(query='p'))
|
|
self.assertGreater(len(json['repositories']), 1)
|
|
|
|
|
|
class TestListRepos(ApiTestCase):
|
|
def test_listrepos_asguest(self):
|
|
json = self.getJsonResponse('api.list_repos', params=dict(public=True))
|
|
self.assertEquals(len(json['repositories']), 1)
|
|
|
|
def test_listrepos_orgmember(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.getJsonResponse('api.list_repos', params=dict(public=True))
|
|
self.assertGreater(len(json['repositories']), 1)
|
|
|
|
def test_listrepos_filter(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.getJsonResponse('api.list_repos',
|
|
params=dict(namespace=ORGANIZATION,
|
|
public=False))
|
|
|
|
for repo in json['repositories']:
|
|
self.assertEquals(ORGANIZATION, repo['namespace'])
|
|
|
|
def test_listrepos_limit(self):
|
|
self.login(READ_ACCESS_USER)
|
|
json = self.getJsonResponse('api.list_repos', params=dict(limit=2))
|
|
|
|
self.assertEquals(len(json['repositories']), 2)
|
|
|
|
|
|
class TestUpdateRepo(ApiTestCase):
|
|
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
|
|
def test_updatedescription(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
self.putJsonResponse('api.update_repo',
|
|
params=dict(repository=self.SIMPLE_REPO),
|
|
data=dict(description='Some cool repo'))
|
|
|
|
# Verify the repo description was updated.
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=self.SIMPLE_REPO))
|
|
|
|
self.assertEquals('Some cool repo', json['description'])
|
|
|
|
|
|
class TestChangeRepoVisibility(ApiTestCase):
|
|
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
|
|
def test_changevisibility(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Make public.
|
|
self.postJsonResponse('api.change_repo_visibility',
|
|
params=dict(repository=self.SIMPLE_REPO),
|
|
data=dict(visibility='public'))
|
|
|
|
# Verify the visibility.
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=self.SIMPLE_REPO))
|
|
|
|
self.assertEquals(True, json['is_public'])
|
|
|
|
# Make private.
|
|
self.postJsonResponse('api.change_repo_visibility',
|
|
params=dict(repository=self.SIMPLE_REPO),
|
|
data=dict(visibility='private'))
|
|
|
|
# Verify the visibility.
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=self.SIMPLE_REPO))
|
|
|
|
self.assertEquals(False, json['is_public'])
|
|
|
|
|
|
class TestDeleteRepository(ApiTestCase):
|
|
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
|
|
def test_deleterepo(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
self.deleteResponse('api.delete_repository',
|
|
params=dict(repository=self.SIMPLE_REPO))
|
|
|
|
# Verify the repo was deleted.
|
|
self.getResponse('api.get_repo',
|
|
params=dict(repository=self.SIMPLE_REPO),
|
|
expected_code=404)
|
|
|
|
|
|
class TestGetRepository(ApiTestCase):
|
|
PUBLIC_REPO = PUBLIC_USER + '/publicrepo'
|
|
|
|
def test_getrepo_badnames(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
bad_names = ['logs', 'build', 'tokens', 'foo.bar', 'foo-bar', 'foo_bar']
|
|
|
|
# For each bad name, create the repo.
|
|
for bad_name in bad_names:
|
|
json = self.postJsonResponse('api.create_repo',
|
|
data=dict(repository=bad_name,
|
|
visibility='public',
|
|
description=''))
|
|
|
|
# Make sure we can retrieve its information.
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/' + bad_name))
|
|
|
|
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
|
|
self.assertEquals(bad_name, json['name'])
|
|
self.assertEquals(True, json['is_public'])
|
|
|
|
|
|
def test_getrepo_public_asguest(self):
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=self.PUBLIC_REPO))
|
|
|
|
self.assertEquals(PUBLIC_USER, json['namespace'])
|
|
self.assertEquals('publicrepo', json['name'])
|
|
|
|
self.assertEquals(True, json['is_public'])
|
|
self.assertEquals(False, json['is_organization'])
|
|
self.assertEquals(False, json['is_building'])
|
|
|
|
self.assertEquals(False, json['can_write'])
|
|
self.assertEquals(False, json['can_admin'])
|
|
|
|
assert 'latest' in json['tags']
|
|
|
|
def test_getrepo_public_asowner(self):
|
|
self.login(PUBLIC_USER)
|
|
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=self.PUBLIC_REPO))
|
|
|
|
self.assertEquals(False, json['is_organization'])
|
|
self.assertEquals(True, json['can_write'])
|
|
self.assertEquals(True, json['can_admin'])
|
|
|
|
def test_getrepo_building(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
|
|
|
|
self.assertEquals(True, json['can_write'])
|
|
self.assertEquals(True, json['can_admin'])
|
|
self.assertEquals(True, json['is_building'])
|
|
self.assertEquals(False, json['is_organization'])
|
|
|
|
def test_getrepo_org_asnonmember(self):
|
|
self.getResponse('api.get_repo',
|
|
params=dict(repository=ORGANIZATION + '/' + ORG_REPO),
|
|
expected_code=403)
|
|
|
|
def test_getrepo_org_asreader(self):
|
|
self.login(READ_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
|
|
|
|
self.assertEquals(ORGANIZATION, json['namespace'])
|
|
self.assertEquals(ORG_REPO, json['name'])
|
|
|
|
self.assertEquals(False, json['can_write'])
|
|
self.assertEquals(False, json['can_admin'])
|
|
|
|
self.assertEquals(True, json['is_organization'])
|
|
|
|
def test_getrepo_org_asadmin(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_repo',
|
|
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
|
|
|
|
self.assertEquals(True, json['can_write'])
|
|
self.assertEquals(True, json['can_admin'])
|
|
|
|
self.assertEquals(True, json['is_organization'])
|
|
|
|
|
|
class TestRepoBuilds(ApiTestCase):
|
|
def test_getrepo_nobuilds(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_repo_builds',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
|
|
|
|
assert len(json['builds']) == 0
|
|
|
|
def test_getrepobuilds(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.get_repo_builds',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
|
|
|
|
assert len(json['builds']) > 0
|
|
build = json['builds'][0]
|
|
|
|
assert 'id' in build
|
|
assert 'status' in build
|
|
|
|
# Check the status endpoint.
|
|
status_json = self.getJsonResponse('api.get_repo_build_status',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id']))
|
|
|
|
self.assertEquals(status_json, build)
|
|
|
|
# Check the archive URL.
|
|
archive_json = self.getJsonResponse('api.get_repo_build_archive_url',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id']))
|
|
|
|
assert archive_json['url']
|
|
|
|
# Check the logs.
|
|
logs_json = self.getJsonResponse('api.get_repo_build_logs',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id']))
|
|
|
|
assert 'logs' in logs_json
|
|
|
|
class TestRequestRepoBuild(ApiTestCase):
|
|
def test_requestrepobuild(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Ensure where not yet building.
|
|
json = self.getJsonResponse('api.get_repo_builds',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
|
|
|
|
assert len(json['builds']) == 0
|
|
|
|
# Request a (fake) build.
|
|
self.postResponse('api.request_repo_build',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
|
|
data=dict(file_id='foobarbaz'),
|
|
expected_code=201)
|
|
|
|
# Check for the build.
|
|
json = self.getJsonResponse('api.get_repo_builds',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
|
|
|
|
assert len(json['builds']) > 0
|
|
|
|
|
|
class TestWebhooks(ApiTestCase):
|
|
def test_webhooks(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Add a webhook.
|
|
json = self.postJsonResponse('api.create_webhook',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
|
|
data=dict(url='http://example.com'))
|
|
|
|
self.assertEquals('http://example.com', json['parameters']['url'])
|
|
wid = json['public_id']
|
|
|
|
# Get the webhook.
|
|
json = self.getJsonResponse('api.get_webhook',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid))
|
|
|
|
self.assertEquals(wid, json['public_id'])
|
|
self.assertEquals('http://example.com', json['parameters']['url'])
|
|
|
|
# Verify the webhook is listed.
|
|
json = self.getJsonResponse('api.list_webhooks',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
|
|
|
|
ids = [w['public_id'] for w in json['webhooks']]
|
|
assert wid in ids
|
|
|
|
# Delete the webhook.
|
|
self.deleteResponse('api.delete_webhook',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid),
|
|
expected_code=204)
|
|
|
|
# Verify the webhook is gone.
|
|
self.getResponse('api.get_webhook',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid),
|
|
expected_code=404)
|
|
|
|
|
|
class TestListAndGetImage(ApiTestCase):
|
|
def test_listandgetimages(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.list_repository_images',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
|
|
|
|
assert len(json['images']) > 0
|
|
for image in json['images']:
|
|
assert 'id' in image
|
|
assert 'tags' in image
|
|
assert 'created' in image
|
|
assert 'comment' in image
|
|
assert 'command' in image
|
|
assert 'ancestors' in image
|
|
assert 'dbid' in image
|
|
assert 'size' in image
|
|
|
|
ijson = self.getJsonResponse('api.get_image',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
|
|
image_id=image['id']))
|
|
|
|
self.assertEquals(image['id'], ijson['id'])
|
|
|
|
|
|
class TestGetImageChanges(ApiTestCase):
|
|
def test_getimagechanges(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Find an image to check.
|
|
json = self.getJsonResponse('api.list_repository_images',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
|
|
|
|
image_id = json['images'][0]['id']
|
|
|
|
# Lookup the image's changes.
|
|
# TODO: Fix me once we can get fake changes into the test data
|
|
#self.getJsonResponse('api.get_image_changes',
|
|
# params=dict(repository=ADMIN_ACCESS_USER + '/simple',
|
|
# image_id=image_id))
|
|
|
|
|
|
class TestListAndDeleteTag(ApiTestCase):
|
|
def test_listtagimagesanddeletetag(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# List the images for prod.
|
|
json = self.getJsonResponse('api.list_tag_images',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
|
|
|
|
prod_images = json['images']
|
|
assert len(prod_images) > 0
|
|
|
|
# List the images for staging.
|
|
json = self.getJsonResponse('api.list_tag_images',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'))
|
|
|
|
staging_images = json['images']
|
|
assert len(prod_images) == len(staging_images) + 1
|
|
|
|
# Delete prod.
|
|
self.deleteResponse('api.delete_full_tag',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
|
|
expected_code=204)
|
|
|
|
# Make sure the tag is gone.
|
|
self.getResponse('api.list_tag_images',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
|
|
expected_code=404)
|
|
|
|
# Make the sure the staging images are still there.
|
|
json = self.getJsonResponse('api.list_tag_images',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'))
|
|
|
|
self.assertEquals(staging_images, json['images'])
|
|
|
|
|
|
def test_deletesubtag(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# List the images for prod.
|
|
json = self.getJsonResponse('api.list_tag_images',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
|
|
|
|
prod_images = json['images']
|
|
assert len(prod_images) > 0
|
|
|
|
# Delete staging.
|
|
self.deleteResponse('api.delete_full_tag',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'),
|
|
expected_code=204)
|
|
|
|
# Make sure the prod images are still around.
|
|
json = self.getJsonResponse('api.list_tag_images',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
|
|
|
|
self.assertEquals(prod_images, json['images'])
|
|
|
|
|
|
class TestRepoPermissions(ApiTestCase):
|
|
def listUserPermissions(self):
|
|
return self.getJsonResponse('api.list_repo_user_permissions',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['permissions']
|
|
|
|
def listTeamPermissions(self):
|
|
return self.getJsonResponse('api.list_repo_team_permissions',
|
|
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions']
|
|
|
|
def test_userpermissions(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# The repo should start with just the admin as a user perm.
|
|
permissions = self.listUserPermissions()
|
|
|
|
self.assertEquals(1, len(permissions))
|
|
assert ADMIN_ACCESS_USER in permissions
|
|
self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role'])
|
|
|
|
# Add another user.
|
|
self.putJsonResponse('api.change_user_permissions',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER),
|
|
data=dict(role='read'))
|
|
|
|
# Verify the user is present.
|
|
permissions = self.listUserPermissions()
|
|
|
|
self.assertEquals(2, len(permissions))
|
|
assert NO_ACCESS_USER in permissions
|
|
self.assertEquals('read', permissions[NO_ACCESS_USER]['role'])
|
|
|
|
json = self.getJsonResponse('api.get_user_permissions',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER))
|
|
self.assertEquals('read', json['role'])
|
|
|
|
# Change the user's permissions.
|
|
self.putJsonResponse('api.change_user_permissions',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER),
|
|
data=dict(role='admin'))
|
|
|
|
# Verify.
|
|
permissions = self.listUserPermissions()
|
|
|
|
self.assertEquals(2, len(permissions))
|
|
assert NO_ACCESS_USER in permissions
|
|
self.assertEquals('admin', permissions[NO_ACCESS_USER]['role'])
|
|
|
|
# Delete the user's permission.
|
|
self.deleteResponse('api.delete_user_permissions',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER))
|
|
|
|
# Verify.
|
|
permissions = self.listUserPermissions()
|
|
|
|
self.assertEquals(1, len(permissions))
|
|
assert not NO_ACCESS_USER in permissions
|
|
|
|
|
|
def test_teampermissions(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# The repo should start with just the readers as a team perm.
|
|
permissions = self.listTeamPermissions()
|
|
|
|
self.assertEquals(1, len(permissions))
|
|
assert 'readers' in permissions
|
|
self.assertEquals('read', permissions['readers']['role'])
|
|
|
|
# Add another team.
|
|
self.putJsonResponse('api.change_team_permissions',
|
|
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
|
|
data=dict(role='write'))
|
|
|
|
# Verify the team is present.
|
|
permissions = self.listTeamPermissions()
|
|
|
|
self.assertEquals(2, len(permissions))
|
|
assert 'owners' in permissions
|
|
self.assertEquals('write', permissions['owners']['role'])
|
|
|
|
json = self.getJsonResponse('api.get_team_permissions',
|
|
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))
|
|
self.assertEquals('write', json['role'])
|
|
|
|
# Change the team's permissions.
|
|
self.putJsonResponse('api.change_team_permissions',
|
|
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
|
|
data=dict(role='admin'))
|
|
|
|
# Verify.
|
|
permissions = self.listTeamPermissions()
|
|
|
|
self.assertEquals(2, len(permissions))
|
|
assert 'owners' in permissions
|
|
self.assertEquals('admin', permissions['owners']['role'])
|
|
|
|
# Delete the team's permission.
|
|
self.deleteResponse('api.delete_team_permissions',
|
|
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))
|
|
|
|
# Verify.
|
|
permissions = self.listTeamPermissions()
|
|
|
|
self.assertEquals(1, len(permissions))
|
|
assert not 'owners' in permissions
|
|
|
|
|
|
class TestApiTokens(ApiTestCase):
|
|
def listTokens(self):
|
|
return self.getJsonResponse('api.list_repo_tokens',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens']
|
|
|
|
def test_tokens(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Create a new token.
|
|
json = self.postJsonResponse('api.create_token',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
|
|
data=dict(role='read', friendlyName='mytoken'),
|
|
expected_code=201)
|
|
|
|
self.assertEquals('mytoken', json['friendlyName'])
|
|
self.assertEquals('read', json['role'])
|
|
token_code = json['code']
|
|
|
|
# Verify.
|
|
tokens = self.listTokens()
|
|
assert token_code in tokens
|
|
self.assertEquals('mytoken', tokens[token_code]['friendlyName'])
|
|
|
|
json = self.getJsonResponse('api.get_tokens',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
|
|
self.assertEquals(tokens[token_code], json)
|
|
|
|
# Change the token's permission.
|
|
self.putJsonResponse('api.change_token',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
|
|
data=dict(role='write'))
|
|
|
|
# Verify.
|
|
json = self.getJsonResponse('api.get_tokens',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
|
|
self.assertEquals('write', json['role'])
|
|
|
|
# Delete the token.
|
|
self.deleteResponse('api.delete_token',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
|
|
|
|
# Verify.
|
|
self.getResponse('api.get_tokens',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
|
|
expected_code=404)
|
|
|
|
|
|
class TestUserCard(ApiTestCase):
|
|
def test_getusercard(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_user_card')
|
|
|
|
self.assertEquals('4242', json['card']['last4'])
|
|
self.assertEquals('Visa', json['card']['type'])
|
|
|
|
def test_setusercard_error(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
json = self.postJsonResponse('api.set_user_card',
|
|
data=dict(token='sometoken'),
|
|
expected_code=402)
|
|
assert 'carderror' in json
|
|
|
|
|
|
class TestOrgCard(ApiTestCase):
|
|
def test_getorgcard(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
json = self.getJsonResponse('api.get_org_card',
|
|
params=dict(orgname=ORGANIZATION))
|
|
|
|
self.assertEquals('4242', json['card']['last4'])
|
|
self.assertEquals('Visa', json['card']['type'])
|
|
|
|
|
|
class TestUserSubscription(ApiTestCase):
|
|
def getSubscription(self):
|
|
return self.getJsonResponse('api.get_user_subscription')
|
|
|
|
def test_updateplan(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Change the plan.
|
|
self.putJsonResponse('api.update_user_subscription',
|
|
data=dict(plan='free'))
|
|
|
|
# Verify
|
|
sub = self.getSubscription()
|
|
self.assertEquals('free', sub['plan'])
|
|
|
|
# Change the plan.
|
|
self.putJsonResponse('api.update_user_subscription',
|
|
data=dict(plan='bus-large'))
|
|
|
|
# Verify
|
|
sub = self.getSubscription()
|
|
self.assertEquals('bus-large', sub['plan'])
|
|
|
|
|
|
class TestOrgSubscription(ApiTestCase):
|
|
def getSubscription(self):
|
|
return self.getJsonResponse('api.get_org_subscription', params=dict(orgname=ORGANIZATION))
|
|
|
|
def test_updateplan(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Change the plan.
|
|
self.putJsonResponse('api.update_org_subscription',
|
|
params=dict(orgname=ORGANIZATION),
|
|
data=dict(plan='free'))
|
|
|
|
# Verify
|
|
sub = self.getSubscription()
|
|
self.assertEquals('free', sub['plan'])
|
|
|
|
# Change the plan.
|
|
self.putJsonResponse('api.update_org_subscription',
|
|
params=dict(orgname=ORGANIZATION),
|
|
data=dict(plan='bus-large'))
|
|
|
|
# Verify
|
|
sub = self.getSubscription()
|
|
self.assertEquals('bus-large', sub['plan'])
|
|
|
|
|
|
class TestUserRobots(ApiTestCase):
|
|
def getRobotNames(self):
|
|
return [r['name'] for r in self.getJsonResponse('api.get_user_robots')['robots']]
|
|
|
|
def test_robots(self):
|
|
self.login(NO_ACCESS_USER)
|
|
|
|
# Create a robot.
|
|
json = self.putJsonResponse('api.create_user_robot',
|
|
params=dict(robot_shortname='bender'),
|
|
expected_code=201)
|
|
|
|
self.assertEquals(NO_ACCESS_USER + '+bender', json['name'])
|
|
|
|
# Verify.
|
|
robots = self.getRobotNames()
|
|
assert NO_ACCESS_USER + '+bender' in robots
|
|
|
|
# Delete the robot.
|
|
self.deleteResponse('api.delete_user_robot',
|
|
params=dict(robot_shortname='bender'))
|
|
|
|
# Verify.
|
|
robots = self.getRobotNames()
|
|
assert not NO_ACCESS_USER + '+bender' in robots
|
|
|
|
|
|
class TestOrgRobots(ApiTestCase):
|
|
def getRobotNames(self):
|
|
return [r['name'] for r in self.getJsonResponse('api.get_org_robots',
|
|
params=dict(orgname=ORGANIZATION))['robots']]
|
|
|
|
def test_robots(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Create a robot.
|
|
json = self.putJsonResponse('api.create_org_robot',
|
|
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
|
|
expected_code=201)
|
|
|
|
self.assertEquals(ORGANIZATION + '+bender', json['name'])
|
|
|
|
# Verify.
|
|
robots = self.getRobotNames()
|
|
assert ORGANIZATION + '+bender' in robots
|
|
|
|
# Delete the robot.
|
|
self.deleteResponse('api.delete_org_robot',
|
|
params=dict(orgname=ORGANIZATION, robot_shortname='bender'))
|
|
|
|
# Verify.
|
|
robots = self.getRobotNames()
|
|
assert not ORGANIZATION + '+bender' in robots
|
|
|
|
|
|
class TestLogs(ApiTestCase):
|
|
def test_user_logs(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.list_user_logs')
|
|
assert 'logs' in json
|
|
assert 'start_time' in json
|
|
assert 'end_time' in json
|
|
|
|
def test_org_logs(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.list_org_logs', params=dict(orgname=ORGANIZATION))
|
|
assert 'logs' in json
|
|
assert 'start_time' in json
|
|
assert 'end_time' in json
|
|
|
|
def test_performer(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.list_org_logs', params=dict(orgname=ORGANIZATION))
|
|
all_logs = json['logs']
|
|
|
|
json = self.getJsonResponse('api.list_org_logs',
|
|
params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION))
|
|
|
|
assert len(json['logs']) < len(all_logs)
|
|
for log in json['logs']:
|
|
self.assertEquals(READ_ACCESS_USER, log['performer']['name'])
|
|
|
|
|
|
class FakeBuildTrigger(BuildTrigger):
|
|
@classmethod
|
|
def service_name(cls):
|
|
return 'fakeservice'
|
|
|
|
def list_build_sources(self, auth_token):
|
|
return [{'first': 'source'}, {'second': auth_token}]
|
|
|
|
def list_build_subdirs(self, auth_token, config):
|
|
return [auth_token, 'foo', 'bar', config['somevalue']]
|
|
|
|
def handle_trigger_request(self, request, auth_token, config):
|
|
return ('foo', ['bar'], 'build-name', 'subdir')
|
|
|
|
def is_active(self, config):
|
|
return 'active' in config and config['active']
|
|
|
|
def activate(self, trigger_uuid, standard_webhook_url, auth_token, config):
|
|
config['active'] = True
|
|
return config
|
|
|
|
def deactivate(self, auth_token, config):
|
|
config['active'] = False
|
|
return config
|
|
|
|
def manual_start(self, auth_token, config):
|
|
return ('foo', ['bar'], 'build-name', 'subdir')
|
|
|
|
|
|
class TestBuildTriggers(ApiTestCase):
|
|
def test_list_build_triggers(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
# Check a repo with no known triggers.
|
|
json = self.getJsonResponse('api.list_build_triggers', params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
|
|
self.assertEquals(0, len(json['triggers']))
|
|
|
|
# Check a repo with one known trigger.
|
|
json = self.getJsonResponse('api.list_build_triggers', params=dict(repository=ADMIN_ACCESS_USER + '/building'))
|
|
self.assertEquals(1, len(json['triggers']))
|
|
|
|
trigger = json['triggers'][0]
|
|
|
|
assert 'id' in trigger
|
|
assert 'is_active' in trigger
|
|
assert 'config' in trigger
|
|
assert 'service' in trigger
|
|
|
|
# Verify the get trigger method.
|
|
trigger_json = self.getJsonResponse('api.get_build_trigger', params=dict(repository=ADMIN_ACCESS_USER + '/building',
|
|
trigger_uuid=trigger['id']))
|
|
|
|
self.assertEquals(trigger, trigger_json)
|
|
|
|
# Check the recent builds for the trigger.
|
|
builds_json = self.getJsonResponse('api.list_trigger_recent_builds', params=dict(repository=ADMIN_ACCESS_USER + '/building',
|
|
trigger_uuid=trigger['id']))
|
|
|
|
assert 'builds' in builds_json
|
|
|
|
def test_delete_build_trigger(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
json = self.getJsonResponse('api.list_build_triggers', params=dict(repository=ADMIN_ACCESS_USER + '/building'))
|
|
self.assertEquals(1, len(json['triggers']))
|
|
trigger = json['triggers'][0]
|
|
|
|
# Delete the trigger.
|
|
self.deleteResponse('api.delete_build_trigger', params=dict(repository=ADMIN_ACCESS_USER + '/building',
|
|
trigger_uuid=trigger['id']))
|
|
|
|
# Verify it was deleted.
|
|
json = self.getJsonResponse('api.list_build_triggers', params=dict(repository=ADMIN_ACCESS_USER + '/building'))
|
|
self.assertEquals(0, len(json['triggers']))
|
|
|
|
|
|
def test_fake_trigger(self):
|
|
self.login(ADMIN_ACCESS_USER)
|
|
|
|
database.BuildTriggerService.create(name='fakeservice')
|
|
|
|
# Add a new fake trigger.
|
|
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
|
|
user = model.get_user(ADMIN_ACCESS_USER)
|
|
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
|
|
|
|
# Verify the trigger.
|
|
json = self.getJsonResponse('api.list_build_triggers', params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
|
|
self.assertEquals(1, len(json['triggers']))
|
|
self.assertEquals(trigger.uuid, json['triggers'][0]['id'])
|
|
self.assertEquals(trigger.service.name, json['triggers'][0]['service'])
|
|
self.assertEquals(False, json['triggers'][0]['is_active'])
|
|
|
|
# List the trigger's sources.
|
|
source_json = self.getJsonResponse('api.list_trigger_build_sources', params=dict(repository=ADMIN_ACCESS_USER + '/simple',
|
|
trigger_uuid=trigger.uuid))
|
|
self.assertEquals([{'first': 'source'}, {'second': 'sometoken'}], source_json['sources'])
|
|
|
|
# List the trigger's subdirs.
|
|
subdir_json = self.postJsonResponse('api.list_build_trigger_subdirs',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
|
|
data={'somevalue': 'meh'})
|
|
|
|
self.assertEquals({'status': 'success', 'subdir': ['sometoken', 'foo', 'bar', 'meh']}, subdir_json)
|
|
|
|
# Activate the trigger.
|
|
trigger_config = {}
|
|
activate_json = self.postJsonResponse('api.activate_build_trigger',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
|
|
data=trigger_config)
|
|
|
|
self.assertEquals(True, activate_json['is_active'])
|
|
|
|
# Make sure the trigger has a write token.
|
|
trigger = model.get_build_trigger(ADMIN_ACCESS_USER, 'simple', trigger.uuid)
|
|
self.assertNotEquals(None, trigger.write_token)
|
|
self.assertEquals(True, py_json.loads(trigger.config)['active'])
|
|
|
|
# Make sure we cannot activate again.
|
|
self.postResponse('api.activate_build_trigger',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
|
|
data=trigger_config,
|
|
expected_code=400)
|
|
|
|
# Start a manual build.
|
|
start_json = self.postJsonResponse('api.manually_start_build_trigger',
|
|
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
|
|
expected_code=201)
|
|
|
|
assert 'id' in start_json
|
|
self.assertEquals("build-name", start_json['display_name'])
|
|
self.assertEquals(['bar'], start_json['job_config']['docker_tags'])
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|