# coding=utf-8 import unittest import json as py_json from urllib import urlencode from urlparse import urlparse, urlunparse, parse_qs from endpoints.api import api_bp, api from endpoints.webhooks import webhooks from endpoints.trigger import BuildTrigger as BuildTriggerBase from app import app from initdb import setup_database_for_testing, finished_database_for_testing from data import model, database from endpoints.api.team import TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam from endpoints.api.tag import RepositoryTagImages, RepositoryTag, RevertTag, ListRepositoryTags from endpoints.api.search import FindRepositories, EntitySearch, ConductSearch from endpoints.api.image import RepositoryImage, RepositoryImageList from endpoints.api.build import (RepositoryBuildStatus, RepositoryBuildLogs, RepositoryBuildList, RepositoryBuildResource) from endpoints.api.robot import (UserRobotList, OrgRobot, OrgRobotList, UserRobot, RegenerateUserRobot, RegenerateOrgRobot) from endpoints.api.trigger import (BuildTriggerActivate, BuildTriggerSources, BuildTriggerSubdirs, TriggerBuildList, ActivateBuildTrigger, BuildTrigger, BuildTriggerList, BuildTriggerAnalyze, BuildTriggerFieldValues) from endpoints.api.repoemail import RepositoryAuthorizedEmail from endpoints.api.repositorynotification import RepositoryNotification, RepositoryNotificationList from endpoints.api.user import (PrivateRepositories, ConvertToOrganization, Signout, Signin, User, UserAuthorizationList, UserAuthorization, UserNotification, UserNotificationList, StarredRepositoryList, StarredRepository) from endpoints.api.repotoken import RepositoryToken, RepositoryTokenList from endpoints.api.prototype import PermissionPrototype, PermissionPrototypeList from endpoints.api.logs import UserLogs, OrgLogs from endpoints.api.billing import (UserCard, UserPlan, ListPlans, OrganizationCard, OrganizationPlan) from endpoints.api.discovery import DiscoveryResource from endpoints.api.organization import (OrganizationList, OrganizationMember, OrgPrivateRepositories, OrgnaizationMemberList, Organization, ApplicationInformation, OrganizationApplications, OrganizationApplicationResource, OrganizationApplicationResetClientSecret) from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repository from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission, RepositoryTeamPermissionList, RepositoryUserPermissionList) from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement, UsageInformation) try: app.register_blueprint(api_bp, url_prefix='/api') except ValueError: # This blueprint was already registered pass app.register_blueprint(webhooks, url_prefix='/webhooks') NO_ACCESS_USER = 'freshuser' READ_ACCESS_USER = 'reader' ADMIN_ACCESS_USER = 'devtable' PUBLIC_USER = 'public' ADMIN_ACCESS_EMAIL = 'jschorr@devtable.com' ORG_REPO = 'orgrepo' ORGANIZATION = 'buynlarge' NEW_USER_DETAILS = { 'username': 'bobby', 'password': 'password', 'email': 'bobby@tables.com', } FAKE_APPLICATION_CLIENT_ID = 'deadbeef' CSRF_TOKEN_KEY = '_csrf_token' CSRF_TOKEN = '123csrfforme' class ApiTestCase(unittest.TestCase): maxDiff = None @staticmethod def _add_csrf(without_csrf): parts = urlparse(without_csrf) query = parse_qs(parts[4]) query[CSRF_TOKEN_KEY] = CSRF_TOKEN return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:])) def url_for(self, resource_name, params={}): url = api.url_for(resource_name, **params) url = ApiTestCase._add_csrf(url) return url def setUp(self): setup_database_for_testing(self) self.app = app.test_client() self.ctx = app.test_request_context() self.ctx.__enter__() self.setCsrfToken(CSRF_TOKEN) def tearDown(self): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) def setCsrfToken(self, token): with self.app.session_transaction() as sess: sess[CSRF_TOKEN_KEY] = token def getJsonResponse(self, resource_name, params={}, expected_code=200): rv = self.app.get(api.url_for(resource_name, **params)) self.assertEquals(expected_code, rv.status_code) data = rv.data parsed = py_json.loads(data) return parsed def postResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.post(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) self.assertEquals(rv.status_code, expected_code) return rv.data def getResponse(self, resource_name, params={}, expected_code=200): rv = self.app.get(api.url_for(resource_name, **params)) self.assertEquals(rv.status_code, expected_code) return rv.data def putResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.put(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) self.assertEquals(rv.status_code, expected_code) return rv.data def deleteResponse(self, resource_name, params={}, expected_code=204): rv = self.app.delete(self.url_for(resource_name, params)) if rv.status_code != expected_code: print 'Mismatch data for resource DELETE %s: %s' % (resource_name, rv.data) self.assertEquals(rv.status_code, expected_code) return rv.data def postJsonResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.post(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) if rv.status_code != expected_code: print 'Mismatch data for resource POST %s: %s' % (resource_name, rv.data) self.assertEquals(rv.status_code, expected_code) data = rv.data parsed = py_json.loads(data) return parsed def putJsonResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.put(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) if rv.status_code != expected_code: print 'Mismatch data for resource PUT %s: %s' % (resource_name, rv.data) self.assertEquals(rv.status_code, expected_code) data = rv.data parsed = py_json.loads(data) return parsed def assertInTeam(self, data, membername): for memberData in data['members']: if memberData['name'] == membername: return self.fail(membername + ' not found in team: ' + py_json.dumps(data)) def login(self, username, password='password'): return self.postJsonResponse(Signin, data=dict(username=username, password=password)) class TestCSRFFailure(ApiTestCase): def test_csrf_failure(self): self.login(READ_ACCESS_USER) # Make sure a simple post call succeeds. self.putJsonResponse(User, data=dict(password='newpasswordiscool')) # Change the session's CSRF token. self.setCsrfToken('someinvalidtoken') # Verify that the call now fails. self.putJsonResponse(User, data=dict(password='newpasswordiscool'), expected_code=403) class TestDiscovery(ApiTestCase): def test_discovery(self): json = self.getJsonResponse(DiscoveryResource) assert 'apis' in json class TestPlans(ApiTestCase): def test_plans(self): json = self.getJsonResponse(ListPlans) found = set([]) for method_info in json['plans']: found.add(method_info['stripeId']) assert 'free' in found class TestLoggedInUser(ApiTestCase): def test_guest(self): self.getJsonResponse(User, expected_code=401) def test_user(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(User) assert json['anonymous'] == False assert json['username'] == READ_ACCESS_USER class TestUserStarredRepositoryList(ApiTestCase): def test_get_stars_guest(self): self.getJsonResponse(StarredRepositoryList, expected_code=401) def test_get_stars_user(self): self.login(READ_ACCESS_USER) self.getJsonResponse(StarredRepositoryList, expected_code=200) def test_star_repo_guest(self): self.postJsonResponse(StarredRepositoryList, data={ 'namespace': 'public', 'repository': 'publicrepo', }, expected_code=401) def test_star_and_unstar_repo_user(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(StarredRepositoryList) assert json['repositories'] == [] json = self.postJsonResponse(StarredRepositoryList, data={ 'namespace': 'public', 'repository': 'publicrepo', }, expected_code=201) assert json['namespace'] == 'public' assert json['repository'] == 'publicrepo' self.deleteResponse(StarredRepository, params=dict(repository='public/publicrepo'), expected_code=204) json = self.getJsonResponse(StarredRepositoryList) assert json['repositories'] == [] class TestUserNotification(ApiTestCase): def test_get(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserNotificationList) # Make sure each notification can be retrieved. for notification in json['notifications']: njson = self.getJsonResponse(UserNotification, params=dict(uuid=notification['id'])) self.assertEquals(notification['id'], njson['id']) # Update a notification. assert json['notifications'] assert not json['notifications'][0]['dismissed'] pjson = self.putJsonResponse(UserNotification, params=dict(uuid=notification['id']), data=dict(dismissed=True)) self.assertEquals(True, pjson['dismissed']) class TestGetUserPrivateAllowed(ApiTestCase): def test_nonallowed(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(PrivateRepositories) assert json['privateCount'] == 0 assert not json['privateAllowed'] def test_allowed(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(PrivateRepositories) assert json['privateCount'] >= 6 assert json['privateAllowed'] class TestConvertToOrganization(ApiTestCase): def test_sameadminuser(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': READ_ACCESS_USER, 'adminPassword': 'password', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user is not valid', json['message']) def test_invalidadminuser(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': 'unknownuser', 'adminPassword': 'password', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user credentials are not valid', json['message']) def test_invalidadminpassword(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'invalidpass', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user credentials are not valid', json['message']) def test_convert(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'password', 'plan': 'free'}) self.assertEqual(True, json['success']) # Verify the organization exists. organization = model.get_organization(READ_ACCESS_USER) assert organization is not None # Verify the admin user is the org's admin. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=READ_ACCESS_USER)) self.assertEquals(READ_ACCESS_USER, json['name']) self.assertEquals(True, json['is_admin']) def test_convert_via_email(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_EMAIL, 'adminPassword': 'password', 'plan': 'free'}) self.assertEqual(True, json['success']) # Verify the organization exists. organization = model.get_organization(READ_ACCESS_USER) assert organization is not None # Verify the admin user is the org's admin. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=READ_ACCESS_USER)) self.assertEquals(READ_ACCESS_USER, json['name']) self.assertEquals(True, json['is_admin']) class TestChangeUserDetails(ApiTestCase): def test_changepassword(self): self.login(READ_ACCESS_USER) self.putJsonResponse(User, data=dict(password='newpasswordiscool')) self.login(READ_ACCESS_USER, password='newpasswordiscool') def test_changepassword_unicode(self): self.login(READ_ACCESS_USER) self.putJsonResponse(User, data=dict(password=u'someunicode北京市pass')) self.login(READ_ACCESS_USER, password=u'someunicode北京市pass') def test_changeeemail(self): self.login(READ_ACCESS_USER) self.putJsonResponse(User, data=dict(email='test+foo@devtable.com')) def test_changeinvoiceemail(self): self.login(READ_ACCESS_USER) json = self.putJsonResponse(User, data=dict(invoice_email=True)) self.assertEquals(True, json['invoice_email']) json = self.putJsonResponse(User, data=dict(invoice_email=False)) self.assertEquals(False, json['invoice_email']) class TestCreateNewUser(ApiTestCase): def test_existingusername(self): json = self.postJsonResponse(User, data=dict(username=READ_ACCESS_USER, password='password', email='test@example.com'), expected_code=400) self.assertEquals('The username already exists', json['message']) def test_trycreatetooshort(self): json = self.postJsonResponse(User, data=dict(username='a', password='password', email='test@example.com'), expected_code=400) self.assertEquals('Invalid username a: Username must be between 4 and 30 characters in length', json['error_description']) def test_trycreateregexmismatch(self): json = self.postJsonResponse(User, data=dict(username='auserName', password='password', email='test@example.com'), expected_code=400) self.assertEquals('Invalid username auserName: Username must match expression [a-z0-9_]+', json['error_description']) def test_createuser(self): data = self.postJsonResponse(User, data=NEW_USER_DETAILS, expected_code=200) self.assertEquals(True, data['awaiting_verification']) def test_createuser_withteaminvite(self): inviter = model.get_user(ADMIN_ACCESS_USER) team = model.get_organization_team(ORGANIZATION, 'owners') invite = model.add_or_invite_to_team(inviter, team, None, 'foo@example.com') details = { 'invite_code': invite.invite_token } details.update(NEW_USER_DETAILS); data = self.postJsonResponse(User, data=details, expected_code=200) self.assertEquals(True, data['awaiting_verification']) # Make sure the user was added to the team. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertInTeam(json, NEW_USER_DETAILS['username']) class TestSignout(ApiTestCase): def test_signout(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(User) assert json['username'] == READ_ACCESS_USER self.postResponse(Signout) # Make sure we're now signed out. self.getJsonResponse(User, expected_code=401) class TestConductSearch(ApiTestCase): def test_noaccess(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse(ConductSearch, params=dict(query='read')) self.assertEquals(0, len(json['results'])) json = self.getJsonResponse(ConductSearch, params=dict(query='owners')) self.assertEquals(0, len(json['results'])) def test_nouser(self): json = self.getJsonResponse(ConductSearch, params=dict(query='read')) self.assertEquals(0, len(json['results'])) json = self.getJsonResponse(ConductSearch, params=dict(query='public')) self.assertEquals(2, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'user') self.assertEquals(json['results'][0]['name'], 'public') self.assertEquals(json['results'][1]['kind'], 'repository') self.assertEquals(json['results'][1]['name'], 'publicrepo') json = self.getJsonResponse(ConductSearch, params=dict(query='owners')) self.assertEquals(0, len(json['results'])) def test_orgmember(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(ConductSearch, params=dict(query='owners')) self.assertEquals(0, len(json['results'])) json = self.getJsonResponse(ConductSearch, params=dict(query='readers')) self.assertEquals(1, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'team') self.assertEquals(json['results'][0]['name'], 'readers') def test_orgadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(ConductSearch, params=dict(query='owners')) self.assertEquals(1, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'team') self.assertEquals(json['results'][0]['name'], 'owners') json = self.getJsonResponse(ConductSearch, params=dict(query='readers')) self.assertEquals(1, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'team') self.assertEquals(json['results'][0]['name'], 'readers') class TestGetMatchingEntities(ApiTestCase): def test_notinorg(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix='o', namespace=ORGANIZATION, includeTeams='true')) names = set([r['name'] for r in json['results']]) assert 'outsideorg' in names assert not 'owners' in names def test_inorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix='o', namespace=ORGANIZATION, includeTeams='true')) names = set([r['name'] for r in json['results']]) assert 'outsideorg' in names assert 'owners' in names def test_inorg_withorgs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix=ORGANIZATION[0], namespace=ORGANIZATION, includeOrgs='true')) names = set([r['name'] for r in json['results']]) assert ORGANIZATION in names class TestCreateOrganization(ApiTestCase): def test_existinguser(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(OrganizationList, data=dict(name=ADMIN_ACCESS_USER, email='testorg@example.com'), expected_code=400) self.assertEquals('A user or organization with this name already exists', json['message']) def test_existingorg(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(OrganizationList, data=dict(name=ORGANIZATION, email='testorg@example.com'), expected_code=400) self.assertEquals('A user or organization with this name already exists', json['message']) def test_createorg(self): self.login(ADMIN_ACCESS_USER) data = self.postResponse(OrganizationList, data=dict(name='neworg', email='testorg@example.com'), expected_code=201) self.assertEquals('"Created"', data) # Ensure the org was created. organization = model.get_organization('neworg') assert organization is not None # Verify the admin user is the org's admin. json = self.getJsonResponse(Organization, params=dict(orgname='neworg')) self.assertEquals('neworg', json['name']) self.assertEquals(True, json['is_admin']) class TestGetOrganization(ApiTestCase): def test_unknownorg(self): self.login(ADMIN_ACCESS_USER) self.getResponse(Organization, params=dict(orgname='notvalid'), expected_code=404) def test_cannotaccess(self): self.login(NO_ACCESS_USER) self.getResponse(Organization, params=dict(orgname=ORGANIZATION), expected_code=200) def test_getorganization(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(False, json['is_admin']) def test_getorganization_asadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(True, json['is_admin']) class TestChangeOrganizationDetails(ApiTestCase): def test_changeinvoiceemail(self): self.login(ADMIN_ACCESS_USER) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(invoice_email=True)) self.assertEquals(True, json['invoice_email']) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(invoice_email=False)) self.assertEquals(False, json['invoice_email']) def test_changemail(self): self.login(ADMIN_ACCESS_USER) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(email='newemail@example.com')) self.assertEquals('newemail@example.com', json['email']) class TestGetOrganizationPrototypes(ApiTestCase): def test_getprototypes(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) assert len(json['prototypes']) > 0 class TestCreateOrganizationPrototypes(ApiTestCase): def test_invaliduser(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(activating_user={'name': 'unknownuser'}, role='read', delegate={'kind': 'team', 'name': 'owners'}), expected_code=400) self.assertEquals('Unknown activating user', json['message']) def test_missingdelegate(self): self.login(ADMIN_ACCESS_USER) self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read'), expected_code=400) def test_createprototype(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read', delegate={'kind': 'team', 'name': 'readers'})) self.assertEquals('read', json['role']) pid = json['id'] # Verify the prototype exists. json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = set([p['id'] for p in json['prototypes']]) assert pid in ids class TestDeleteOrganizationPrototypes(ApiTestCase): def test_deleteprototype(self): self.login(ADMIN_ACCESS_USER) # Get the existing prototypes json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = [p['id'] for p in json['prototypes']] pid = ids[0] # Delete a prototype. self.deleteResponse(PermissionPrototype, params=dict(orgname=ORGANIZATION, prototypeid=pid)) # Verify the prototype no longer exists. json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) newids = [p['id'] for p in json['prototypes']] assert not pid in newids class TestUpdateOrganizationPrototypes(ApiTestCase): def test_updateprototype(self): self.login(ADMIN_ACCESS_USER) # Get the existing prototypes json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = [p['id'] for p in json['prototypes']] pid = ids[0] # Update a prototype. json = self.putJsonResponse(PermissionPrototype, params=dict(orgname=ORGANIZATION, prototypeid=pid), data=dict(role='admin')) self.assertEquals('admin', json['role']) class TestGetOrganiaztionMembers(ApiTestCase): def test_getmembers(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgnaizationMemberList, params=dict(orgname=ORGANIZATION)) assert ADMIN_ACCESS_USER in json['members'] assert READ_ACCESS_USER in json['members'] assert not NO_ACCESS_USER in json['members'] def test_getspecificmember(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationMember, params=dict(orgname=ORGANIZATION, membername=ADMIN_ACCESS_USER)) self.assertEquals(ADMIN_ACCESS_USER, json['member']['name']) self.assertEquals('user', json['member']['kind']) assert 'owners' in json['member']['teams'] class TestGetOrganizationPrivateAllowed(ApiTestCase): def test_existingorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgPrivateRepositories, params=dict(orgname=ORGANIZATION)) self.assertEquals(True, json['privateAllowed']) assert not 'reposAllowed' in json def test_neworg(self): self.login(ADMIN_ACCESS_USER) data = self.postResponse(OrganizationList, data=dict(name='neworg', email='test@example.com'), expected_code=201) json = self.getJsonResponse(OrgPrivateRepositories, params=dict(orgname='neworg')) self.assertEquals(False, json['privateAllowed']) class TestUpdateOrganizationTeam(ApiTestCase): def test_updateexisting(self): self.login(ADMIN_ACCESS_USER) data = self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='readers'), data=dict(description='My cool team', role='creator')) self.assertEquals('My cool team', data['description']) self.assertEquals('creator', data['role']) def test_attemptchangeroleonowners(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='owners'), data=dict(role='creator'), expected_code=400) def test_createnewteam(self): self.login(ADMIN_ACCESS_USER) data = self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='newteam'), data=dict(description='My cool team', role='member')) self.assertEquals('My cool team', data['description']) self.assertEquals('member', data['role']) # Verify the team was created. json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) assert 'newteam' in json['teams'] class TestDeleteOrganizationTeam(ApiTestCase): def test_deleteteam(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='readers')) # Make sure the team was deleted json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) assert not 'readers' in json['teams'] def test_attemptdeleteowners(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='owners'), expected_code=400) class TestGetOrganizationTeamMembers(ApiTestCase): def test_invalidteam(self): self.login(ADMIN_ACCESS_USER) self.getResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='notvalid'), expected_code=404) def test_getmembers(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) self.assertEquals(READ_ACCESS_USER, json['members'][1]['name']) class TestUpdateOrganizationTeamMember(ApiTestCase): def test_addmember_alreadyteammember(self): self.login(ADMIN_ACCESS_USER) membername = READ_ACCESS_USER self.putResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername), expected_code=400) def test_addmember_orgmember(self): self.login(ADMIN_ACCESS_USER) membername = READ_ACCESS_USER self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) # Verify the user was added to the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertInTeam(json, membername) def test_addmember_robot(self): self.login(ADMIN_ACCESS_USER) membername = ORGANIZATION + '+coolrobot' self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) # Verify the user was added to the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) self.assertInTeam(json, membername) def test_addmember_invalidrobot(self): self.login(ADMIN_ACCESS_USER) membername = 'freshuser+anotherrobot' self.putResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername), expected_code=400) def test_addmember_nonorgmember(self): self.login(ADMIN_ACCESS_USER) membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) self.assertEquals(True, response['invited']) # Make sure the user is not (yet) part of the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) for member in json['members']: self.assertNotEqual(membername, member['name']) class TestAcceptTeamMemberInvite(ApiTestCase): def assertInTeam(self, data, membername): for memberData in data['members']: if memberData['name'] == membername: return self.fail(membername + ' not found in team: ' + json.dumps(data)) def test_accept(self): self.login(ADMIN_ACCESS_USER) # Create the invite. membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) self.assertEquals(True, response['invited']) # Login as the user. self.login(membername) # Accept the invite. user = model.get_user(membername) invites = list(model.lookup_team_invites(user)) self.assertEquals(1, len(invites)) self.putJsonResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token)) # Verify the user is now on the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertInTeam(json, membername) # Verify the accept now fails. self.putResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) class TestDeclineTeamMemberInvite(ApiTestCase): def test_decline_wronguser(self): self.login(ADMIN_ACCESS_USER) # Create the invite. membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) self.assertEquals(True, response['invited']) # Try to decline the invite. user = model.get_user(membername) invites = list(model.lookup_team_invites(user)) self.assertEquals(1, len(invites)) self.deleteResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) def test_decline(self): self.login(ADMIN_ACCESS_USER) # Create the invite. membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) self.assertEquals(True, response['invited']) # Login as the user. self.login(membername) # Decline the invite. user = model.get_user(membername) invites = list(model.lookup_team_invites(user)) self.assertEquals(1, len(invites)) self.deleteResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token)) # Make sure the invite was deleted. self.deleteResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) class TestDeleteOrganizationTeamMember(ApiTestCase): def test_deletememberinvite(self): self.login(ADMIN_ACCESS_USER) membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) self.assertEquals(True, response['invited']) # Verify the invite was added. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers', includePending=True)) assert len(json['members']) == 3 # Delete the invite. self.deleteResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) # Verify the user was removed from the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers', includePending=True)) assert len(json['members']) == 2 def test_deletemember(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=READ_ACCESS_USER)) # Verify the user was removed from the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) assert len(json['members']) == 1 class TestCreateRepo(ApiTestCase): def test_duplicaterepo(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(repository='simple', visibility='public', description=''), expected_code=400) self.assertEquals('Repository already exists', json['message']) def test_createrepo(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(repository='newrepo', visibility='public', description=''), expected_code=201) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('newrepo', json['name']) def test_createrepo_underorg(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(namespace=ORGANIZATION, repository='newrepo', visibility='private', description=''), expected_code=201) self.assertEquals(ORGANIZATION, json['namespace']) self.assertEquals('newrepo', json['name']) class TestFindRepos(ApiTestCase): def test_findrepos_asguest(self): json = self.getJsonResponse(FindRepositories, params=dict(query='p')) self.assertEquals(len(json['repositories']), 1) self.assertEquals(json['repositories'][0]['namespace'], 'public') self.assertEquals(json['repositories'][0]['name'], 'publicrepo') def test_findrepos_asuser(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse(FindRepositories, params=dict(query='p')) self.assertEquals(len(json['repositories']), 1) self.assertEquals(json['repositories'][0]['namespace'], 'public') self.assertEquals(json['repositories'][0]['name'], 'publicrepo') def test_findrepos_orgmember(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(FindRepositories, params=dict(query='p')) self.assertGreater(len(json['repositories']), 1) class TestListRepos(ApiTestCase): def test_listrepos_asguest(self): json = self.getJsonResponse(RepositoryList, params=dict(public=True)) self.assertEquals(len(json['repositories']), 1) def test_listrepos_orgmember(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(RepositoryList, params=dict(public=True)) self.assertGreater(len(json['repositories']), 1) def test_listrepos_filter(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(RepositoryList, params=dict(namespace=ORGANIZATION, public=False)) for repo in json['repositories']: self.assertEquals(ORGANIZATION, repo['namespace']) def test_listrepos_limit(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(RepositoryList, params=dict(limit=2)) self.assertEquals(len(json['repositories']), 2) class TestUpdateRepo(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' def test_updatedescription(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO), data=dict(description='Some cool repo')) # Verify the repo description was updated. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals('Some cool repo', json['description']) class TestChangeRepoVisibility(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' def test_changevisibility(self): self.login(ADMIN_ACCESS_USER) # Make public. self.postJsonResponse(RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility='public')) # Verify the visibility. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals(True, json['is_public']) # Make private. self.postJsonResponse(RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility='private')) # Verify the visibility. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals(False, json['is_public']) class TestDeleteRepository(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' COMPLEX_REPO = ADMIN_ACCESS_USER + '/complex' def test_deleterepo(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) # Verify the repo was deleted. self.getResponse(Repository, params=dict(repository=self.SIMPLE_REPO), expected_code=404) def test_deleterepo2(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(Repository, params=dict(repository=self.COMPLEX_REPO)) # Verify the repo was deleted. self.getResponse(Repository, params=dict(repository=self.COMPLEX_REPO), expected_code=404) def test_populate_and_delete_repo(self): self.login(ADMIN_ACCESS_USER) # Make sure the repository has come images and tags. self.assertTrue(len(list(model.get_repository_images(ADMIN_ACCESS_USER, 'complex'))) > 0) self.assertTrue(len(list(model.list_repository_tags(ADMIN_ACCESS_USER, 'complex'))) > 0) # Add some data for the repository, in addition to is already existing images and tags. repository = model.get_repository(ADMIN_ACCESS_USER, 'complex') # Create some access tokens. access_token = model.create_access_token(repository, 'read') model.create_access_token(repository, 'write') delegate_token = model.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken', 'read') model.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken', 'write') # Create some repository builds. model.create_repository_build(repository, access_token, {}, 'someid', 'foobar') model.create_repository_build(repository, delegate_token, {}, 'someid2', 'foobar2') # Create some notifications. model.create_repo_notification(repository, 'repo_push', 'hipchat', {}) model.create_repo_notification(repository, 'build_queued', 'slack', {}) # Create some logs. model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository) model.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository) # Create some build triggers. user = model.get_user(ADMIN_ACCESS_USER) model.create_build_trigger(repository, 'github', 'sometoken', user) model.create_build_trigger(repository, 'github', 'anothertoken', user) # Create some email authorizations. model.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'a@b.com') model.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'b@c.com') # Delete the repository. self.deleteResponse(Repository, params=dict(repository=self.COMPLEX_REPO)) # Verify the repo was deleted. self.getResponse(Repository, params=dict(repository=self.COMPLEX_REPO), expected_code=404) class TestGetRepository(ApiTestCase): PUBLIC_REPO = PUBLIC_USER + '/publicrepo' def test_getrepo_badnames(self): self.login(ADMIN_ACCESS_USER) bad_names = ['logs', 'build', 'tokens', 'foo.bar', 'foo-bar', 'foo_bar'] # For each bad name, create the repo. for bad_name in bad_names: json = self.postJsonResponse(RepositoryList, expected_code=201, data=dict(repository=bad_name, visibility='public', description='')) # Make sure we can retrieve its information. json = self.getJsonResponse(Repository, params=dict(repository=ADMIN_ACCESS_USER + '/' + bad_name)) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals(bad_name, json['name']) self.assertEquals(True, json['is_public']) def test_getrepo_public_asguest(self): json = self.getJsonResponse(Repository, params=dict(repository=self.PUBLIC_REPO)) self.assertEquals(PUBLIC_USER, json['namespace']) self.assertEquals('publicrepo', json['name']) self.assertEquals(True, json['is_public']) self.assertEquals(False, json['is_organization']) self.assertEquals(False, json['is_building']) self.assertEquals(False, json['can_write']) self.assertEquals(False, json['can_admin']) assert 'latest' in json['tags'] def test_getrepo_public_asowner(self): self.login(PUBLIC_USER) json = self.getJsonResponse(Repository, params=dict(repository=self.PUBLIC_REPO)) self.assertEquals(False, json['is_organization']) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) def test_getrepo_building(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) self.assertEquals(True, json['is_building']) self.assertEquals(False, json['is_organization']) def test_getrepo_org_asnonmember(self): self.getResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO), expected_code=401) def test_getrepo_org_asreader(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) self.assertEquals(ORGANIZATION, json['namespace']) self.assertEquals(ORG_REPO, json['name']) self.assertEquals(False, json['can_write']) self.assertEquals(False, json['can_admin']) self.assertEquals(True, json['is_organization']) def test_getrepo_org_asadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) self.assertEquals(True, json['is_organization']) class TestRepositoryBuildResource(ApiTestCase): def test_cancel_invalidbuild(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(RepositoryBuildResource, params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid='invalid'), expected_code=404) def test_cancel_waitingbuild(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. json = self.postJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) uuid = json['id'] # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['builds'])) self.assertEquals(uuid, json['builds'][0]['id']) # Find the build's queue item. build_ref = database.RepositoryBuild.get(uuid=uuid) queue_item = database.QueueItem.get(id=build_ref.queue_id) self.assertTrue(queue_item.available) self.assertTrue(queue_item.retries_remaining > 0) # Cancel the build. self.deleteResponse(RepositoryBuildResource, params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(0, len(json['builds'])) # Check for the build's queue item. try: database.QueueItem.get(id=build_ref.queue_id) self.fail('QueueItem still exists for build') except database.QueueItem.DoesNotExist: pass def test_attemptcancel_scheduledbuild(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. json = self.postJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) uuid = json['id'] # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['builds'])) self.assertEquals(uuid, json['builds'][0]['id']) # Set queue item to be picked up. build_ref = database.RepositoryBuild.get(uuid=uuid) qi = database.QueueItem.get(id=build_ref.queue_id) qi.available = False qi.save() # Try to cancel the build. self.deleteResponse(RepositoryBuildResource, params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid), expected_code=400) def test_attemptcancel_workingbuild(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. json = self.postJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) uuid = json['id'] # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['builds'])) self.assertEquals(uuid, json['builds'][0]['id']) # Set the build to a different phase. rb = database.RepositoryBuild.get(uuid=uuid) rb.phase = database.BUILD_PHASE.BUILDING rb.save() # Try to cancel the build. self.deleteResponse(RepositoryBuildResource, params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid), expected_code=400) class TestRepoBuilds(ApiTestCase): def test_getrepo_nobuilds(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 def test_getrepobuilds(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 build = json['builds'][-1] assert 'id' in build assert 'status' in build # Check the status endpoint. status_json = self.getJsonResponse(RepositoryBuildStatus, params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id'])) self.assertEquals(status_json['id'], build['id']) self.assertEquals(status_json['resource_key'], build['resource_key']) self.assertEquals(status_json['trigger'], build['trigger']) class TestRequestRepoBuild(ApiTestCase): def test_requestrepobuild(self): self.login(ADMIN_ACCESS_USER) # Ensure we are not yet building. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 # Request a (fake) build. self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 def test_requestrepobuild_with_robot(self): self.login(ADMIN_ACCESS_USER) # Ensure we are not yet building. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 # Request a (fake) build. pull_robot = ADMIN_ACCESS_USER + '+dtrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 def test_requestrepobuild_with_invalid_robot(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. pull_robot = ADMIN_ACCESS_USER + '+invalidrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=404) def test_requestrepobuild_with_unauthorized_robot(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. pull_robot = 'freshuser+anotherrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=403) class TestRepositoryEmail(ApiTestCase): def test_emailnotauthorized(self): self.login(ADMIN_ACCESS_USER) # Verify the e-mail address is not authorized. json = self.getResponse(RepositoryAuthorizedEmail, params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='test@example.com'), expected_code=404) def test_emailnotauthorized_butsent(self): self.login(ADMIN_ACCESS_USER) # Verify the e-mail address is not authorized. json = self.getJsonResponse(RepositoryAuthorizedEmail, params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr+other@devtable.com')) self.assertEquals(False, json['confirmed']) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('simple', json['repository']) def test_emailauthorized(self): self.login(ADMIN_ACCESS_USER) # Verify the e-mail address is authorized. json = self.getJsonResponse(RepositoryAuthorizedEmail, params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr@devtable.com')) self.assertEquals(True, json['confirmed']) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('simple', json['repository']) def test_send_email_authorization(self): self.login(ADMIN_ACCESS_USER) # Send the email. json = self.postJsonResponse(RepositoryAuthorizedEmail, params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr+foo@devtable.com')) self.assertEquals(False, json['confirmed']) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('simple', json['repository']) class TestRepositoryNotifications(ApiTestCase): def test_webhooks(self): self.login(ADMIN_ACCESS_USER) # Add a notification. json = self.postJsonResponse(RepositoryNotificationList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(config={'url': 'http://example.com'}, event='repo_push', method='webhook'), expected_code=201) self.assertEquals('repo_push', json['event']) self.assertEquals('webhook', json['method']) self.assertEquals('http://example.com', json['config']['url']) wid = json['uuid'] # Get the notification. json = self.getJsonResponse(RepositoryNotification, params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid)) self.assertEquals(wid, json['uuid']) self.assertEquals('repo_push', json['event']) self.assertEquals('webhook', json['method']) # Verify the notification is listed. json = self.getJsonResponse(RepositoryNotificationList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) ids = [w['uuid'] for w in json['notifications']] assert wid in ids # Delete the notification. self.deleteResponse(RepositoryNotification, params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid), expected_code=204) # Verify the notification is gone. self.getResponse(RepositoryNotification, params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid), expected_code=404) class TestListAndGetImage(ApiTestCase): def test_listandgetimages(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryImageList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['images']) > 0 for image in json['images']: assert 'id' in image assert 'tags' in image assert 'created' in image assert 'comment' in image assert 'command' in image assert 'ancestors' in image assert 'size' in image ijson = self.getJsonResponse(RepositoryImage, params=dict(repository=ADMIN_ACCESS_USER + '/simple', image_id=image['id'])) self.assertEquals(image['id'], ijson['id']) class TestGetImageChanges(ApiTestCase): def test_getimagechanges(self): self.login(ADMIN_ACCESS_USER) # Find an image to check. json = self.getJsonResponse(RepositoryImageList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) image_id = json['images'][0]['id'] # Lookup the image's changes. # TODO: Fix me once we can get fake changes into the test data #self.getJsonResponse(RepositoryImageChanges, # params=dict(repository=ADMIN_ACCESS_USER + '/simple', # image_id=image_id)) class TestRevertTag(ApiTestCase): def test_reverttag_invalidtag(self): self.login(ADMIN_ACCESS_USER) self.postResponse(RevertTag, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='invalidtag'), data=dict(image='invalid_image'), expected_code=404) def test_reverttag_invalidimage(self): self.login(ADMIN_ACCESS_USER) self.postResponse(RevertTag, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest'), data=dict(image='invalid_image'), expected_code=400) def test_reverttag(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(ListRepositoryTags, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest')) self.assertEquals(2, len(json['tags'])) self.assertFalse('end_ts' in json['tags'][0]) previous_image_id = json['tags'][1]['docker_image_id'] self.postJsonResponse(RevertTag, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest'), data=dict(image=previous_image_id)) json = self.getJsonResponse(ListRepositoryTags, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest')) self.assertEquals(3, len(json['tags'])) self.assertFalse('end_ts' in json['tags'][0]) self.assertEquals(previous_image_id, json['tags'][0]['docker_image_id']) class TestListAndDeleteTag(ApiTestCase): def test_listdeletecreateandmovetag(self): self.login(ADMIN_ACCESS_USER) # List the images for prod. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) prod_images = json['images'] assert len(prod_images) > 0 # List the images for staging. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) staging_images = json['images'] assert len(prod_images) == len(staging_images) + 1 # Delete prod. self.deleteResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), expected_code=204) # Make sure the tag is gone. self.getResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), expected_code=404) # Make the sure the staging images are still there. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) self.assertEquals(staging_images, json['images']) # Add a new tag to the staging image. self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'), data=dict(image=staging_images[0]['id']), expected_code=201) # Make sure the tag is present. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag')) sometag_images = json['images'] self.assertEquals(sometag_images, staging_images) # Move the tag. self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'), data=dict(image=staging_images[-1]['id']), expected_code=201) # Make sure the tag has moved. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag')) sometag_new_images = json['images'] self.assertEquals(1, len(sometag_new_images)) self.assertEquals(staging_images[-1], sometag_new_images[0]) def test_deletesubtag(self): self.login(ADMIN_ACCESS_USER) # List the images for prod. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) prod_images = json['images'] assert len(prod_images) > 0 # Delete staging. self.deleteResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'), expected_code=204) # Make sure the prod images are still around. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) self.assertEquals(prod_images, json['images']) class TestRepoPermissions(ApiTestCase): def listUserPermissions(self, namespace=ADMIN_ACCESS_USER, repo='simple'): return self.getJsonResponse(RepositoryUserPermissionList, params=dict(repository=namespace + '/' + repo))['permissions'] def listTeamPermissions(self): return self.getJsonResponse(RepositoryTeamPermissionList, params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions'] def test_userpermissions_underorg(self): self.login(ADMIN_ACCESS_USER) permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO) self.assertEquals(1, len(permissions)) assert 'outsideorg' in permissions self.assertEquals('read', permissions['outsideorg']['role']) self.assertEquals(False, permissions['outsideorg']['is_org_member']) # Add another user. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, username=ADMIN_ACCESS_USER), data=dict(role='admin')) # Verify the user is present. permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO) self.assertEquals(2, len(permissions)) assert ADMIN_ACCESS_USER in permissions self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role']) self.assertEquals(True, permissions[ADMIN_ACCESS_USER]['is_org_member']) def test_userpermissions(self): self.login(ADMIN_ACCESS_USER) # The repo should start with just the admin as a user perm. permissions = self.listUserPermissions() self.assertEquals(1, len(permissions)) assert ADMIN_ACCESS_USER in permissions self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role']) self.assertFalse('is_org_member' in permissions[ADMIN_ACCESS_USER]) # Add another user. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), data=dict(role='read')) # Verify the user is present. permissions = self.listUserPermissions() self.assertEquals(2, len(permissions)) assert NO_ACCESS_USER in permissions self.assertEquals('read', permissions[NO_ACCESS_USER]['role']) self.assertFalse('is_org_member' in permissions[NO_ACCESS_USER]) json = self.getJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) self.assertEquals('read', json['role']) # Change the user's permissions. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), data=dict(role='admin')) # Verify. permissions = self.listUserPermissions() self.assertEquals(2, len(permissions)) assert NO_ACCESS_USER in permissions self.assertEquals('admin', permissions[NO_ACCESS_USER]['role']) # Delete the user's permission. self.deleteResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) # Verify. permissions = self.listUserPermissions() self.assertEquals(1, len(permissions)) assert not NO_ACCESS_USER in permissions def test_teampermissions(self): self.login(ADMIN_ACCESS_USER) # The repo should start with just the readers as a team perm. permissions = self.listTeamPermissions() self.assertEquals(1, len(permissions)) assert 'readers' in permissions self.assertEquals('read', permissions['readers']['role']) # Add another team. self.putJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), data=dict(role='write')) # Verify the team is present. permissions = self.listTeamPermissions() self.assertEquals(2, len(permissions)) assert 'owners' in permissions self.assertEquals('write', permissions['owners']['role']) json = self.getJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) self.assertEquals('write', json['role']) # Change the team's permissions. self.putJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), data=dict(role='admin')) # Verify. permissions = self.listTeamPermissions() self.assertEquals(2, len(permissions)) assert 'owners' in permissions self.assertEquals('admin', permissions['owners']['role']) # Delete the team's permission. self.deleteResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) # Verify. permissions = self.listTeamPermissions() self.assertEquals(1, len(permissions)) assert not 'owners' in permissions class TestApiTokens(ApiTestCase): def listTokens(self): return self.getJsonResponse(RepositoryTokenList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens'] def test_tokens(self): self.login(ADMIN_ACCESS_USER) # Create a new token. json = self.postJsonResponse(RepositoryTokenList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(role='read', friendlyName='mytoken'), expected_code=201) self.assertEquals('mytoken', json['friendlyName']) self.assertEquals('read', json['role']) token_code = json['code'] # Verify. tokens = self.listTokens() assert token_code in tokens self.assertEquals('mytoken', tokens[token_code]['friendlyName']) json = self.getJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) self.assertEquals(tokens[token_code], json) # Change the token's permission. self.putJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), data=dict(role='write')) # Verify. json = self.getJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) self.assertEquals('write', json['role']) # Delete the token. self.deleteResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) # Verify. self.getResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), expected_code=404) class TestUserCard(ApiTestCase): def test_getusercard(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserCard) self.assertEquals('4242', json['card']['last4']) self.assertEquals('Visa', json['card']['type']) def test_setusercard_error(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(UserCard, data=dict(token='sometoken'), expected_code=402) assert 'carderror' in json class TestOrgCard(ApiTestCase): def test_getorgcard(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationCard, params=dict(orgname=ORGANIZATION)) self.assertEquals('4242', json['card']['last4']) self.assertEquals('Visa', json['card']['type']) class TestUserSubscription(ApiTestCase): def getSubscription(self): return self.getJsonResponse(UserPlan) def test_updateplan(self): self.login(ADMIN_ACCESS_USER) # Change the plan. self.putJsonResponse(UserPlan, data=dict(plan='free')) # Verify sub = self.getSubscription() self.assertEquals('free', sub['plan']) # Change the plan. self.putJsonResponse(UserPlan, data=dict(plan='bus-large')) # Verify sub = self.getSubscription() self.assertEquals('bus-large', sub['plan']) class TestOrgSubscription(ApiTestCase): def getSubscription(self): return self.getJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION)) def test_updateplan(self): self.login(ADMIN_ACCESS_USER) # Change the plan. self.putJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION), data=dict(plan='free')) # Verify sub = self.getSubscription() self.assertEquals('free', sub['plan']) # Change the plan. self.putJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION), data=dict(plan='bus-large')) # Verify sub = self.getSubscription() self.assertEquals('bus-large', sub['plan']) class TestUserRobots(ApiTestCase): def getRobotNames(self): return [r['name'] for r in self.getJsonResponse(UserRobotList)['robots']] def test_robots(self): self.login(NO_ACCESS_USER) # Create a robot. json = self.putJsonResponse(UserRobot, params=dict(robot_shortname='bender'), expected_code=201) self.assertEquals(NO_ACCESS_USER + '+bender', json['name']) # Verify. robots = self.getRobotNames() assert NO_ACCESS_USER + '+bender' in robots # Delete the robot. self.deleteResponse(UserRobot, params=dict(robot_shortname='bender')) # Verify. robots = self.getRobotNames() assert not NO_ACCESS_USER + '+bender' in robots def test_regenerate(self): self.login(NO_ACCESS_USER) # Create a robot. json = self.putJsonResponse(UserRobot, params=dict(robot_shortname='bender'), expected_code=201) token = json['token'] # Regenerate the robot. json = self.postJsonResponse(RegenerateUserRobot, params=dict(robot_shortname='bender'), expected_code=200) # Verify the token changed. self.assertNotEquals(token, json['token']) json2 = self.getJsonResponse(UserRobot, params=dict(robot_shortname='bender'), expected_code=200) self.assertEquals(json['token'], json2['token']) class TestOrgRobots(ApiTestCase): def getRobotNames(self): return [r['name'] for r in self.getJsonResponse(OrgRobotList, params=dict(orgname=ORGANIZATION))['robots']] def test_delete_robot_after_use(self): self.login(ADMIN_ACCESS_USER) # Create the robot. self.putJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=201) # Add the robot to a team. membername = ORGANIZATION + '+bender' self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) # Add a repository permission. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, username=membername), data=dict(role='read')) # Add a permission prototype with the robot as the activating user. self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read', activating_user={'name': membername}, delegate={'kind': 'user', 'name': membername})) # Add a permission prototype with the robot as the delegating user. self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read', delegate={'kind': 'user', 'name': membername})) # Add a build trigger with the robot as the pull robot. database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ORGANIZATION, ORG_REPO) user = model.get_user(ADMIN_ACCESS_USER) pull_robot = model.get_user(membername) model.create_build_trigger(repo, 'fakeservice', 'sometoken', user, pull_robot=pull_robot) # Add some log entries for the robot. model.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo) # Delete the robot and verify it works. self.deleteResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender')) # All the above records should now be deleted, along with the robot. We verify a few of the # critical ones below. # Check the team. team = model.get_organization_team(ORGANIZATION, 'readers') members = [member.username for member in model.get_organization_team_members(team.id)] self.assertFalse(membername in members) # Check the robot itself. self.assertIsNone(model.get_user(membername)) def test_robots(self): self.login(ADMIN_ACCESS_USER) # Create a robot. json = self.putJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=201) self.assertEquals(ORGANIZATION + '+bender', json['name']) # Verify. robots = self.getRobotNames() assert ORGANIZATION + '+bender' in robots # Delete the robot. self.deleteResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender')) # Verify. robots = self.getRobotNames() assert not ORGANIZATION + '+bender' in robots def test_regenerate(self): self.login(ADMIN_ACCESS_USER) # Create a robot. json = self.putJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=201) token = json['token'] # Regenerate the robot. json = self.postJsonResponse(RegenerateOrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=200) # Verify the token changed. self.assertNotEquals(token, json['token']) json2 = self.getJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=200) self.assertEquals(json['token'], json2['token']) class TestLogs(ApiTestCase): def test_user_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserLogs) assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_org_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION)) assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_performer(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION)) all_logs = json['logs'] json = self.getJsonResponse(OrgLogs, params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION)) assert len(json['logs']) < len(all_logs) for log in json['logs']: self.assertEquals(READ_ACCESS_USER, log['performer']['name']) class TestApplicationInformation(ApiTestCase): def test_get_info(self): json = self.getJsonResponse(ApplicationInformation, params=dict(client_id=FAKE_APPLICATION_CLIENT_ID)) assert 'name' in json assert 'uri' in json assert 'organization' in json def test_get_invalid_info(self): self.getJsonResponse(ApplicationInformation, params=dict(client_id='invalid-code'), expected_code=404) class TestOrganizationApplications(ApiTestCase): def test_list_create_applications(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION)) self.assertEquals(2, len(json['applications'])) found = False for application in json['applications']: if application['client_id'] == FAKE_APPLICATION_CLIENT_ID: found = True break self.assertTrue(found) # Add a new application. json = self.postJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION), data=dict(name="Some cool app", description="foo")) self.assertEquals("Some cool app", json['name']) self.assertEquals("foo", json['description']) # Retrieve the apps list again list_json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION)) self.assertEquals(3, len(list_json['applications'])) class TestOrganizationApplicationResource(ApiTestCase): def test_get_edit_delete_application(self): self.login(ADMIN_ACCESS_USER) # Retrieve the application. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id']) # Edit the application. edit_json = self.putJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID), data=dict(name="Some App", description="foo", application_uri="bar", redirect_uri="baz", avatar_email="meh")) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, edit_json['client_id']) self.assertEquals("Some App", edit_json['name']) self.assertEquals("foo", edit_json['description']) self.assertEquals("bar", edit_json['application_uri']) self.assertEquals("baz", edit_json['redirect_uri']) self.assertEquals("meh", edit_json['avatar_email']) # Retrieve the application again. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(json, edit_json) # Delete the application. self.deleteResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) # Make sure the application is gone. self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID), expected_code=404) class TestOrganizationApplicationResetClientSecret(ApiTestCase): def test_reset_client_secret(self): self.login(ADMIN_ACCESS_USER) # Retrieve the application. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id']) # Reset the client secret. reset_json = self.postJsonResponse(OrganizationApplicationResetClientSecret, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, reset_json['client_id']) self.assertNotEquals(reset_json['client_secret'], json['client_secret']) # Verify it was changed in the DB. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(reset_json['client_secret'], json['client_secret']) class FakeBuildTrigger(BuildTriggerBase): @classmethod def service_name(cls): return 'fakeservice' def list_build_sources(self, auth_token): return [{'first': 'source'}, {'second': auth_token}] def list_build_subdirs(self, auth_token, config): return [auth_token, 'foo', 'bar', config['somevalue']] def handle_trigger_request(self, request, auth_token, config): return ('foo', ['bar'], 'build-name', 'subdir', {'foo': 'bar'}) def is_active(self, config): return 'active' in config and config['active'] def activate(self, trigger_uuid, standard_webhook_url, auth_token, config): config['active'] = True return config def deactivate(self, auth_token, config): config['active'] = False return config def manual_start(self, auth_token, config, run_parameters=None): return ('foo', ['bar'], 'build-name', 'subdir', {'foo': 'bar'}) def dockerfile_url(self, auth_token, config): return 'http://some/url' def load_dockerfile_contents(self, auth_token, config): if not 'dockerfile' in config: return None return config['dockerfile'] def list_field_values(self, auth_token, config, field_name): if field_name == 'test_field': return [1, 2, 3] return None class TestBuildTriggers(ApiTestCase): def test_list_build_triggers(self): self.login(ADMIN_ACCESS_USER) # Check a repo with no known triggers. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(0, len(json['triggers'])) # Check a repo with one known trigger. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(1, len(json['triggers'])) trigger = json['triggers'][0] assert 'id' in trigger assert 'is_active' in trigger assert 'config' in trigger assert 'service' in trigger # Verify the get trigger method. trigger_json = self.getJsonResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) self.assertEquals(trigger, trigger_json) # Check the recent builds for the trigger. builds_json = self.getJsonResponse(TriggerBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) assert 'builds' in builds_json def test_delete_build_trigger(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(1, len(json['triggers'])) trigger = json['triggers'][0] # Delete the trigger. self.deleteResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) # Verify it was deleted. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(0, len(json['triggers'])) def test_analyze_fake_trigger(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Analyze the trigger's dockerfile: First, no dockerfile. trigger_config = {} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) self.assertEquals('Could not read the Dockerfile for the trigger', analyze_json['message']) # Analyze the trigger's dockerfile: Second, missing FROM in dockerfile. trigger_config = {'dockerfile': 'MAINTAINER me'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('warning', analyze_json['status']) self.assertEquals('No FROM line found in the Dockerfile', analyze_json['message']) # Analyze the trigger's dockerfile: Third, dockerfile with public repo. trigger_config = {'dockerfile': 'FROM somerepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('publicbase', analyze_json['status']) # Analyze the trigger's dockerfile: Fourth, dockerfile with private repo with an invalid path. trigger_config = {'dockerfile': 'FROM localhost:5000/somepath'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('warning', analyze_json['status']) self.assertEquals('"localhost:5000/somepath" is not a valid Quay repository path', analyze_json['message']) # Analyze the trigger's dockerfile: Fifth, dockerfile with private repo that does not exist. trigger_config = {'dockerfile': 'FROM localhost:5000/nothere/randomrepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) self.assertEquals('Repository "localhost:5000/nothere/randomrepo" referenced by the Dockerfile was not found', analyze_json['message']) # Analyze the trigger's dockerfile: Sixth, dockerfile with private repo that the user cannot see. trigger_config = {'dockerfile': 'FROM localhost:5000/randomuser/randomrepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) self.assertEquals('Repository "localhost:5000/randomuser/randomrepo" referenced by the Dockerfile was not found', analyze_json['message']) # Analyze the trigger's dockerfile: Seventh, dockerfile with private repo that the user see. trigger_config = {'dockerfile': 'FROM localhost:5000/devtable/complex'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('analyzed', analyze_json['status']) self.assertEquals('devtable', analyze_json['namespace']) self.assertEquals('complex', analyze_json['name']) self.assertEquals(False, analyze_json['is_public']) self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', analyze_json['robots'][0]['name']) def test_fake_trigger(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Verify the trigger. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['triggers'])) self.assertEquals(trigger.uuid, json['triggers'][0]['id']) self.assertEquals(trigger.service.name, json['triggers'][0]['service']) self.assertEquals(False, json['triggers'][0]['is_active']) # List the trigger's sources. source_json = self.getJsonResponse(BuildTriggerSources, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid)) self.assertEquals([{'first': 'source'}, {'second': 'sometoken'}], source_json['sources']) # List the trigger's subdirs. subdir_json = self.postJsonResponse(BuildTriggerSubdirs, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'somevalue': 'meh'}) self.assertEquals({'status': 'success', 'subdir': ['sometoken', 'foo', 'bar', 'meh']}, subdir_json) # Activate the trigger. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals(True, activate_json['is_active']) # Make sure the trigger has a write token. trigger = model.get_build_trigger(trigger.uuid) self.assertNotEquals(None, trigger.write_token) self.assertEquals(True, py_json.loads(trigger.config)['active']) # Make sure we cannot activate again. self.postResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}, expected_code=400) # Retrieve values for a field. result = self.postJsonResponse(BuildTriggerFieldValues, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid, field_name="test_field")) self.assertEquals(result['values'], [1, 2, 3]) self.postResponse(BuildTriggerFieldValues, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid, field_name="another_field"), expected_code = 404) # Start a manual build. start_json = self.postJsonResponse(ActivateBuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data=dict(), expected_code=201) assert 'id' in start_json self.assertEquals("build-name", start_json['display_name']) self.assertEquals(['bar'], start_json['job_config']['docker_tags']) # Verify the metadata was added. build_obj = database.RepositoryBuild.get(database.RepositoryBuild.uuid == start_json['id']) self.assertEquals('bar', py_json.loads(build_obj.job_config)['trigger_metadata']['foo']) def test_invalid_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with an invalid robot account. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config, 'pull_robot': 'someinvalidrobot'}, expected_code=404) def test_unauthorized_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with a robot account in the wrong namespace. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config, 'pull_robot': 'freshuser+anotherrobot'}, expected_code=403) def test_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with a robot account. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config, 'pull_robot': ADMIN_ACCESS_USER + '+dtrobot'}) # Verify that the robot was saved. self.assertEquals(True, activate_json['is_active']) self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', activate_json['pull_robot']['name']) # Start a manual build. start_json = self.postJsonResponse(ActivateBuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data=dict(), expected_code=201) assert 'id' in start_json self.assertEquals("build-name", start_json['display_name']) self.assertEquals(['bar'], start_json['job_config']['docker_tags']) class TestUserAuthorizations(ApiTestCase): def test_list_get_delete_user_authorizations(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserAuthorizationList) self.assertEquals(1, len(json['authorizations'])) authorization = json['authorizations'][0] assert 'uuid' in authorization assert 'scopes' in authorization assert 'application' in authorization # Retrieve the authorization. get_json = self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid'])) self.assertEquals(authorization, get_json) # Delete the authorization. self.deleteResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid'])) # Verify it has been deleted. self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid']), expected_code=404) class TestSuperUserLogs(ApiTestCase): def test_get_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserLogs) assert 'logs' in json assert len(json['logs']) > 0 class TestSuperUserList(ApiTestCase): def test_get_users(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserList) assert 'users' in json assert len(json['users']) > 0 class TestUsageInformation(ApiTestCase): def test_get_usage(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UsageInformation) assert 'usage' in json assert 'allowed' in json class TestSuperUserManagement(ApiTestCase): def test_get_user(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('jschorr+test@devtable.com', json['email']) self.assertEquals(False, json['super_user']) def test_delete_user(self): self.login(ADMIN_ACCESS_USER) # Verify the user exists. json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser')) self.assertEquals('freshuser', json['username']) # Delete the user. self.deleteResponse(SuperUserManagement, params=dict(username = 'freshuser'), expected_code=204) # Verify the user no longer exists. self.getResponse(SuperUserManagement, params=dict(username = 'freshuser'), expected_code=404) def test_update_user(self): self.login(ADMIN_ACCESS_USER) # Verify the user exists. json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('jschorr+test@devtable.com', json['email']) # Update the user. self.putJsonResponse(SuperUserManagement, params=dict(username='freshuser'), data=dict(email='foo@bar.com')) # Verify the user was updated. json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('foo@bar.com', json['email']) if __name__ == '__main__': unittest.main()