import unittest import json as py_json from urllib import urlencode from urlparse import urlparse, urlunparse, parse_qs from endpoints.api import api_bp, api from endpoints.webhooks import webhooks from endpoints.trigger import BuildTrigger as BuildTriggerBase from app import app from initdb import setup_database_for_testing, finished_database_for_testing from data import model, database from endpoints.api.team import TeamMember, TeamMemberList, OrganizationTeam from endpoints.api.tag import RepositoryTagImages, RepositoryTag from endpoints.api.search import FindRepositories, EntitySearch from endpoints.api.image import RepositoryImage, RepositoryImageList from endpoints.api.build import RepositoryBuildStatus, RepositoryBuildLogs, RepositoryBuildList from endpoints.api.robot import UserRobotList, OrgRobot, OrgRobotList, UserRobot from endpoints.api.trigger import (BuildTriggerActivate, BuildTriggerSources, BuildTriggerSubdirs, TriggerBuildList, ActivateBuildTrigger, BuildTrigger, BuildTriggerList, BuildTriggerAnalyze) from endpoints.api.webhook import Webhook, WebhookList from endpoints.api.user import (PrivateRepositories, ConvertToOrganization, Signout, Signin, User, UserAuthorizationList, UserAuthorization) from endpoints.api.repotoken import RepositoryToken, RepositoryTokenList from endpoints.api.prototype import PermissionPrototype, PermissionPrototypeList from endpoints.api.logs import UserLogs, OrgLogs from endpoints.api.billing import (UserCard, UserPlan, ListPlans, OrganizationCard, OrganizationPlan) from endpoints.api.discovery import DiscoveryResource from endpoints.api.organization import (OrganizationList, OrganizationMember, OrgPrivateRepositories, OrgnaizationMemberList, Organization, ApplicationInformation, OrganizationApplications, OrganizationApplicationResource, OrganizationApplicationResetClientSecret) from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repository from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission, RepositoryTeamPermissionList, RepositoryUserPermissionList) from endpoints.api.superuser import SuperUserLogs, SeatUsage, SuperUserList, SuperUserManagement try: app.register_blueprint(api_bp, url_prefix='/api') except ValueError: # This blueprint was already registered pass app.register_blueprint(webhooks, url_prefix='/webhooks') NO_ACCESS_USER = 'freshuser' READ_ACCESS_USER = 'reader' ADMIN_ACCESS_USER = 'devtable' PUBLIC_USER = 'public' ADMIN_ACCESS_EMAIL = 'jschorr@devtable.com' ORG_REPO = 'orgrepo' ORGANIZATION = 'buynlarge' NEW_USER_DETAILS = { 'username': 'bobby', 'password': 'password', 'email': 'bobby@tables.com', } FAKE_APPLICATION_CLIENT_ID = 'deadbeef' CSRF_TOKEN_KEY = '_csrf_token' CSRF_TOKEN = '123csrfforme' class ApiTestCase(unittest.TestCase): maxDiff = None @staticmethod def _add_csrf(without_csrf): parts = urlparse(without_csrf) query = parse_qs(parts[4]) query[CSRF_TOKEN_KEY] = CSRF_TOKEN return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:])) def url_for(self, resource_name, params={}): url = api.url_for(resource_name, **params) url = ApiTestCase._add_csrf(url) return url def setUp(self): setup_database_for_testing(self) self.app = app.test_client() self.ctx = app.test_request_context() self.ctx.__enter__() self.setCsrfToken(CSRF_TOKEN) def tearDown(self): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) def setCsrfToken(self, token): with self.app.session_transaction() as sess: sess[CSRF_TOKEN_KEY] = token def getJsonResponse(self, resource_name, params={}, expected_code=200): rv = self.app.get(api.url_for(resource_name, **params)) self.assertEquals(expected_code, rv.status_code) data = rv.data parsed = py_json.loads(data) return parsed def postResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.post(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) self.assertEquals(rv.status_code, expected_code) return rv.data def getResponse(self, resource_name, params={}, expected_code=200): rv = self.app.get(api.url_for(resource_name, **params)) self.assertEquals(rv.status_code, expected_code) return rv.data def putResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.put(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) self.assertEquals(rv.status_code, expected_code) return rv.data def deleteResponse(self, resource_name, params={}, expected_code=204): rv = self.app.delete(self.url_for(resource_name, params)) self.assertEquals(rv.status_code, expected_code) return rv.data def postJsonResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.post(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) if rv.status_code != expected_code: print 'Mismatch data for resource POST %s: %s' % (resource_name, rv.data) self.assertEquals(rv.status_code, expected_code) data = rv.data parsed = py_json.loads(data) return parsed def putJsonResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.put(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) if rv.status_code != expected_code: print 'Mismatch data for resource PUT %s: %s' % (resource_name, rv.data) self.assertEquals(rv.status_code, expected_code) data = rv.data parsed = py_json.loads(data) return parsed def login(self, username, password='password'): return self.postJsonResponse(Signin, data=dict(username=username, password=password)) class TestCSRFFailure(ApiTestCase): def test_csrf_failure(self): self.login(READ_ACCESS_USER) # Make sure a simple post call succeeds. self.putJsonResponse(User, data=dict(password='newpasswordiscool')) # Change the session's CSRF token. self.setCsrfToken('someinvalidtoken') # Verify that the call now fails. self.putJsonResponse(User, data=dict(password='newpasswordiscool'), expected_code=403) class TestDiscovery(ApiTestCase): def test_discovery(self): json = self.getJsonResponse(DiscoveryResource) assert 'apis' in json class TestPlans(ApiTestCase): def test_plans(self): json = self.getJsonResponse(ListPlans) found = set([]) for method_info in json['plans']: found.add(method_info['stripeId']) assert 'free' in found class TestLoggedInUser(ApiTestCase): def test_guest(self): self.getJsonResponse(User, expected_code=401) def test_user(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(User) assert json['anonymous'] == False assert json['username'] == READ_ACCESS_USER class TestGetUserPrivateAllowed(ApiTestCase): def test_nonallowed(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(PrivateRepositories) assert json['privateCount'] == 0 assert not json['privateAllowed'] def test_allowed(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(PrivateRepositories) assert json['privateCount'] >= 6 assert json['privateAllowed'] class TestConvertToOrganization(ApiTestCase): def test_sameadminuser(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': READ_ACCESS_USER, 'adminPassword': 'password', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user is not valid', json['message']) def test_invalidadminuser(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': 'unknownuser', 'adminPassword': 'password', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user credentials are not valid', json['message']) def test_invalidadminpassword(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'invalidpass', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user credentials are not valid', json['message']) def test_convert(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'password', 'plan': 'free'}) self.assertEqual(True, json['success']) # Verify the organization exists. organization = model.get_organization(READ_ACCESS_USER) assert organization is not None # Verify the admin user is the org's admin. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=READ_ACCESS_USER)) self.assertEquals(READ_ACCESS_USER, json['name']) self.assertEquals(True, json['is_admin']) def test_convert_via_email(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_EMAIL, 'adminPassword': 'password', 'plan': 'free'}) self.assertEqual(True, json['success']) # Verify the organization exists. organization = model.get_organization(READ_ACCESS_USER) assert organization is not None # Verify the admin user is the org's admin. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=READ_ACCESS_USER)) self.assertEquals(READ_ACCESS_USER, json['name']) self.assertEquals(True, json['is_admin']) class TestChangeUserDetails(ApiTestCase): def test_changepassword(self): self.login(READ_ACCESS_USER) self.putJsonResponse(User, data=dict(password='newpasswordiscool')) self.login(READ_ACCESS_USER, password='newpasswordiscool') def test_changeinvoiceemail(self): self.login(READ_ACCESS_USER) json = self.putJsonResponse(User, data=dict(invoice_email=True)) self.assertEquals(True, json['invoice_email']) json = self.putJsonResponse(User, data=dict(invoice_email=False)) self.assertEquals(False, json['invoice_email']) class TestCreateNewUser(ApiTestCase): def test_existingusername(self): json = self.postJsonResponse(User, data=dict(username=READ_ACCESS_USER, password='password', email='test@example.com'), expected_code=400) self.assertEquals('The username already exists', json['message']) def test_trycreatetooshort(self): json = self.postJsonResponse(User, data=dict(username='a', password='password', email='test@example.com'), expected_code=400) self.assertEquals('Invalid username a: Username must be between 4 and 30 characters in length', json['error_description']) def test_trycreateregexmismatch(self): json = self.postJsonResponse(User, data=dict(username='auserName', password='password', email='test@example.com'), expected_code=400) self.assertEquals('Invalid username auserName: Username must match expression [a-z0-9_]+', json['error_description']) def test_createuser(self): data = self.postResponse(User, data=NEW_USER_DETAILS, expected_code=201) self.assertEquals('"Created"', data) class TestSignout(ApiTestCase): def test_signout(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(User) assert json['username'] == READ_ACCESS_USER self.postResponse(Signout) # Make sure we're now signed out. self.getJsonResponse(User, expected_code=401) class TestGetMatchingEntities(ApiTestCase): def test_notinorg(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix='o', namespace=ORGANIZATION, includeTeams='true')) names = set([r['name'] for r in json['results']]) assert 'outsideorg' in names assert not 'owners' in names def test_inorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix='o', namespace=ORGANIZATION, includeTeams='true')) names = set([r['name'] for r in json['results']]) assert 'outsideorg' in names assert 'owners' in names class TestCreateOrganization(ApiTestCase): def test_existinguser(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(OrganizationList, data=dict(name=ADMIN_ACCESS_USER, email='testorg@example.com'), expected_code=400) self.assertEquals('A user or organization with this name already exists', json['message']) def test_existingorg(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(OrganizationList, data=dict(name=ORGANIZATION, email='testorg@example.com'), expected_code=400) self.assertEquals('A user or organization with this name already exists', json['message']) def test_createorg(self): self.login(ADMIN_ACCESS_USER) data = self.postResponse(OrganizationList, data=dict(name='neworg', email='testorg@example.com'), expected_code=201) self.assertEquals('"Created"', data) # Ensure the org was created. organization = model.get_organization('neworg') assert organization is not None # Verify the admin user is the org's admin. json = self.getJsonResponse(Organization, params=dict(orgname='neworg')) self.assertEquals('neworg', json['name']) self.assertEquals(True, json['is_admin']) class TestGetOrganization(ApiTestCase): def test_unknownorg(self): self.login(ADMIN_ACCESS_USER) self.getResponse(Organization, params=dict(orgname='notvalid'), expected_code=403) def test_cannotaccess(self): self.login(NO_ACCESS_USER) self.getResponse(Organization, params=dict(orgname=ORGANIZATION), expected_code=403) def test_getorganization(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(False, json['is_admin']) def test_getorganization_asadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(True, json['is_admin']) class TestChangeOrganizationDetails(ApiTestCase): def test_changeinvoiceemail(self): self.login(ADMIN_ACCESS_USER) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(invoice_email=True)) self.assertEquals(True, json['invoice_email']) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(invoice_email=False)) self.assertEquals(False, json['invoice_email']) def test_changemail(self): self.login(ADMIN_ACCESS_USER) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(email='newemail@example.com')) self.assertEquals('newemail@example.com', json['email']) class TestGetOrganizationPrototypes(ApiTestCase): def test_getprototypes(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) assert len(json['prototypes']) > 0 class TestCreateOrganizationPrototypes(ApiTestCase): def test_invaliduser(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(activating_user={'name': 'unknownuser'}, role='read', delegate={'kind': 'team', 'name': 'owners'}), expected_code=400) self.assertEquals('Unknown activating user', json['message']) def test_missingdelegate(self): self.login(ADMIN_ACCESS_USER) self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read'), expected_code=400) def test_createprototype(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read', delegate={'kind': 'team', 'name': 'readers'})) self.assertEquals('read', json['role']) pid = json['id'] # Verify the prototype exists. json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = set([p['id'] for p in json['prototypes']]) assert pid in ids class TestDeleteOrganizationPrototypes(ApiTestCase): def test_deleteprototype(self): self.login(ADMIN_ACCESS_USER) # Get the existing prototypes json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = [p['id'] for p in json['prototypes']] pid = ids[0] # Delete a prototype. self.deleteResponse(PermissionPrototype, params=dict(orgname=ORGANIZATION, prototypeid=pid)) # Verify the prototype no longer exists. json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) newids = [p['id'] for p in json['prototypes']] assert not pid in newids class TestUpdateOrganizationPrototypes(ApiTestCase): def test_updateprototype(self): self.login(ADMIN_ACCESS_USER) # Get the existing prototypes json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = [p['id'] for p in json['prototypes']] pid = ids[0] # Update a prototype. json = self.putJsonResponse(PermissionPrototype, params=dict(orgname=ORGANIZATION, prototypeid=pid), data=dict(role='admin')) self.assertEquals('admin', json['role']) class TestGetOrganiaztionMembers(ApiTestCase): def test_getmembers(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgnaizationMemberList, params=dict(orgname=ORGANIZATION)) assert ADMIN_ACCESS_USER in json['members'] assert READ_ACCESS_USER in json['members'] assert not NO_ACCESS_USER in json['members'] def test_getspecificmember(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationMember, params=dict(orgname=ORGANIZATION, membername=ADMIN_ACCESS_USER)) self.assertEquals(ADMIN_ACCESS_USER, json['member']['name']) self.assertEquals('user', json['member']['kind']) assert 'owners' in json['member']['teams'] class TestGetOrganizationPrivateAllowed(ApiTestCase): def test_existingorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgPrivateRepositories, params=dict(orgname=ORGANIZATION)) self.assertEquals(True, json['privateAllowed']) assert not 'reposAllowed' in json def test_neworg(self): self.login(ADMIN_ACCESS_USER) data = self.postResponse(OrganizationList, data=dict(name='neworg', email='test@example.com'), expected_code=201) json = self.getJsonResponse(OrgPrivateRepositories, params=dict(orgname='neworg')) self.assertEquals(False, json['privateAllowed']) class TestUpdateOrganizationTeam(ApiTestCase): def test_updateexisting(self): self.login(ADMIN_ACCESS_USER) data = self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='readers'), data=dict(description='My cool team', role='creator')) self.assertEquals('My cool team', data['description']) self.assertEquals('creator', data['role']) def test_attemptchangeroleonowners(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='owners'), data=dict(role = 'creator'), expected_code=400) def test_createnewteam(self): self.login(ADMIN_ACCESS_USER) data = self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='newteam'), data=dict(description='My cool team', role='member')) self.assertEquals('My cool team', data['description']) self.assertEquals('member', data['role']) # Verify the team was created. json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) assert 'newteam' in json['teams'] class TestDeleteOrganizationTeam(ApiTestCase): def test_deleteteam(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='readers')) # Make sure the team was deleted json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) assert not 'readers' in json['teams'] def test_attemptdeleteowners(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='owners'), expected_code=400) class TestGetOrganizationTeamMembers(ApiTestCase): def test_invalidteam(self): self.login(ADMIN_ACCESS_USER) self.getResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='notvalid'), expected_code=404) def test_getmembers(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) assert READ_ACCESS_USER in json['members'] class TestUpdateOrganizationTeamMember(ApiTestCase): def test_addmember(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=NO_ACCESS_USER)) # Verify the user was added to the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) assert NO_ACCESS_USER in json['members'] class TestDeleteOrganizationTeamMember(ApiTestCase): def test_deletemember(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=READ_ACCESS_USER)) # Verify the user was removed from the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) assert not READ_ACCESS_USER in json['members'] class TestCreateRepo(ApiTestCase): def test_duplicaterepo(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(repository='simple', visibility='public', description=''), expected_code=400) self.assertEquals('Repository already exists', json['message']) def test_createrepo(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(repository='newrepo', visibility='public', description=''), expected_code=201) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('newrepo', json['name']) def test_createrepo_underorg(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(namespace=ORGANIZATION, repository='newrepo', visibility='private', description=''), expected_code=201) self.assertEquals(ORGANIZATION, json['namespace']) self.assertEquals('newrepo', json['name']) class TestFindRepos(ApiTestCase): def test_findrepos_asguest(self): json = self.getJsonResponse(FindRepositories, params=dict(query='p')) self.assertEquals(len(json['repositories']), 1) self.assertEquals(json['repositories'][0]['namespace'], 'public') self.assertEquals(json['repositories'][0]['name'], 'publicrepo') def test_findrepos_asuser(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse(FindRepositories, params=dict(query='p')) self.assertEquals(len(json['repositories']), 1) self.assertEquals(json['repositories'][0]['namespace'], 'public') self.assertEquals(json['repositories'][0]['name'], 'publicrepo') def test_findrepos_orgmember(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(FindRepositories, params=dict(query='p')) self.assertGreater(len(json['repositories']), 1) class TestListRepos(ApiTestCase): def test_listrepos_asguest(self): json = self.getJsonResponse(RepositoryList, params=dict(public=True)) self.assertEquals(len(json['repositories']), 1) def test_listrepos_orgmember(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(RepositoryList, params=dict(public=True)) self.assertGreater(len(json['repositories']), 1) def test_listrepos_filter(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(RepositoryList, params=dict(namespace=ORGANIZATION, public=False)) for repo in json['repositories']: self.assertEquals(ORGANIZATION, repo['namespace']) def test_listrepos_limit(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(RepositoryList, params=dict(limit=2)) self.assertEquals(len(json['repositories']), 2) class TestUpdateRepo(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' def test_updatedescription(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO), data=dict(description='Some cool repo')) # Verify the repo description was updated. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals('Some cool repo', json['description']) class TestChangeRepoVisibility(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' def test_changevisibility(self): self.login(ADMIN_ACCESS_USER) # Make public. self.postJsonResponse(RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility='public')) # Verify the visibility. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals(True, json['is_public']) # Make private. self.postJsonResponse(RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility='private')) # Verify the visibility. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals(False, json['is_public']) class TestDeleteRepository(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' def test_deleterepo(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) # Verify the repo was deleted. self.getResponse(Repository, params=dict(repository=self.SIMPLE_REPO), expected_code=404) class TestGetRepository(ApiTestCase): PUBLIC_REPO = PUBLIC_USER + '/publicrepo' def test_getrepo_badnames(self): self.login(ADMIN_ACCESS_USER) bad_names = ['logs', 'build', 'tokens', 'foo.bar', 'foo-bar', 'foo_bar'] # For each bad name, create the repo. for bad_name in bad_names: json = self.postJsonResponse(RepositoryList, expected_code=201, data=dict(repository=bad_name, visibility='public', description='')) # Make sure we can retrieve its information. json = self.getJsonResponse(Repository, params=dict(repository=ADMIN_ACCESS_USER + '/' + bad_name)) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals(bad_name, json['name']) self.assertEquals(True, json['is_public']) def test_getrepo_public_asguest(self): json = self.getJsonResponse(Repository, params=dict(repository=self.PUBLIC_REPO)) self.assertEquals(PUBLIC_USER, json['namespace']) self.assertEquals('publicrepo', json['name']) self.assertEquals(True, json['is_public']) self.assertEquals(False, json['is_organization']) self.assertEquals(False, json['is_building']) self.assertEquals(False, json['can_write']) self.assertEquals(False, json['can_admin']) assert 'latest' in json['tags'] def test_getrepo_public_asowner(self): self.login(PUBLIC_USER) json = self.getJsonResponse(Repository, params=dict(repository=self.PUBLIC_REPO)) self.assertEquals(False, json['is_organization']) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) def test_getrepo_building(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) self.assertEquals(True, json['is_building']) self.assertEquals(False, json['is_organization']) def test_getrepo_org_asnonmember(self): self.getResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO), expected_code=401) def test_getrepo_org_asreader(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) self.assertEquals(ORGANIZATION, json['namespace']) self.assertEquals(ORG_REPO, json['name']) self.assertEquals(False, json['can_write']) self.assertEquals(False, json['can_admin']) self.assertEquals(True, json['is_organization']) def test_getrepo_org_asadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) self.assertEquals(True, json['is_organization']) class TestRepoBuilds(ApiTestCase): def test_getrepo_nobuilds(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 def test_getrepobuilds(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 build = json['builds'][-1] assert 'id' in build assert 'status' in build # Check the status endpoint. status_json = self.getJsonResponse(RepositoryBuildStatus, params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id'])) self.assertEquals(status_json, build) class TestRequestRepoBuild(ApiTestCase): def test_requestrepobuild(self): self.login(ADMIN_ACCESS_USER) # Ensure we are not yet building. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 # Request a (fake) build. self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 def test_requestrepobuild_with_robot(self): self.login(ADMIN_ACCESS_USER) # Ensure we are not yet building. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 # Request a (fake) build. pull_robot = ADMIN_ACCESS_USER + '+dtrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 def test_requestrepobuild_with_invalid_robot(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. pull_robot = ADMIN_ACCESS_USER + '+invalidrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=404) def test_requestrepobuild_with_unauthorized_robot(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. pull_robot = 'freshuser+anotherrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=403) class TestWebhooks(ApiTestCase): def test_webhooks(self): self.login(ADMIN_ACCESS_USER) # Add a webhook. json = self.postJsonResponse(WebhookList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(url='http://example.com'), expected_code=201) self.assertEquals('http://example.com', json['parameters']['url']) wid = json['public_id'] # Get the webhook. json = self.getJsonResponse(Webhook, params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid)) self.assertEquals(wid, json['public_id']) self.assertEquals('http://example.com', json['parameters']['url']) # Verify the webhook is listed. json = self.getJsonResponse(WebhookList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) ids = [w['public_id'] for w in json['webhooks']] assert wid in ids # Delete the webhook. self.deleteResponse(Webhook, params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid), expected_code=204) # Verify the webhook is gone. self.getResponse(Webhook, params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid), expected_code=404) class TestListAndGetImage(ApiTestCase): def test_listandgetimages(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryImageList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['images']) > 0 for image in json['images']: assert 'id' in image assert 'tags' in image assert 'created' in image assert 'comment' in image assert 'command' in image assert 'ancestors' in image assert 'dbid' in image assert 'size' in image ijson = self.getJsonResponse(RepositoryImage, params=dict(repository=ADMIN_ACCESS_USER + '/simple', image_id=image['id'])) self.assertEquals(image['id'], ijson['id']) class TestGetImageChanges(ApiTestCase): def test_getimagechanges(self): self.login(ADMIN_ACCESS_USER) # Find an image to check. json = self.getJsonResponse(RepositoryImageList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) image_id = json['images'][0]['id'] # Lookup the image's changes. # TODO: Fix me once we can get fake changes into the test data #self.getJsonResponse(RepositoryImageChanges, # params=dict(repository=ADMIN_ACCESS_USER + '/simple', # image_id=image_id)) class TestListAndDeleteTag(ApiTestCase): def test_listdeletecreateandmovetag(self): self.login(ADMIN_ACCESS_USER) # List the images for prod. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) prod_images = json['images'] assert len(prod_images) > 0 # List the images for staging. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) staging_images = json['images'] assert len(prod_images) == len(staging_images) + 1 # Delete prod. self.deleteResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), expected_code=204) # Make sure the tag is gone. self.getResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), expected_code=404) # Make the sure the staging images are still there. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) self.assertEquals(staging_images, json['images']) # Add a new tag to the staging image. self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'), data=dict(image=staging_images[0]['id']), expected_code=201) # Make sure the tag is present. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag')) sometag_images = json['images'] self.assertEquals(sometag_images, staging_images) # Move the tag. self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'), data=dict(image=staging_images[-1]['id']), expected_code=201) # Make sure the tag has moved. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag')) sometag_new_images = json['images'] self.assertEquals(1, len(sometag_new_images)) self.assertEquals(staging_images[-1], sometag_new_images[0]) def test_deletesubtag(self): self.login(ADMIN_ACCESS_USER) # List the images for prod. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) prod_images = json['images'] assert len(prod_images) > 0 # Delete staging. self.deleteResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'), expected_code=204) # Make sure the prod images are still around. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) self.assertEquals(prod_images, json['images']) class TestRepoPermissions(ApiTestCase): def listUserPermissions(self): return self.getJsonResponse(RepositoryUserPermissionList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['permissions'] def listTeamPermissions(self): return self.getJsonResponse(RepositoryTeamPermissionList, params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions'] def test_userpermissions(self): self.login(ADMIN_ACCESS_USER) # The repo should start with just the admin as a user perm. permissions = self.listUserPermissions() self.assertEquals(1, len(permissions)) assert ADMIN_ACCESS_USER in permissions self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role']) # Add another user. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), data=dict(role='read')) # Verify the user is present. permissions = self.listUserPermissions() self.assertEquals(2, len(permissions)) assert NO_ACCESS_USER in permissions self.assertEquals('read', permissions[NO_ACCESS_USER]['role']) json = self.getJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) self.assertEquals('read', json['role']) # Change the user's permissions. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), data=dict(role='admin')) # Verify. permissions = self.listUserPermissions() self.assertEquals(2, len(permissions)) assert NO_ACCESS_USER in permissions self.assertEquals('admin', permissions[NO_ACCESS_USER]['role']) # Delete the user's permission. self.deleteResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) # Verify. permissions = self.listUserPermissions() self.assertEquals(1, len(permissions)) assert not NO_ACCESS_USER in permissions def test_teampermissions(self): self.login(ADMIN_ACCESS_USER) # The repo should start with just the readers as a team perm. permissions = self.listTeamPermissions() self.assertEquals(1, len(permissions)) assert 'readers' in permissions self.assertEquals('read', permissions['readers']['role']) # Add another team. self.putJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), data=dict(role='write')) # Verify the team is present. permissions = self.listTeamPermissions() self.assertEquals(2, len(permissions)) assert 'owners' in permissions self.assertEquals('write', permissions['owners']['role']) json = self.getJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) self.assertEquals('write', json['role']) # Change the team's permissions. self.putJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), data=dict(role='admin')) # Verify. permissions = self.listTeamPermissions() self.assertEquals(2, len(permissions)) assert 'owners' in permissions self.assertEquals('admin', permissions['owners']['role']) # Delete the team's permission. self.deleteResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) # Verify. permissions = self.listTeamPermissions() self.assertEquals(1, len(permissions)) assert not 'owners' in permissions class TestApiTokens(ApiTestCase): def listTokens(self): return self.getJsonResponse(RepositoryTokenList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens'] def test_tokens(self): self.login(ADMIN_ACCESS_USER) # Create a new token. json = self.postJsonResponse(RepositoryTokenList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(role='read', friendlyName='mytoken'), expected_code=201) self.assertEquals('mytoken', json['friendlyName']) self.assertEquals('read', json['role']) token_code = json['code'] # Verify. tokens = self.listTokens() assert token_code in tokens self.assertEquals('mytoken', tokens[token_code]['friendlyName']) json = self.getJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) self.assertEquals(tokens[token_code], json) # Change the token's permission. self.putJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), data=dict(role='write')) # Verify. json = self.getJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) self.assertEquals('write', json['role']) # Delete the token. self.deleteResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) # Verify. self.getResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), expected_code=404) class TestUserCard(ApiTestCase): def test_getusercard(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserCard) self.assertEquals('4242', json['card']['last4']) self.assertEquals('Visa', json['card']['type']) def test_setusercard_error(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(UserCard, data=dict(token='sometoken'), expected_code=402) assert 'carderror' in json class TestOrgCard(ApiTestCase): def test_getorgcard(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationCard, params=dict(orgname=ORGANIZATION)) self.assertEquals('4242', json['card']['last4']) self.assertEquals('Visa', json['card']['type']) class TestUserSubscription(ApiTestCase): def getSubscription(self): return self.getJsonResponse(UserPlan) def test_updateplan(self): self.login(ADMIN_ACCESS_USER) # Change the plan. self.putJsonResponse(UserPlan, data=dict(plan='free')) # Verify sub = self.getSubscription() self.assertEquals('free', sub['plan']) # Change the plan. self.putJsonResponse(UserPlan, data=dict(plan='bus-large')) # Verify sub = self.getSubscription() self.assertEquals('bus-large', sub['plan']) class TestOrgSubscription(ApiTestCase): def getSubscription(self): return self.getJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION)) def test_updateplan(self): self.login(ADMIN_ACCESS_USER) # Change the plan. self.putJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION), data=dict(plan='free')) # Verify sub = self.getSubscription() self.assertEquals('free', sub['plan']) # Change the plan. self.putJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION), data=dict(plan='bus-large')) # Verify sub = self.getSubscription() self.assertEquals('bus-large', sub['plan']) class TestUserRobots(ApiTestCase): def getRobotNames(self): return [r['name'] for r in self.getJsonResponse(UserRobotList)['robots']] def test_robots(self): self.login(NO_ACCESS_USER) # Create a robot. json = self.putJsonResponse(UserRobot, params=dict(robot_shortname='bender'), expected_code=201) self.assertEquals(NO_ACCESS_USER + '+bender', json['name']) # Verify. robots = self.getRobotNames() assert NO_ACCESS_USER + '+bender' in robots # Delete the robot. self.deleteResponse(UserRobot, params=dict(robot_shortname='bender')) # Verify. robots = self.getRobotNames() assert not NO_ACCESS_USER + '+bender' in robots class TestOrgRobots(ApiTestCase): def getRobotNames(self): return [r['name'] for r in self.getJsonResponse(OrgRobotList, params=dict(orgname=ORGANIZATION))['robots']] def test_robots(self): self.login(ADMIN_ACCESS_USER) # Create a robot. json = self.putJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=201) self.assertEquals(ORGANIZATION + '+bender', json['name']) # Verify. robots = self.getRobotNames() assert ORGANIZATION + '+bender' in robots # Delete the robot. self.deleteResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender')) # Verify. robots = self.getRobotNames() assert not ORGANIZATION + '+bender' in robots class TestLogs(ApiTestCase): def test_user_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserLogs) assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_org_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION)) assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_performer(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION)) all_logs = json['logs'] json = self.getJsonResponse(OrgLogs, params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION)) assert len(json['logs']) < len(all_logs) for log in json['logs']: self.assertEquals(READ_ACCESS_USER, log['performer']['name']) class TestApplicationInformation(ApiTestCase): def test_get_info(self): json = self.getJsonResponse(ApplicationInformation, params=dict(client_id=FAKE_APPLICATION_CLIENT_ID)) assert 'name' in json assert 'uri' in json assert 'organization' in json def test_get_invalid_info(self): self.getJsonResponse(ApplicationInformation, params=dict(client_id='invalid-code'), expected_code=404) class TestOrganizationApplications(ApiTestCase): def test_list_create_applications(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION)) self.assertEquals(2, len(json['applications'])) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['applications'][0]['client_id']) # Add a new application. json = self.postJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION), data=dict(name="Some cool app", description="foo")) self.assertEquals("Some cool app", json['name']) self.assertEquals("foo", json['description']) # Retrieve the apps list again list_json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION)) self.assertEquals(3, len(list_json['applications'])) self.assertEquals(json, list_json['applications'][2]) class TestOrganizationApplicationResource(ApiTestCase): def test_get_edit_delete_application(self): self.login(ADMIN_ACCESS_USER) # Retrieve the application. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id']) # Edit the application. edit_json = self.putJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID), data=dict(name="Some App", description="foo", application_uri="bar", redirect_uri="baz", gravatar_email="meh")) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, edit_json['client_id']) self.assertEquals("Some App", edit_json['name']) self.assertEquals("foo", edit_json['description']) self.assertEquals("bar", edit_json['application_uri']) self.assertEquals("baz", edit_json['redirect_uri']) self.assertEquals("meh", edit_json['gravatar_email']) # Retrieve the application again. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(json, edit_json) # Delete the application. self.deleteResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) # Make sure the application is gone. self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID), expected_code=404) class TestOrganizationApplicationResetClientSecret(ApiTestCase): def test_reset_client_secret(self): self.login(ADMIN_ACCESS_USER) # Retrieve the application. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id']) # Reset the client secret. reset_json = self.postJsonResponse(OrganizationApplicationResetClientSecret, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, reset_json['client_id']) self.assertNotEquals(reset_json['client_secret'], json['client_secret']) # Verify it was changed in the DB. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(reset_json['client_secret'], json['client_secret']) class FakeBuildTrigger(BuildTriggerBase): @classmethod def service_name(cls): return 'fakeservice' def list_build_sources(self, auth_token): return [{'first': 'source'}, {'second': auth_token}] def list_build_subdirs(self, auth_token, config): return [auth_token, 'foo', 'bar', config['somevalue']] def handle_trigger_request(self, request, auth_token, config): return ('foo', ['bar'], 'build-name', 'subdir') def is_active(self, config): return 'active' in config and config['active'] def activate(self, trigger_uuid, standard_webhook_url, auth_token, config): config['active'] = True return config def deactivate(self, auth_token, config): config['active'] = False return config def manual_start(self, auth_token, config): return ('foo', ['bar'], 'build-name', 'subdir') def dockerfile_url(self, auth_token, config): return 'http://some/url' def load_dockerfile_contents(self, auth_token, config): if not 'dockerfile' in config: return None return config['dockerfile'] class TestBuildTriggers(ApiTestCase): def test_list_build_triggers(self): self.login(ADMIN_ACCESS_USER) # Check a repo with no known triggers. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(0, len(json['triggers'])) # Check a repo with one known trigger. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(1, len(json['triggers'])) trigger = json['triggers'][0] assert 'id' in trigger assert 'is_active' in trigger assert 'config' in trigger assert 'service' in trigger # Verify the get trigger method. trigger_json = self.getJsonResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) self.assertEquals(trigger, trigger_json) # Check the recent builds for the trigger. builds_json = self.getJsonResponse(TriggerBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) assert 'builds' in builds_json def test_delete_build_trigger(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(1, len(json['triggers'])) trigger = json['triggers'][0] # Delete the trigger. self.deleteResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) # Verify it was deleted. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(0, len(json['triggers'])) def test_analyze_fake_trigger(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Analyze the trigger's dockerfile: First, no dockerfile. trigger_config = {} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) self.assertEquals('Could not read the Dockerfile for the trigger', analyze_json['message']) # Analyze the trigger's dockerfile: Second, missing FROM in dockerfile. trigger_config = {'dockerfile': 'MAINTAINER me'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('warning', analyze_json['status']) self.assertEquals('No FROM line found in the Dockerfile', analyze_json['message']) # Analyze the trigger's dockerfile: Third, dockerfile with public repo. trigger_config = {'dockerfile': 'FROM somerepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('publicbase', analyze_json['status']) # Analyze the trigger's dockerfile: Fourth, dockerfile with private repo with an invalid path. trigger_config = {'dockerfile': 'FROM localhost:5000/somepath'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('warning', analyze_json['status']) self.assertEquals('"localhost:5000/somepath" is not a valid Quay repository path', analyze_json['message']) # Analyze the trigger's dockerfile: Fifth, dockerfile with private repo that does not exist. trigger_config = {'dockerfile': 'FROM localhost:5000/nothere/randomrepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) self.assertEquals('Repository "localhost:5000/nothere/randomrepo" was not found', analyze_json['message']) # Analyze the trigger's dockerfile: Sixth, dockerfile with private repo that the user cannot see. trigger_config = {'dockerfile': 'FROM localhost:5000/randomuser/randomrepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) self.assertEquals('Repository "localhost:5000/randomuser/randomrepo" was not found', analyze_json['message']) # Analyze the trigger's dockerfile: Seventh, dockerfile with private repo that the user see. trigger_config = {'dockerfile': 'FROM localhost:5000/devtable/complex'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('analyzed', analyze_json['status']) self.assertEquals('devtable', analyze_json['namespace']) self.assertEquals('complex', analyze_json['name']) self.assertEquals(False, analyze_json['is_public']) self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', analyze_json['robots'][0]['name']) def test_fake_trigger(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Verify the trigger. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['triggers'])) self.assertEquals(trigger.uuid, json['triggers'][0]['id']) self.assertEquals(trigger.service.name, json['triggers'][0]['service']) self.assertEquals(False, json['triggers'][0]['is_active']) # List the trigger's sources. source_json = self.getJsonResponse(BuildTriggerSources, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid)) self.assertEquals([{'first': 'source'}, {'second': 'sometoken'}], source_json['sources']) # List the trigger's subdirs. subdir_json = self.postJsonResponse(BuildTriggerSubdirs, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'somevalue': 'meh'}) self.assertEquals({'status': 'success', 'subdir': ['sometoken', 'foo', 'bar', 'meh']}, subdir_json) # Activate the trigger. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals(True, activate_json['is_active']) # Make sure the trigger has a write token. trigger = model.get_build_trigger(ADMIN_ACCESS_USER, 'simple', trigger.uuid) self.assertNotEquals(None, trigger.write_token) self.assertEquals(True, py_json.loads(trigger.config)['active']) # Make sure we cannot activate again. self.postResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}, expected_code=400) # Start a manual build. start_json = self.postJsonResponse(ActivateBuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), expected_code=201) assert 'id' in start_json self.assertEquals("build-name", start_json['display_name']) self.assertEquals(['bar'], start_json['job_config']['docker_tags']) def test_invalid_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with an invalid robot account. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config, 'pull_robot': 'someinvalidrobot'}, expected_code=404) def test_unauthorized_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with a robot account in the wrong namespace. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config, 'pull_robot': 'freshuser+anotherrobot'}, expected_code=403) def test_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.get_user(ADMIN_ACCESS_USER) trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with a robot account. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config, 'pull_robot': ADMIN_ACCESS_USER + '+dtrobot'}) # Verify that the robot was saved. self.assertEquals(True, activate_json['is_active']) self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', activate_json['pull_robot']['name']) # Start a manual build. start_json = self.postJsonResponse(ActivateBuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), expected_code=201) assert 'id' in start_json self.assertEquals("build-name", start_json['display_name']) self.assertEquals(['bar'], start_json['job_config']['docker_tags']) class TestUserAuthorizations(ApiTestCase): def test_list_get_delete_user_authorizations(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserAuthorizationList) self.assertEquals(1, len(json['authorizations'])) authorization = json['authorizations'][0] assert 'uuid' in authorization assert 'scopes' in authorization assert 'application' in authorization # Retrieve the authorization. get_json = self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid'])) self.assertEquals(authorization, get_json) # Delete the authorization. self.deleteResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid'])) # Verify it has been deleted. self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid']), expected_code=404) class TestSuperUserLogs(ApiTestCase): def test_get_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserLogs) assert 'logs' in json assert len(json['logs']) > 0 class TestSuperUserList(ApiTestCase): def test_get_users(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserList) assert 'users' in json assert len(json['users']) > 0 class TestSuperUserManagement(ApiTestCase): def test_get_user(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('no@thanks.com', json['email']) self.assertEquals(False, json['super_user']) def test_delete_user(self): self.login(ADMIN_ACCESS_USER) # Verify the user exists. json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser')) self.assertEquals('freshuser', json['username']) # Delete the user. self.deleteResponse(SuperUserManagement, params=dict(username = 'freshuser'), expected_code=204) # Verify the user no longer exists. self.getResponse(SuperUserManagement, params=dict(username = 'freshuser'), expected_code=404) def test_update_user(self): self.login(ADMIN_ACCESS_USER) # Verify the user exists. json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('no@thanks.com', json['email']) # Update the user. self.putJsonResponse(SuperUserManagement, params=dict(username='freshuser'), data=dict(email='foo@bar.com')) # Verify the user was updated. json = self.getJsonResponse(SuperUserManagement, params=dict(username = 'freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('foo@bar.com', json['email']) if __name__ == '__main__': unittest.main()