This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_api_usage.py

2682 lines
98 KiB
Python

# coding=utf-8
import unittest
import json as py_json
from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
from endpoints.api import api_bp, api
from endpoints.webhooks import webhooks
from endpoints.trigger import BuildTrigger as BuildTriggerBase
from app import app
from initdb import setup_database_for_testing, finished_database_for_testing
from data import model, database
from endpoints.api.team import TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam
from endpoints.api.tag import RepositoryTagImages, RepositoryTag
from endpoints.api.search import FindRepositories, EntitySearch
from endpoints.api.image import RepositoryImage, RepositoryImageList
from endpoints.api.build import (RepositoryBuildStatus, RepositoryBuildLogs, RepositoryBuildList,
RepositoryBuildResource)
from endpoints.api.robot import (UserRobotList, OrgRobot, OrgRobotList, UserRobot,
RegenerateUserRobot, RegenerateOrgRobot)
from endpoints.api.trigger import (BuildTriggerActivate, BuildTriggerSources, BuildTriggerSubdirs,
TriggerBuildList, ActivateBuildTrigger, BuildTrigger,
BuildTriggerList, BuildTriggerAnalyze, BuildTriggerFieldValues)
from endpoints.api.repoemail import RepositoryAuthorizedEmail
from endpoints.api.repositorynotification import RepositoryNotification, RepositoryNotificationList
from endpoints.api.user import (PrivateRepositories, ConvertToOrganization, Signout, Signin, User,
UserAuthorizationList, UserAuthorization, UserNotification,
UserNotificationList)
from endpoints.api.repotoken import RepositoryToken, RepositoryTokenList
from endpoints.api.prototype import PermissionPrototype, PermissionPrototypeList
from endpoints.api.logs import UserLogs, OrgLogs
from endpoints.api.billing import (UserCard, UserPlan, ListPlans, OrganizationCard,
OrganizationPlan)
from endpoints.api.discovery import DiscoveryResource
from endpoints.api.organization import (OrganizationList, OrganizationMember,
OrgPrivateRepositories, OrgnaizationMemberList,
Organization, ApplicationInformation,
OrganizationApplications, OrganizationApplicationResource,
OrganizationApplicationResetClientSecret)
from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repository
from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission,
RepositoryTeamPermissionList, RepositoryUserPermissionList)
from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement,
UsageInformation)
try:
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# This blueprint was already registered
pass
app.register_blueprint(webhooks, url_prefix='/webhooks')
NO_ACCESS_USER = 'freshuser'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
PUBLIC_USER = 'public'
ADMIN_ACCESS_EMAIL = 'jschorr@devtable.com'
ORG_REPO = 'orgrepo'
ORGANIZATION = 'buynlarge'
NEW_USER_DETAILS = {
'username': 'bobby',
'password': 'password',
'email': 'bobby@tables.com',
}
FAKE_APPLICATION_CLIENT_ID = 'deadbeef'
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
class ApiTestCase(unittest.TestCase):
maxDiff = None
@staticmethod
def _add_csrf(without_csrf):
parts = urlparse(without_csrf)
query = parse_qs(parts[4])
query[CSRF_TOKEN_KEY] = CSRF_TOKEN
return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:]))
def url_for(self, resource_name, params={}):
url = api.url_for(resource_name, **params)
url = ApiTestCase._add_csrf(url)
return url
def setUp(self):
setup_database_for_testing(self)
self.app = app.test_client()
self.ctx = app.test_request_context()
self.ctx.__enter__()
self.setCsrfToken(CSRF_TOKEN)
def tearDown(self):
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def setCsrfToken(self, token):
with self.app.session_transaction() as sess:
sess[CSRF_TOKEN_KEY] = token
def getJsonResponse(self, resource_name, params={}, expected_code=200):
rv = self.app.get(api.url_for(resource_name, **params))
self.assertEquals(expected_code, rv.status_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def postResponse(self, resource_name, params={}, data={}, expected_code=200):
rv = self.app.post(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
self.assertEquals(rv.status_code, expected_code)
return rv.data
def getResponse(self, resource_name, params={}, expected_code=200):
rv = self.app.get(api.url_for(resource_name, **params))
self.assertEquals(rv.status_code, expected_code)
return rv.data
def putResponse(self, resource_name, params={}, data={}, expected_code=200):
rv = self.app.put(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
self.assertEquals(rv.status_code, expected_code)
return rv.data
def deleteResponse(self, resource_name, params={}, expected_code=204):
rv = self.app.delete(self.url_for(resource_name, params))
if rv.status_code != expected_code:
print 'Mismatch data for resource DELETE %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
return rv.data
def postJsonResponse(self, resource_name, params={}, data={},
expected_code=200):
rv = self.app.post(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
if rv.status_code != expected_code:
print 'Mismatch data for resource POST %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def putJsonResponse(self, resource_name, params={}, data={},
expected_code=200):
rv = self.app.put(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
if rv.status_code != expected_code:
print 'Mismatch data for resource PUT %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def assertInTeam(self, data, membername):
for memberData in data['members']:
if memberData['name'] == membername:
return
self.fail(membername + ' not found in team: ' + py_json.dumps(data))
def login(self, username, password='password'):
return self.postJsonResponse(Signin, data=dict(username=username, password=password))
class TestCSRFFailure(ApiTestCase):
def test_csrf_failure(self):
self.login(READ_ACCESS_USER)
# Make sure a simple post call succeeds.
self.putJsonResponse(User,
data=dict(password='newpasswordiscool'))
# Change the session's CSRF token.
self.setCsrfToken('someinvalidtoken')
# Verify that the call now fails.
self.putJsonResponse(User,
data=dict(password='newpasswordiscool'),
expected_code=403)
class TestDiscovery(ApiTestCase):
def test_discovery(self):
json = self.getJsonResponse(DiscoveryResource)
assert 'apis' in json
class TestPlans(ApiTestCase):
def test_plans(self):
json = self.getJsonResponse(ListPlans)
found = set([])
for method_info in json['plans']:
found.add(method_info['stripeId'])
assert 'free' in found
class TestLoggedInUser(ApiTestCase):
def test_guest(self):
self.getJsonResponse(User, expected_code=401)
def test_user(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(User)
assert json['anonymous'] == False
assert json['username'] == READ_ACCESS_USER
class TestUserNotification(ApiTestCase):
def test_get(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserNotificationList)
# Make sure each notification can be retrieved.
for notification in json['notifications']:
njson = self.getJsonResponse(UserNotification, params=dict(uuid=notification['id']))
self.assertEquals(notification['id'], njson['id'])
# Update a notification.
assert json['notifications']
assert not json['notifications'][0]['dismissed']
pjson = self.putJsonResponse(UserNotification, params=dict(uuid=notification['id']),
data=dict(dismissed=True))
self.assertEquals(True, pjson['dismissed'])
class TestGetUserPrivateAllowed(ApiTestCase):
def test_nonallowed(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(PrivateRepositories)
assert json['privateCount'] == 0
assert not json['privateAllowed']
def test_allowed(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(PrivateRepositories)
assert json['privateCount'] >= 6
assert json['privateAllowed']
class TestConvertToOrganization(ApiTestCase):
def test_sameadminuser(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': READ_ACCESS_USER,
'adminPassword': 'password',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user is not valid', json['message'])
def test_invalidadminuser(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': 'unknownuser',
'adminPassword': 'password',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user credentials are not valid',
json['message'])
def test_invalidadminpassword(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': ADMIN_ACCESS_USER,
'adminPassword': 'invalidpass',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user credentials are not valid',
json['message'])
def test_convert(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': ADMIN_ACCESS_USER,
'adminPassword': 'password',
'plan': 'free'})
self.assertEqual(True, json['success'])
# Verify the organization exists.
organization = model.get_organization(READ_ACCESS_USER)
assert organization is not None
# Verify the admin user is the org's admin.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=READ_ACCESS_USER))
self.assertEquals(READ_ACCESS_USER, json['name'])
self.assertEquals(True, json['is_admin'])
def test_convert_via_email(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': ADMIN_ACCESS_EMAIL,
'adminPassword': 'password',
'plan': 'free'})
self.assertEqual(True, json['success'])
# Verify the organization exists.
organization = model.get_organization(READ_ACCESS_USER)
assert organization is not None
# Verify the admin user is the org's admin.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=READ_ACCESS_USER))
self.assertEquals(READ_ACCESS_USER, json['name'])
self.assertEquals(True, json['is_admin'])
class TestChangeUserDetails(ApiTestCase):
def test_changepassword(self):
self.login(READ_ACCESS_USER)
self.putJsonResponse(User,
data=dict(password='newpasswordiscool'))
self.login(READ_ACCESS_USER, password='newpasswordiscool')
def test_changepassword_unicode(self):
self.login(READ_ACCESS_USER)
self.putJsonResponse(User,
data=dict(password=u'someunicode北京市pass'))
self.login(READ_ACCESS_USER, password=u'someunicode北京市pass')
def test_changeeemail(self):
self.login(READ_ACCESS_USER)
self.putJsonResponse(User,
data=dict(email='test+foo@devtable.com'))
def test_changeinvoiceemail(self):
self.login(READ_ACCESS_USER)
json = self.putJsonResponse(User,
data=dict(invoice_email=True))
self.assertEquals(True, json['invoice_email'])
json = self.putJsonResponse(User,
data=dict(invoice_email=False))
self.assertEquals(False, json['invoice_email'])
class TestCreateNewUser(ApiTestCase):
def test_existingusername(self):
json = self.postJsonResponse(User,
data=dict(username=READ_ACCESS_USER,
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('The username already exists', json['message'])
def test_trycreatetooshort(self):
json = self.postJsonResponse(User,
data=dict(username='a',
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('Invalid username a: Username must be between 4 and 30 characters in length', json['error_description'])
def test_trycreateregexmismatch(self):
json = self.postJsonResponse(User,
data=dict(username='auserName',
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('Invalid username auserName: Username must match expression [a-z0-9_]+', json['error_description'])
def test_createuser(self):
data = self.postJsonResponse(User,
data=NEW_USER_DETAILS,
expected_code=200)
self.assertEquals(True, data['awaiting_verification'])
def test_createuser_withteaminvite(self):
inviter = model.get_user(ADMIN_ACCESS_USER)
team = model.get_organization_team(ORGANIZATION, 'owners')
invite = model.add_or_invite_to_team(inviter, team, None, 'foo@example.com')
details = {
'invite_code': invite.invite_token
}
details.update(NEW_USER_DETAILS);
data = self.postJsonResponse(User, data=details, expected_code=200)
self.assertEquals(True, data['awaiting_verification'])
# Make sure the user was added to the team.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertInTeam(json, NEW_USER_DETAILS['username'])
class TestSignout(ApiTestCase):
def test_signout(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(User)
assert json['username'] == READ_ACCESS_USER
self.postResponse(Signout)
# Make sure we're now signed out.
self.getJsonResponse(User, expected_code=401)
class TestGetMatchingEntities(ApiTestCase):
def test_notinorg(self):
self.login(NO_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix='o', namespace=ORGANIZATION,
includeTeams='true'))
names = set([r['name'] for r in json['results']])
assert 'outsideorg' in names
assert not 'owners' in names
def test_inorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix='o', namespace=ORGANIZATION,
includeTeams='true'))
names = set([r['name'] for r in json['results']])
assert 'outsideorg' in names
assert 'owners' in names
def test_inorg_withorgs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix=ORGANIZATION[0], namespace=ORGANIZATION,
includeOrgs='true'))
names = set([r['name'] for r in json['results']])
assert ORGANIZATION in names
class TestCreateOrganization(ApiTestCase):
def test_existinguser(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(OrganizationList,
data=dict(name=ADMIN_ACCESS_USER, email='testorg@example.com'),
expected_code=400)
self.assertEquals('A user or organization with this name already exists',
json['message'])
def test_existingorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(OrganizationList,
data=dict(name=ORGANIZATION, email='testorg@example.com'),
expected_code=400)
self.assertEquals('A user or organization with this name already exists',
json['message'])
def test_createorg(self):
self.login(ADMIN_ACCESS_USER)
data = self.postResponse(OrganizationList,
data=dict(name='neworg',
email='testorg@example.com'),
expected_code=201)
self.assertEquals('"Created"', data)
# Ensure the org was created.
organization = model.get_organization('neworg')
assert organization is not None
# Verify the admin user is the org's admin.
json = self.getJsonResponse(Organization,
params=dict(orgname='neworg'))
self.assertEquals('neworg', json['name'])
self.assertEquals(True, json['is_admin'])
class TestGetOrganization(ApiTestCase):
def test_unknownorg(self):
self.login(ADMIN_ACCESS_USER)
self.getResponse(Organization, params=dict(orgname='notvalid'),
expected_code=403)
def test_cannotaccess(self):
self.login(NO_ACCESS_USER)
self.getResponse(Organization, params=dict(orgname=ORGANIZATION),
expected_code=403)
def test_getorganization(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
self.assertEquals(ORGANIZATION, json['name'])
self.assertEquals(False, json['is_admin'])
def test_getorganization_asadmin(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
self.assertEquals(ORGANIZATION, json['name'])
self.assertEquals(True, json['is_admin'])
class TestChangeOrganizationDetails(ApiTestCase):
def test_changeinvoiceemail(self):
self.login(ADMIN_ACCESS_USER)
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(invoice_email=True))
self.assertEquals(True, json['invoice_email'])
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(invoice_email=False))
self.assertEquals(False, json['invoice_email'])
def test_changemail(self):
self.login(ADMIN_ACCESS_USER)
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(email='newemail@example.com'))
self.assertEquals('newemail@example.com', json['email'])
class TestGetOrganizationPrototypes(ApiTestCase):
def test_getprototypes(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
assert len(json['prototypes']) > 0
class TestCreateOrganizationPrototypes(ApiTestCase):
def test_invaliduser(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(activating_user={'name': 'unknownuser'},
role='read',
delegate={'kind': 'team', 'name': 'owners'}),
expected_code=400)
self.assertEquals('Unknown activating user', json['message'])
def test_missingdelegate(self):
self.login(ADMIN_ACCESS_USER)
self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(role='read'),
expected_code=400)
def test_createprototype(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(role='read',
delegate={'kind': 'team',
'name': 'readers'}))
self.assertEquals('read', json['role'])
pid = json['id']
# Verify the prototype exists.
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = set([p['id'] for p in json['prototypes']])
assert pid in ids
class TestDeleteOrganizationPrototypes(ApiTestCase):
def test_deleteprototype(self):
self.login(ADMIN_ACCESS_USER)
# Get the existing prototypes
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = [p['id'] for p in json['prototypes']]
pid = ids[0]
# Delete a prototype.
self.deleteResponse(PermissionPrototype,
params=dict(orgname=ORGANIZATION, prototypeid=pid))
# Verify the prototype no longer exists.
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
newids = [p['id'] for p in json['prototypes']]
assert not pid in newids
class TestUpdateOrganizationPrototypes(ApiTestCase):
def test_updateprototype(self):
self.login(ADMIN_ACCESS_USER)
# Get the existing prototypes
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = [p['id'] for p in json['prototypes']]
pid = ids[0]
# Update a prototype.
json = self.putJsonResponse(PermissionPrototype,
params=dict(orgname=ORGANIZATION,
prototypeid=pid), data=dict(role='admin'))
self.assertEquals('admin', json['role'])
class TestGetOrganiaztionMembers(ApiTestCase):
def test_getmembers(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgnaizationMemberList,
params=dict(orgname=ORGANIZATION))
assert ADMIN_ACCESS_USER in json['members']
assert READ_ACCESS_USER in json['members']
assert not NO_ACCESS_USER in json['members']
def test_getspecificmember(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationMember,
params=dict(orgname=ORGANIZATION,
membername=ADMIN_ACCESS_USER))
self.assertEquals(ADMIN_ACCESS_USER, json['member']['name'])
self.assertEquals('user', json['member']['kind'])
assert 'owners' in json['member']['teams']
class TestGetOrganizationPrivateAllowed(ApiTestCase):
def test_existingorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgPrivateRepositories,
params=dict(orgname=ORGANIZATION))
self.assertEquals(True, json['privateAllowed'])
assert not 'reposAllowed' in json
def test_neworg(self):
self.login(ADMIN_ACCESS_USER)
data = self.postResponse(OrganizationList,
data=dict(name='neworg',
email='test@example.com'),
expected_code=201)
json = self.getJsonResponse(OrgPrivateRepositories,
params=dict(orgname='neworg'))
self.assertEquals(False, json['privateAllowed'])
class TestUpdateOrganizationTeam(ApiTestCase):
def test_updateexisting(self):
self.login(ADMIN_ACCESS_USER)
data = self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION,
teamname='readers'),
data=dict(description='My cool team',
role='creator'))
self.assertEquals('My cool team', data['description'])
self.assertEquals('creator', data['role'])
def test_attemptchangeroleonowners(self):
self.login(ADMIN_ACCESS_USER)
self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='owners'),
data=dict(role = 'creator'),
expected_code=400)
def test_createnewteam(self):
self.login(ADMIN_ACCESS_USER)
data = self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION,
teamname='newteam'),
data=dict(description='My cool team',
role='member'))
self.assertEquals('My cool team', data['description'])
self.assertEquals('member', data['role'])
# Verify the team was created.
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
assert 'newteam' in json['teams']
class TestDeleteOrganizationTeam(ApiTestCase):
def test_deleteteam(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='readers'))
# Make sure the team was deleted
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
assert not 'readers' in json['teams']
def test_attemptdeleteowners(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='owners'),
expected_code=400)
class TestGetOrganizationTeamMembers(ApiTestCase):
def test_invalidteam(self):
self.login(ADMIN_ACCESS_USER)
self.getResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION, teamname='notvalid'),
expected_code=404)
def test_getmembers(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
self.assertEquals(READ_ACCESS_USER, json['members'][1]['name'])
class TestUpdateOrganizationTeamMember(ApiTestCase):
def test_addmember_alreadyteammember(self):
self.login(ADMIN_ACCESS_USER)
membername = READ_ACCESS_USER
self.putResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername),
expected_code=400)
def test_addmember_orgmember(self):
self.login(ADMIN_ACCESS_USER)
membername = READ_ACCESS_USER
self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
# Verify the user was added to the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertInTeam(json, membername)
def test_addmember_robot(self):
self.login(ADMIN_ACCESS_USER)
membername = ORGANIZATION + '+coolrobot'
self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
# Verify the user was added to the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
self.assertInTeam(json, membername)
def test_addmember_invalidrobot(self):
self.login(ADMIN_ACCESS_USER)
membername = 'freshuser+anotherrobot'
self.putResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername),
expected_code=400)
def test_addmember_nonorgmember(self):
self.login(ADMIN_ACCESS_USER)
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
self.assertEquals(True, response['invited'])
# Make sure the user is not (yet) part of the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
for member in json['members']:
self.assertNotEqual(membername, member['name'])
class TestAcceptTeamMemberInvite(ApiTestCase):
def assertInTeam(self, data, membername):
for memberData in data['members']:
if memberData['name'] == membername:
return
self.fail(membername + ' not found in team: ' + json.dumps(data))
def test_accept(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
self.assertEquals(True, response['invited'])
# Login as the user.
self.login(membername)
# Accept the invite.
user = model.get_user(membername)
invites = list(model.lookup_team_invites(user))
self.assertEquals(1, len(invites))
self.putJsonResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token))
# Verify the user is now on the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertInTeam(json, membername)
# Verify the accept now fails.
self.putResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token),
expected_code=400)
class TestDeclineTeamMemberInvite(ApiTestCase):
def test_decline_wronguser(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
self.assertEquals(True, response['invited'])
# Try to decline the invite.
user = model.get_user(membername)
invites = list(model.lookup_team_invites(user))
self.assertEquals(1, len(invites))
self.deleteResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token),
expected_code=400)
def test_decline(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
self.assertEquals(True, response['invited'])
# Login as the user.
self.login(membername)
# Decline the invite.
user = model.get_user(membername)
invites = list(model.lookup_team_invites(user))
self.assertEquals(1, len(invites))
self.deleteResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token))
# Make sure the invite was deleted.
self.deleteResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token),
expected_code=400)
class TestDeleteOrganizationTeamMember(ApiTestCase):
def test_deletememberinvite(self):
self.login(ADMIN_ACCESS_USER)
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
self.assertEquals(True, response['invited'])
# Verify the invite was added.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers',
includePending=True))
assert len(json['members']) == 3
# Delete the invite.
self.deleteResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
# Verify the user was removed from the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers',
includePending=True))
assert len(json['members']) == 2
def test_deletemember(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=READ_ACCESS_USER))
# Verify the user was removed from the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
assert len(json['members']) == 1
class TestCreateRepo(ApiTestCase):
def test_duplicaterepo(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(repository='simple',
visibility='public',
description=''),
expected_code=400)
self.assertEquals('Repository already exists', json['message'])
def test_createrepo(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(repository='newrepo',
visibility='public',
description=''),
expected_code=201)
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('newrepo', json['name'])
def test_createrepo_underorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(namespace=ORGANIZATION,
repository='newrepo',
visibility='private',
description=''),
expected_code=201)
self.assertEquals(ORGANIZATION, json['namespace'])
self.assertEquals('newrepo', json['name'])
class TestFindRepos(ApiTestCase):
def test_findrepos_asguest(self):
json = self.getJsonResponse(FindRepositories, params=dict(query='p'))
self.assertEquals(len(json['repositories']), 1)
self.assertEquals(json['repositories'][0]['namespace'], 'public')
self.assertEquals(json['repositories'][0]['name'], 'publicrepo')
def test_findrepos_asuser(self):
self.login(NO_ACCESS_USER)
json = self.getJsonResponse(FindRepositories, params=dict(query='p'))
self.assertEquals(len(json['repositories']), 1)
self.assertEquals(json['repositories'][0]['namespace'], 'public')
self.assertEquals(json['repositories'][0]['name'], 'publicrepo')
def test_findrepos_orgmember(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(FindRepositories, params=dict(query='p'))
self.assertGreater(len(json['repositories']), 1)
class TestListRepos(ApiTestCase):
def test_listrepos_asguest(self):
json = self.getJsonResponse(RepositoryList, params=dict(public=True))
self.assertEquals(len(json['repositories']), 1)
def test_listrepos_orgmember(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(RepositoryList, params=dict(public=True))
self.assertGreater(len(json['repositories']), 1)
def test_listrepos_filter(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(RepositoryList,
params=dict(namespace=ORGANIZATION,
public=False))
for repo in json['repositories']:
self.assertEquals(ORGANIZATION, repo['namespace'])
def test_listrepos_limit(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(RepositoryList, params=dict(limit=2))
self.assertEquals(len(json['repositories']), 2)
class TestUpdateRepo(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
def test_updatedescription(self):
self.login(ADMIN_ACCESS_USER)
self.putJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO),
data=dict(description='Some cool repo'))
# Verify the repo description was updated.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals('Some cool repo', json['description'])
class TestChangeRepoVisibility(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
def test_changevisibility(self):
self.login(ADMIN_ACCESS_USER)
# Make public.
self.postJsonResponse(RepositoryVisibility,
params=dict(repository=self.SIMPLE_REPO),
data=dict(visibility='public'))
# Verify the visibility.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals(True, json['is_public'])
# Make private.
self.postJsonResponse(RepositoryVisibility,
params=dict(repository=self.SIMPLE_REPO),
data=dict(visibility='private'))
# Verify the visibility.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals(False, json['is_public'])
class TestDeleteRepository(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
COMPLEX_REPO = ADMIN_ACCESS_USER + '/complex'
def test_deleterepo(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
# Verify the repo was deleted.
self.getResponse(Repository,
params=dict(repository=self.SIMPLE_REPO),
expected_code=404)
def test_deleterepo2(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(Repository,
params=dict(repository=self.COMPLEX_REPO))
# Verify the repo was deleted.
self.getResponse(Repository,
params=dict(repository=self.COMPLEX_REPO),
expected_code=404)
def test_populate_and_delete_repo(self):
self.login(ADMIN_ACCESS_USER)
# Make sure the repository has come images and tags.
self.assertTrue(len(list(model.get_repository_images(ADMIN_ACCESS_USER, 'complex'))) > 0)
self.assertTrue(len(list(model.list_repository_tags(ADMIN_ACCESS_USER, 'complex'))) > 0)
# Add some data for the repository, in addition to is already existing images and tags.
repository = model.get_repository(ADMIN_ACCESS_USER, 'complex')
# Create some access tokens.
access_token = model.create_access_token(repository, 'read')
model.create_access_token(repository, 'write')
delegate_token = model.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken', 'read')
model.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken', 'write')
# Create some repository builds.
model.create_repository_build(repository, access_token, {}, 'someid', 'foobar')
model.create_repository_build(repository, delegate_token, {}, 'someid2', 'foobar2')
# Create some notifications.
model.create_repo_notification(repository, 'repo_push', 'hipchat', {})
model.create_repo_notification(repository, 'build_queued', 'slack', {})
# Create some logs.
model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
# Create some build triggers.
user = model.get_user(ADMIN_ACCESS_USER)
model.create_build_trigger(repository, 'github', 'sometoken', user)
model.create_build_trigger(repository, 'github', 'anothertoken', user)
# Create some email authorizations.
model.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'a@b.com')
model.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'b@c.com')
# Delete the repository.
self.deleteResponse(Repository,
params=dict(repository=self.COMPLEX_REPO))
# Verify the repo was deleted.
self.getResponse(Repository,
params=dict(repository=self.COMPLEX_REPO),
expected_code=404)
class TestGetRepository(ApiTestCase):
PUBLIC_REPO = PUBLIC_USER + '/publicrepo'
def test_getrepo_badnames(self):
self.login(ADMIN_ACCESS_USER)
bad_names = ['logs', 'build', 'tokens', 'foo.bar', 'foo-bar', 'foo_bar']
# For each bad name, create the repo.
for bad_name in bad_names:
json = self.postJsonResponse(RepositoryList, expected_code=201,
data=dict(repository=bad_name, visibility='public',
description=''))
# Make sure we can retrieve its information.
json = self.getJsonResponse(Repository,
params=dict(repository=ADMIN_ACCESS_USER + '/' + bad_name))
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals(bad_name, json['name'])
self.assertEquals(True, json['is_public'])
def test_getrepo_public_asguest(self):
json = self.getJsonResponse(Repository,
params=dict(repository=self.PUBLIC_REPO))
self.assertEquals(PUBLIC_USER, json['namespace'])
self.assertEquals('publicrepo', json['name'])
self.assertEquals(True, json['is_public'])
self.assertEquals(False, json['is_organization'])
self.assertEquals(False, json['is_building'])
self.assertEquals(False, json['can_write'])
self.assertEquals(False, json['can_admin'])
assert 'latest' in json['tags']
def test_getrepo_public_asowner(self):
self.login(PUBLIC_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=self.PUBLIC_REPO))
self.assertEquals(False, json['is_organization'])
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
def test_getrepo_building(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
self.assertEquals(True, json['is_building'])
self.assertEquals(False, json['is_organization'])
def test_getrepo_org_asnonmember(self):
self.getResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO),
expected_code=401)
def test_getrepo_org_asreader(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
self.assertEquals(ORGANIZATION, json['namespace'])
self.assertEquals(ORG_REPO, json['name'])
self.assertEquals(False, json['can_write'])
self.assertEquals(False, json['can_admin'])
self.assertEquals(True, json['is_organization'])
def test_getrepo_org_asadmin(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
self.assertEquals(True, json['is_organization'])
class TestRepositoryBuildResource(ApiTestCase):
def test_cancel_invalidbuild(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(RepositoryBuildResource,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid='invalid'),
expected_code=404)
def test_cancel_waitingbuild(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
json = self.postJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
uuid = json['id']
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['builds']))
self.assertEquals(uuid, json['builds'][0]['id'])
# Find the build's queue item.
build_ref = database.RepositoryBuild.get(uuid=uuid)
queue_item = database.QueueItem.get(id=build_ref.queue_id)
self.assertTrue(queue_item.available)
self.assertTrue(queue_item.retries_remaining > 0)
# Cancel the build.
self.deleteResponse(RepositoryBuildResource,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(0, len(json['builds']))
# Check for the build's queue item.
try:
database.QueueItem.get(id=build_ref.queue_id)
self.fail('QueueItem still exists for build')
except database.QueueItem.DoesNotExist:
pass
def test_attemptcancel_scheduledbuild(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
json = self.postJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
uuid = json['id']
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['builds']))
self.assertEquals(uuid, json['builds'][0]['id'])
# Set queue item to be picked up.
qi = database.QueueItem.get(id=1)
qi.available = False
qi.save()
# Try to cancel the build.
self.deleteResponse(RepositoryBuildResource,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid),
expected_code=400)
def test_attemptcancel_workingbuild(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
json = self.postJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
uuid = json['id']
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['builds']))
self.assertEquals(uuid, json['builds'][0]['id'])
# Set the build to a different phase.
rb = database.RepositoryBuild.get(uuid=uuid)
rb.phase = database.BUILD_PHASE.BUILDING
rb.save()
# Try to cancel the build.
self.deleteResponse(RepositoryBuildResource,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid),
expected_code=400)
class TestRepoBuilds(ApiTestCase):
def test_getrepo_nobuilds(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
def test_getrepobuilds(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
assert len(json['builds']) > 0
build = json['builds'][-1]
assert 'id' in build
assert 'status' in build
# Check the status endpoint.
status_json = self.getJsonResponse(RepositoryBuildStatus,
params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id']))
self.assertEquals(status_json['id'], build['id'])
self.assertEquals(status_json['resource_key'], build['resource_key'])
self.assertEquals(status_json['trigger'], build['trigger'])
class TestRequestRepoBuild(ApiTestCase):
def test_requestrepobuild(self):
self.login(ADMIN_ACCESS_USER)
# Ensure we are not yet building.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
# Request a (fake) build.
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
assert len(json['builds']) > 0
def test_requestrepobuild_with_robot(self):
self.login(ADMIN_ACCESS_USER)
# Ensure we are not yet building.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
# Request a (fake) build.
pull_robot = ADMIN_ACCESS_USER + '+dtrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
assert len(json['builds']) > 0
def test_requestrepobuild_with_invalid_robot(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
pull_robot = ADMIN_ACCESS_USER + '+invalidrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=404)
def test_requestrepobuild_with_unauthorized_robot(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
pull_robot = 'freshuser+anotherrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=403)
class TestRepositoryEmail(ApiTestCase):
def test_emailnotauthorized(self):
self.login(ADMIN_ACCESS_USER)
# Verify the e-mail address is not authorized.
json = self.getResponse(RepositoryAuthorizedEmail,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='test@example.com'),
expected_code=404)
def test_emailnotauthorized_butsent(self):
self.login(ADMIN_ACCESS_USER)
# Verify the e-mail address is not authorized.
json = self.getJsonResponse(RepositoryAuthorizedEmail,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr+other@devtable.com'))
self.assertEquals(False, json['confirmed'])
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('simple', json['repository'])
def test_emailauthorized(self):
self.login(ADMIN_ACCESS_USER)
# Verify the e-mail address is authorized.
json = self.getJsonResponse(RepositoryAuthorizedEmail,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr@devtable.com'))
self.assertEquals(True, json['confirmed'])
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('simple', json['repository'])
def test_send_email_authorization(self):
self.login(ADMIN_ACCESS_USER)
# Send the email.
json = self.postJsonResponse(RepositoryAuthorizedEmail,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr+foo@devtable.com'))
self.assertEquals(False, json['confirmed'])
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('simple', json['repository'])
class TestRepositoryNotifications(ApiTestCase):
def test_webhooks(self):
self.login(ADMIN_ACCESS_USER)
# Add a notification.
json = self.postJsonResponse(RepositoryNotificationList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(config={'url': 'http://example.com'}, event='repo_push', method='webhook'),
expected_code=201)
self.assertEquals('repo_push', json['event'])
self.assertEquals('webhook', json['method'])
self.assertEquals('http://example.com', json['config']['url'])
wid = json['uuid']
# Get the notification.
json = self.getJsonResponse(RepositoryNotification,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid))
self.assertEquals(wid, json['uuid'])
self.assertEquals('repo_push', json['event'])
self.assertEquals('webhook', json['method'])
# Verify the notification is listed.
json = self.getJsonResponse(RepositoryNotificationList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
ids = [w['uuid'] for w in json['notifications']]
assert wid in ids
# Delete the notification.
self.deleteResponse(RepositoryNotification,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid),
expected_code=204)
# Verify the notification is gone.
self.getResponse(RepositoryNotification,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid),
expected_code=404)
class TestListAndGetImage(ApiTestCase):
def test_listandgetimages(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryImageList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['images']) > 0
for image in json['images']:
assert 'id' in image
assert 'tags' in image
assert 'created' in image
assert 'comment' in image
assert 'command' in image
assert 'ancestors' in image
assert 'size' in image
ijson = self.getJsonResponse(RepositoryImage,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
image_id=image['id']))
self.assertEquals(image['id'], ijson['id'])
class TestGetImageChanges(ApiTestCase):
def test_getimagechanges(self):
self.login(ADMIN_ACCESS_USER)
# Find an image to check.
json = self.getJsonResponse(RepositoryImageList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
image_id = json['images'][0]['id']
# Lookup the image's changes.
# TODO: Fix me once we can get fake changes into the test data
#self.getJsonResponse(RepositoryImageChanges,
# params=dict(repository=ADMIN_ACCESS_USER + '/simple',
# image_id=image_id))
class TestListAndDeleteTag(ApiTestCase):
def test_listdeletecreateandmovetag(self):
self.login(ADMIN_ACCESS_USER)
# List the images for prod.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
prod_images = json['images']
assert len(prod_images) > 0
# List the images for staging.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'))
staging_images = json['images']
assert len(prod_images) == len(staging_images) + 1
# Delete prod.
self.deleteResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
expected_code=204)
# Make sure the tag is gone.
self.getResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
expected_code=404)
# Make the sure the staging images are still there.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'))
self.assertEquals(staging_images, json['images'])
# Add a new tag to the staging image.
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'),
data=dict(image=staging_images[0]['id']),
expected_code=201)
# Make sure the tag is present.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'))
sometag_images = json['images']
self.assertEquals(sometag_images, staging_images)
# Move the tag.
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'),
data=dict(image=staging_images[-1]['id']),
expected_code=201)
# Make sure the tag has moved.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'))
sometag_new_images = json['images']
self.assertEquals(1, len(sometag_new_images))
self.assertEquals(staging_images[-1], sometag_new_images[0])
def test_deletesubtag(self):
self.login(ADMIN_ACCESS_USER)
# List the images for prod.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
prod_images = json['images']
assert len(prod_images) > 0
# Delete staging.
self.deleteResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'),
expected_code=204)
# Make sure the prod images are still around.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
self.assertEquals(prod_images, json['images'])
class TestRepoPermissions(ApiTestCase):
def listUserPermissions(self, namespace=ADMIN_ACCESS_USER, repo='simple'):
return self.getJsonResponse(RepositoryUserPermissionList,
params=dict(repository=namespace + '/' + repo))['permissions']
def listTeamPermissions(self):
return self.getJsonResponse(RepositoryTeamPermissionList,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions']
def test_userpermissions_underorg(self):
self.login(ADMIN_ACCESS_USER)
permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO)
self.assertEquals(1, len(permissions))
assert 'outsideorg' in permissions
self.assertEquals('read', permissions['outsideorg']['role'])
self.assertEquals(False, permissions['outsideorg']['is_org_member'])
# Add another user.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, username=ADMIN_ACCESS_USER),
data=dict(role='admin'))
# Verify the user is present.
permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO)
self.assertEquals(2, len(permissions))
assert ADMIN_ACCESS_USER in permissions
self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role'])
self.assertEquals(True, permissions[ADMIN_ACCESS_USER]['is_org_member'])
def test_userpermissions(self):
self.login(ADMIN_ACCESS_USER)
# The repo should start with just the admin as a user perm.
permissions = self.listUserPermissions()
self.assertEquals(1, len(permissions))
assert ADMIN_ACCESS_USER in permissions
self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role'])
self.assertFalse('is_org_member' in permissions[ADMIN_ACCESS_USER])
# Add another user.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER),
data=dict(role='read'))
# Verify the user is present.
permissions = self.listUserPermissions()
self.assertEquals(2, len(permissions))
assert NO_ACCESS_USER in permissions
self.assertEquals('read', permissions[NO_ACCESS_USER]['role'])
self.assertFalse('is_org_member' in permissions[NO_ACCESS_USER])
json = self.getJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER))
self.assertEquals('read', json['role'])
# Change the user's permissions.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER),
data=dict(role='admin'))
# Verify.
permissions = self.listUserPermissions()
self.assertEquals(2, len(permissions))
assert NO_ACCESS_USER in permissions
self.assertEquals('admin', permissions[NO_ACCESS_USER]['role'])
# Delete the user's permission.
self.deleteResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER))
# Verify.
permissions = self.listUserPermissions()
self.assertEquals(1, len(permissions))
assert not NO_ACCESS_USER in permissions
def test_teampermissions(self):
self.login(ADMIN_ACCESS_USER)
# The repo should start with just the readers as a team perm.
permissions = self.listTeamPermissions()
self.assertEquals(1, len(permissions))
assert 'readers' in permissions
self.assertEquals('read', permissions['readers']['role'])
# Add another team.
self.putJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
data=dict(role='write'))
# Verify the team is present.
permissions = self.listTeamPermissions()
self.assertEquals(2, len(permissions))
assert 'owners' in permissions
self.assertEquals('write', permissions['owners']['role'])
json = self.getJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))
self.assertEquals('write', json['role'])
# Change the team's permissions.
self.putJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
data=dict(role='admin'))
# Verify.
permissions = self.listTeamPermissions()
self.assertEquals(2, len(permissions))
assert 'owners' in permissions
self.assertEquals('admin', permissions['owners']['role'])
# Delete the team's permission.
self.deleteResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))
# Verify.
permissions = self.listTeamPermissions()
self.assertEquals(1, len(permissions))
assert not 'owners' in permissions
class TestApiTokens(ApiTestCase):
def listTokens(self):
return self.getJsonResponse(RepositoryTokenList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens']
def test_tokens(self):
self.login(ADMIN_ACCESS_USER)
# Create a new token.
json = self.postJsonResponse(RepositoryTokenList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(role='read', friendlyName='mytoken'),
expected_code=201)
self.assertEquals('mytoken', json['friendlyName'])
self.assertEquals('read', json['role'])
token_code = json['code']
# Verify.
tokens = self.listTokens()
assert token_code in tokens
self.assertEquals('mytoken', tokens[token_code]['friendlyName'])
json = self.getJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
self.assertEquals(tokens[token_code], json)
# Change the token's permission.
self.putJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
data=dict(role='write'))
# Verify.
json = self.getJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
self.assertEquals('write', json['role'])
# Delete the token.
self.deleteResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
# Verify.
self.getResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
expected_code=404)
class TestUserCard(ApiTestCase):
def test_getusercard(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserCard)
self.assertEquals('4242', json['card']['last4'])
self.assertEquals('Visa', json['card']['type'])
def test_setusercard_error(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(UserCard,
data=dict(token='sometoken'),
expected_code=402)
assert 'carderror' in json
class TestOrgCard(ApiTestCase):
def test_getorgcard(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationCard,
params=dict(orgname=ORGANIZATION))
self.assertEquals('4242', json['card']['last4'])
self.assertEquals('Visa', json['card']['type'])
class TestUserSubscription(ApiTestCase):
def getSubscription(self):
return self.getJsonResponse(UserPlan)
def test_updateplan(self):
self.login(ADMIN_ACCESS_USER)
# Change the plan.
self.putJsonResponse(UserPlan,
data=dict(plan='free'))
# Verify
sub = self.getSubscription()
self.assertEquals('free', sub['plan'])
# Change the plan.
self.putJsonResponse(UserPlan,
data=dict(plan='bus-large'))
# Verify
sub = self.getSubscription()
self.assertEquals('bus-large', sub['plan'])
class TestOrgSubscription(ApiTestCase):
def getSubscription(self):
return self.getJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION))
def test_updateplan(self):
self.login(ADMIN_ACCESS_USER)
# Change the plan.
self.putJsonResponse(OrganizationPlan,
params=dict(orgname=ORGANIZATION),
data=dict(plan='free'))
# Verify
sub = self.getSubscription()
self.assertEquals('free', sub['plan'])
# Change the plan.
self.putJsonResponse(OrganizationPlan,
params=dict(orgname=ORGANIZATION),
data=dict(plan='bus-large'))
# Verify
sub = self.getSubscription()
self.assertEquals('bus-large', sub['plan'])
class TestUserRobots(ApiTestCase):
def getRobotNames(self):
return [r['name'] for r in self.getJsonResponse(UserRobotList)['robots']]
def test_robots(self):
self.login(NO_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(UserRobot,
params=dict(robot_shortname='bender'),
expected_code=201)
self.assertEquals(NO_ACCESS_USER + '+bender', json['name'])
# Verify.
robots = self.getRobotNames()
assert NO_ACCESS_USER + '+bender' in robots
# Delete the robot.
self.deleteResponse(UserRobot,
params=dict(robot_shortname='bender'))
# Verify.
robots = self.getRobotNames()
assert not NO_ACCESS_USER + '+bender' in robots
def test_regenerate(self):
self.login(NO_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(UserRobot,
params=dict(robot_shortname='bender'),
expected_code=201)
token = json['token']
# Regenerate the robot.
json = self.postJsonResponse(RegenerateUserRobot,
params=dict(robot_shortname='bender'),
expected_code=200)
# Verify the token changed.
self.assertNotEquals(token, json['token'])
json2 = self.getJsonResponse(UserRobot,
params=dict(robot_shortname='bender'),
expected_code=200)
self.assertEquals(json['token'], json2['token'])
class TestOrgRobots(ApiTestCase):
def getRobotNames(self):
return [r['name'] for r in self.getJsonResponse(OrgRobotList,
params=dict(orgname=ORGANIZATION))['robots']]
def test_delete_robot_after_use(self):
self.login(ADMIN_ACCESS_USER)
# Create the robot.
self.putJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=201)
# Add the robot to a team.
membername = ORGANIZATION + '+bender'
self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
# Add a repository permission.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, username=membername),
data=dict(role='read'))
# Add a permission prototype with the robot as the activating user.
self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(role='read',
activating_user={'name': membername},
delegate={'kind': 'user',
'name': membername}))
# Add a permission prototype with the robot as the delegating user.
self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(role='read',
delegate={'kind': 'user',
'name': membername}))
# Add a build trigger with the robot as the pull robot.
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ORGANIZATION, ORG_REPO)
user = model.get_user(ADMIN_ACCESS_USER)
pull_robot = model.get_user(membername)
model.create_build_trigger(repo, 'fakeservice', 'sometoken', user, pull_robot=pull_robot)
# Add some log entries for the robot.
model.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo)
# Delete the robot and verify it works.
self.deleteResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'))
# All the above records should now be deleted, along with the robot. We verify a few of the
# critical ones below.
# Check the team.
team = model.get_organization_team(ORGANIZATION, 'readers')
members = [member.username for member in model.get_organization_team_members(team.id)]
self.assertFalse(membername in members)
# Check the robot itself.
self.assertIsNone(model.get_user(membername))
def test_robots(self):
self.login(ADMIN_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=201)
self.assertEquals(ORGANIZATION + '+bender', json['name'])
# Verify.
robots = self.getRobotNames()
assert ORGANIZATION + '+bender' in robots
# Delete the robot.
self.deleteResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'))
# Verify.
robots = self.getRobotNames()
assert not ORGANIZATION + '+bender' in robots
def test_regenerate(self):
self.login(ADMIN_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=201)
token = json['token']
# Regenerate the robot.
json = self.postJsonResponse(RegenerateOrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=200)
# Verify the token changed.
self.assertNotEquals(token, json['token'])
json2 = self.getJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=200)
self.assertEquals(json['token'], json2['token'])
class TestLogs(ApiTestCase):
def test_user_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserLogs)
assert 'logs' in json
assert 'start_time' in json
assert 'end_time' in json
def test_org_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION))
assert 'logs' in json
assert 'start_time' in json
assert 'end_time' in json
def test_performer(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION))
all_logs = json['logs']
json = self.getJsonResponse(OrgLogs,
params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION))
assert len(json['logs']) < len(all_logs)
for log in json['logs']:
self.assertEquals(READ_ACCESS_USER, log['performer']['name'])
class TestApplicationInformation(ApiTestCase):
def test_get_info(self):
json = self.getJsonResponse(ApplicationInformation, params=dict(client_id=FAKE_APPLICATION_CLIENT_ID))
assert 'name' in json
assert 'uri' in json
assert 'organization' in json
def test_get_invalid_info(self):
self.getJsonResponse(ApplicationInformation, params=dict(client_id='invalid-code'),
expected_code=404)
class TestOrganizationApplications(ApiTestCase):
def test_list_create_applications(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION))
self.assertEquals(2, len(json['applications']))
found = False
for application in json['applications']:
if application['client_id'] == FAKE_APPLICATION_CLIENT_ID:
found = True
break
self.assertTrue(found)
# Add a new application.
json = self.postJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION),
data=dict(name="Some cool app", description="foo"))
self.assertEquals("Some cool app", json['name'])
self.assertEquals("foo", json['description'])
# Retrieve the apps list again
list_json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION))
self.assertEquals(3, len(list_json['applications']))
class TestOrganizationApplicationResource(ApiTestCase):
def test_get_edit_delete_application(self):
self.login(ADMIN_ACCESS_USER)
# Retrieve the application.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id'])
# Edit the application.
edit_json = self.putJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID),
data=dict(name="Some App", description="foo", application_uri="bar",
redirect_uri="baz", avatar_email="meh"))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, edit_json['client_id'])
self.assertEquals("Some App", edit_json['name'])
self.assertEquals("foo", edit_json['description'])
self.assertEquals("bar", edit_json['application_uri'])
self.assertEquals("baz", edit_json['redirect_uri'])
self.assertEquals("meh", edit_json['avatar_email'])
# Retrieve the application again.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(json, edit_json)
# Delete the application.
self.deleteResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
# Make sure the application is gone.
self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID),
expected_code=404)
class TestOrganizationApplicationResetClientSecret(ApiTestCase):
def test_reset_client_secret(self):
self.login(ADMIN_ACCESS_USER)
# Retrieve the application.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id'])
# Reset the client secret.
reset_json = self.postJsonResponse(OrganizationApplicationResetClientSecret,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, reset_json['client_id'])
self.assertNotEquals(reset_json['client_secret'], json['client_secret'])
# Verify it was changed in the DB.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(reset_json['client_secret'], json['client_secret'])
class FakeBuildTrigger(BuildTriggerBase):
@classmethod
def service_name(cls):
return 'fakeservice'
def list_build_sources(self, auth_token):
return [{'first': 'source'}, {'second': auth_token}]
def list_build_subdirs(self, auth_token, config):
return [auth_token, 'foo', 'bar', config['somevalue']]
def handle_trigger_request(self, request, auth_token, config):
return ('foo', ['bar'], 'build-name', 'subdir', {'foo': 'bar'})
def is_active(self, config):
return 'active' in config and config['active']
def activate(self, trigger_uuid, standard_webhook_url, auth_token, config):
config['active'] = True
return config
def deactivate(self, auth_token, config):
config['active'] = False
return config
def manual_start(self, auth_token, config, run_parameters=None):
return ('foo', ['bar'], 'build-name', 'subdir', {'foo': 'bar'})
def dockerfile_url(self, auth_token, config):
return 'http://some/url'
def load_dockerfile_contents(self, auth_token, config):
if not 'dockerfile' in config:
return None
return config['dockerfile']
def list_field_values(self, auth_token, config, field_name):
if field_name == 'test_field':
return [1, 2, 3]
return None
class TestBuildTriggers(ApiTestCase):
def test_list_build_triggers(self):
self.login(ADMIN_ACCESS_USER)
# Check a repo with no known triggers.
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(0, len(json['triggers']))
# Check a repo with one known trigger.
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(1, len(json['triggers']))
trigger = json['triggers'][0]
assert 'id' in trigger
assert 'is_active' in trigger
assert 'config' in trigger
assert 'service' in trigger
# Verify the get trigger method.
trigger_json = self.getJsonResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
self.assertEquals(trigger, trigger_json)
# Check the recent builds for the trigger.
builds_json = self.getJsonResponse(TriggerBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
assert 'builds' in builds_json
def test_delete_build_trigger(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(1, len(json['triggers']))
trigger = json['triggers'][0]
# Delete the trigger.
self.deleteResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
# Verify it was deleted.
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(0, len(json['triggers']))
def test_analyze_fake_trigger(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Analyze the trigger's dockerfile: First, no dockerfile.
trigger_config = {}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('error', analyze_json['status'])
self.assertEquals('Could not read the Dockerfile for the trigger', analyze_json['message'])
# Analyze the trigger's dockerfile: Second, missing FROM in dockerfile.
trigger_config = {'dockerfile': 'MAINTAINER me'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('warning', analyze_json['status'])
self.assertEquals('No FROM line found in the Dockerfile', analyze_json['message'])
# Analyze the trigger's dockerfile: Third, dockerfile with public repo.
trigger_config = {'dockerfile': 'FROM somerepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('publicbase', analyze_json['status'])
# Analyze the trigger's dockerfile: Fourth, dockerfile with private repo with an invalid path.
trigger_config = {'dockerfile': 'FROM localhost:5000/somepath'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('warning', analyze_json['status'])
self.assertEquals('"localhost:5000/somepath" is not a valid Quay repository path', analyze_json['message'])
# Analyze the trigger's dockerfile: Fifth, dockerfile with private repo that does not exist.
trigger_config = {'dockerfile': 'FROM localhost:5000/nothere/randomrepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('error', analyze_json['status'])
self.assertEquals('Repository "localhost:5000/nothere/randomrepo" referenced by the Dockerfile was not found', analyze_json['message'])
# Analyze the trigger's dockerfile: Sixth, dockerfile with private repo that the user cannot see.
trigger_config = {'dockerfile': 'FROM localhost:5000/randomuser/randomrepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('error', analyze_json['status'])
self.assertEquals('Repository "localhost:5000/randomuser/randomrepo" referenced by the Dockerfile was not found', analyze_json['message'])
# Analyze the trigger's dockerfile: Seventh, dockerfile with private repo that the user see.
trigger_config = {'dockerfile': 'FROM localhost:5000/devtable/complex'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('analyzed', analyze_json['status'])
self.assertEquals('devtable', analyze_json['namespace'])
self.assertEquals('complex', analyze_json['name'])
self.assertEquals(False, analyze_json['is_public'])
self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', analyze_json['robots'][0]['name'])
def test_fake_trigger(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Verify the trigger.
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['triggers']))
self.assertEquals(trigger.uuid, json['triggers'][0]['id'])
self.assertEquals(trigger.service.name, json['triggers'][0]['service'])
self.assertEquals(False, json['triggers'][0]['is_active'])
# List the trigger's sources.
source_json = self.getJsonResponse(BuildTriggerSources, params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid))
self.assertEquals([{'first': 'source'}, {'second': 'sometoken'}], source_json['sources'])
# List the trigger's subdirs.
subdir_json = self.postJsonResponse(BuildTriggerSubdirs,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'somevalue': 'meh'})
self.assertEquals({'status': 'success', 'subdir': ['sometoken', 'foo', 'bar', 'meh']}, subdir_json)
# Activate the trigger.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals(True, activate_json['is_active'])
# Make sure the trigger has a write token.
trigger = model.get_build_trigger(trigger.uuid)
self.assertNotEquals(None, trigger.write_token)
self.assertEquals(True, py_json.loads(trigger.config)['active'])
# Make sure we cannot activate again.
self.postResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config},
expected_code=400)
# Retrieve values for a field.
result = self.postJsonResponse(BuildTriggerFieldValues,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid, field_name="test_field"))
self.assertEquals(result['values'], [1, 2, 3])
self.postResponse(BuildTriggerFieldValues,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid, field_name="another_field"),
expected_code = 404)
# Start a manual build.
start_json = self.postJsonResponse(ActivateBuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data=dict(),
expected_code=201)
assert 'id' in start_json
self.assertEquals("build-name", start_json['display_name'])
self.assertEquals(['bar'], start_json['job_config']['docker_tags'])
# Verify the metadata was added.
build_obj = database.RepositoryBuild.get(database.RepositoryBuild.uuid == start_json['id'])
self.assertEquals('bar', py_json.loads(build_obj.job_config)['trigger_metadata']['foo'])
def test_invalid_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with an invalid robot account.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config, 'pull_robot': 'someinvalidrobot'},
expected_code=404)
def test_unauthorized_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with a robot account in the wrong namespace.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config, 'pull_robot': 'freshuser+anotherrobot'},
expected_code=403)
def test_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with a robot account.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config, 'pull_robot': ADMIN_ACCESS_USER + '+dtrobot'})
# Verify that the robot was saved.
self.assertEquals(True, activate_json['is_active'])
self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', activate_json['pull_robot']['name'])
# Start a manual build.
start_json = self.postJsonResponse(ActivateBuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data=dict(),
expected_code=201)
assert 'id' in start_json
self.assertEquals("build-name", start_json['display_name'])
self.assertEquals(['bar'], start_json['job_config']['docker_tags'])
class TestUserAuthorizations(ApiTestCase):
def test_list_get_delete_user_authorizations(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserAuthorizationList)
self.assertEquals(1, len(json['authorizations']))
authorization = json['authorizations'][0]
assert 'uuid' in authorization
assert 'scopes' in authorization
assert 'application' in authorization
# Retrieve the authorization.
get_json = self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid']))
self.assertEquals(authorization, get_json)
# Delete the authorization.
self.deleteResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid']))
# Verify it has been deleted.
self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid']),
expected_code=404)
class TestSuperUserLogs(ApiTestCase):
def test_get_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserLogs)
assert 'logs' in json
assert len(json['logs']) > 0
class TestSuperUserList(ApiTestCase):
def test_get_users(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserList)
assert 'users' in json
assert len(json['users']) > 0
class TestUsageInformation(ApiTestCase):
def test_get_usage(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UsageInformation)
assert 'usage' in json
assert 'allowed' in json
class TestSuperUserManagement(ApiTestCase):
def test_get_user(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser'))
self.assertEquals('freshuser', json['username'])
self.assertEquals('jschorr+test@devtable.com', json['email'])
self.assertEquals(False, json['super_user'])
def test_delete_user(self):
self.login(ADMIN_ACCESS_USER)
# Verify the user exists.
json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser'))
self.assertEquals('freshuser', json['username'])
# Delete the user.
self.deleteResponse(SuperUserManagement, params=dict(username = 'freshuser'), expected_code=204)
# Verify the user no longer exists.
self.getResponse(SuperUserManagement, params=dict(username = 'freshuser'), expected_code=404)
def test_update_user(self):
self.login(ADMIN_ACCESS_USER)
# Verify the user exists.
json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser'))
self.assertEquals('freshuser', json['username'])
self.assertEquals('jschorr+test@devtable.com', json['email'])
# Update the user.
self.putJsonResponse(SuperUserManagement, params=dict(username='freshuser'), data=dict(email='foo@bar.com'))
# Verify the user was updated.
json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser'))
self.assertEquals('freshuser', json['username'])
self.assertEquals('foo@bar.com', json['email'])
if __name__ == '__main__':
unittest.main()