From 900ccd4c4772f547638e1e5ac0814a43723e7aaa Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 31 Jan 2014 16:19:29 -0500 Subject: [PATCH 1/6] Start on unit tests for the API endpoint --- endpoints/api.py | 2 + test/test_api_usage.py | 85 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 test/test_api_usage.py diff --git a/endpoints/api.py b/endpoints/api.py index 2b31e4700..b4cec5ca4 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -314,6 +314,8 @@ def create_new_user(): @internal_api_call def signin_user(): signin_data = request.get_json() + if not signin_data: + abort(404) username = signin_data['username'] password = signin_data['password'] diff --git a/test/test_api_usage.py b/test/test_api_usage.py new file mode 100644 index 000000000..23c1c1d33 --- /dev/null +++ b/test/test_api_usage.py @@ -0,0 +1,85 @@ +import unittest +import json + +from flask import url_for +from endpoints.api import api +from app import app +from initdb import setup_database_for_testing, finished_database_for_testing +from specs import build_specs + +app.register_blueprint(api, url_prefix='/api') + +NO_ACCESS_USER = 'freshuser' +READ_ACCESS_USER = 'reader' +ADMIN_ACCESS_USER = 'devtable' +ORGANIZATION = 'buynlarge' + +class ApiTestCase(unittest.TestCase): + def setUp(self): + setup_database_for_testing(self) + self.app = app.test_client() + self.ctx = app.test_request_context() + self.ctx.__enter__() + + def tearDown(self): + finished_database_for_testing(self) + self.ctx.__exit__(True, None, None) + + def getJsonResponse(self, method_name): + rv = self.app.get(url_for(method_name)) + assert rv.status_code == 200 + data = rv.data + parsed = json.loads(data) + return parsed + + def login(self, username): + self.app.post(url_for('api.signin_user'), + data=json.dumps(dict(username=username, password='password')), + headers={"Content-Type": "application/json"}) + +class TestDiscovery(ApiTestCase): + def test_discovery(self): + """ Basic sanity check that discovery returns valid JSON in the expected format. """ + json = self.getJsonResponse('api.discovery') + found = set([]) + for method_info in json['endpoints']: + found.add(method_info['name']) + + assert 'discovery' in found + +class TestPlans(ApiTestCase): + def test_plans(self): + """ Basic sanity check that the plans are returned in the expected format. """ + json = self.getJsonResponse('api.list_plans') + found = set([]) + for method_info in json['plans']: + found.add(method_info['stripeId']) + + assert 'free' in found + +class TestLoggedInUser(ApiTestCase): + def test_guest(self): + json = self.getJsonResponse('api.get_logged_in_user') + assert json['anonymous'] == True + + def test_user(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.get_logged_in_user') + assert json['anonymous'] == False + assert json['username'] == READ_ACCESS_USER + +class TestGetUserPrivateCount(ApiTestCase): + def test_nonallowed(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.get_user_private_count') + assert json['privateCount'] == 0 + assert json['reposAllowed'] == 0 + + def test_allowed(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_user_private_count') + assert json['privateCount'] == 6 + assert json['reposAllowed'] == 125 + +if __name__ == '__main__': + unittest.main() From 05b33dced44f0e82a6c4d567edb4aa48e8184aa2 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 31 Jan 2014 17:54:01 -0500 Subject: [PATCH 2/6] Continue on API unit tests --- endpoints/api.py | 21 ++-- test/test_api_usage.py | 241 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 248 insertions(+), 14 deletions(-) diff --git a/endpoints/api.py b/endpoints/api.py index b4cec5ca4..a1b3ba51a 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -261,7 +261,6 @@ def convert_user_to_organization(): @internal_api_call def change_user_details(): user = current_user.db_user() - user_data = request.get_json() try: @@ -422,6 +421,7 @@ def get_matching_entities(prefix): team_data = [entity_team_view(team) for team in teams] user_data = [user_view(user) for user in users] + return jsonify({ 'results': team_data + user_data }) @@ -447,11 +447,16 @@ def create_organization(): existing = None try: - existing = (model.get_organization(org_data['name']) or - model.get_user(org_data['name'])) + existing = model.get_organization(org_data['name']) except model.InvalidOrganizationException: pass + if not existing: + try: + existing = model.get_user(org_data['name']) + except model.InvalidUserException: + pass + if existing: msg = 'A user or organization with this name already exists' return request_error(message=msg) @@ -550,7 +555,7 @@ def prototype_view(proto, org_members): 'id': proto.uuid, } -@api.route('/api/organization//prototypes', methods=['GET']) +@api.route('/organization//prototypes', methods=['GET']) @api_login_required def get_organization_prototype_permissions(orgname): permission = AdministerOrganizationPermission(orgname) @@ -588,7 +593,7 @@ def log_prototype_action(action_kind, orgname, prototype, **kwargs): log_action(action_kind, orgname, log_params) -@api.route('/api/organization//prototypes', methods=['POST']) +@api.route('/organization//prototypes', methods=['POST']) @api_login_required def create_organization_prototype_permission(orgname): permission = AdministerOrganizationPermission(orgname) @@ -636,7 +641,7 @@ def create_organization_prototype_permission(orgname): abort(403) -@api.route('/api/organization//prototypes/', +@api.route('/organization//prototypes/', methods=['DELETE']) @api_login_required def delete_organization_prototype_permission(orgname, prototypeid): @@ -658,7 +663,7 @@ def delete_organization_prototype_permission(orgname, prototypeid): abort(403) -@api.route('/api/organization//prototypes/', +@api.route('/organization//prototypes/', methods=['PUT']) @api_login_required def update_organization_prototype_permission(orgname, prototypeid): @@ -1363,7 +1368,7 @@ def get_image_changes(namespace, repository, image_id): abort(403) -@api.route('/api/repository//tag/', +@api.route('/repository//tag/', methods=['DELETE']) @parse_repository_name def delete_full_tag(namespace, repository, tag): diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 23c1c1d33..67d8deb1b 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -6,6 +6,7 @@ from endpoints.api import api from app import app from initdb import setup_database_for_testing, finished_database_for_testing from specs import build_specs +from data import model app.register_blueprint(api, url_prefix='/api') @@ -14,6 +15,12 @@ READ_ACCESS_USER = 'reader' ADMIN_ACCESS_USER = 'devtable' ORGANIZATION = 'buynlarge' +NEW_USER_DETAILS = { + 'username': 'bobby', + 'password': 'password', + 'email': 'bobby@tables.com', +} + class ApiTestCase(unittest.TestCase): def setUp(self): setup_database_for_testing(self) @@ -25,17 +32,50 @@ class ApiTestCase(unittest.TestCase): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) - def getJsonResponse(self, method_name): - rv = self.app.get(url_for(method_name)) - assert rv.status_code == 200 + def getJsonResponse(self, method_name, params={}): + rv = self.app.get(url_for(method_name, **params)) + self.assertEquals(200, rv.status_code) data = rv.data parsed = json.loads(data) return parsed - def login(self, username): - self.app.post(url_for('api.signin_user'), - data=json.dumps(dict(username=username, password='password')), + def postResponse(self, method_name, params={}, data={}, expected_code=200): + rv = self.app.post(url_for(method_name, **params), data=json.dumps(data), headers={"Content-Type": "application/json"}) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def getResponse(self, method_name, params={}, expected_code=200): + rv = self.app.get(url_for(method_name, **params)) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def postJsonResponse(self, method_name, params={}, data={}, expected_code=200): + rv = self.app.post(url_for(method_name, **params), data=json.dumps(data), + headers={"Content-Type": "application/json"}) + + if rv.status_code != expected_code: + print 'Mismatch data for method %s: %s' % (method_name, rv.data) + + self.assertEquals(rv.status_code, expected_code) + data = rv.data + parsed = json.loads(data) + return parsed + + def putJsonResponse(self, method_name, params={}, data={}, expected_code=200): + rv = self.app.put(url_for(method_name, **params), data=json.dumps(data), + headers={"Content-Type": "application/json"}) + + if rv.status_code != expected_code: + print 'Mismatch data for method %s: %s' % (method_name, rv.data) + + self.assertEquals(rv.status_code, expected_code) + data = rv.data + parsed = json.loads(data) + return parsed + + def login(self, username, password='password'): + return self.postJsonResponse('api.signin_user', data=dict(username=username, password=password)) class TestDiscovery(ApiTestCase): def test_discovery(self): @@ -81,5 +121,194 @@ class TestGetUserPrivateCount(ApiTestCase): assert json['privateCount'] == 6 assert json['reposAllowed'] == 125 +class TestConvertToOrganization(ApiTestCase): + def test_sameadminuser(self): + self.login(READ_ACCESS_USER) + json = self.postJsonResponse('api.convert_user_to_organization', + data={'adminUser': READ_ACCESS_USER, 'adminPassword': 'password'}, + expected_code=400) + + self.assertEqual('The admin user is not valid', json['message']) + + def test_invalidadminuser(self): + self.login(READ_ACCESS_USER) + json = self.postJsonResponse('api.convert_user_to_organization', + data={'adminUser': 'unknownuser', 'adminPassword': 'password'}, + expected_code=400) + + self.assertEqual('The admin user credentials are not valid', json['message']) + + def test_invalidadminpassword(self): + self.login(READ_ACCESS_USER) + json = self.postJsonResponse('api.convert_user_to_organization', + data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'invalidpass'}, + expected_code=400) + + self.assertEqual('The admin user credentials are not valid', json['message']) + + def test_convert(self): + self.login(READ_ACCESS_USER) + json = self.postJsonResponse('api.convert_user_to_organization', + data={'adminUser': ADMIN_ACCESS_USER, + 'adminPassword': 'password', + 'plan': 'free'}) + + self.assertEqual(True, json['success']) + + # Verify the organization exists. + organization = model.get_organization(READ_ACCESS_USER) + assert organization is not None + + +class TestChangeUserDetails(ApiTestCase): + def test_changepassword(self): + self.login(READ_ACCESS_USER) + self.putJsonResponse('api.change_user_details', data=dict(password='newpasswordiscool')) + self.login(READ_ACCESS_USER, password='newpasswordiscool') + + def test_changeinvoiceemail(self): + self.login(READ_ACCESS_USER) + + json = self.putJsonResponse('api.change_user_details', data=dict(invoice_email=True)) + self.assertEquals(True, json['invoice_email']) + + json = self.putJsonResponse('api.change_user_details', data=dict(invoice_email=False)) + self.assertEquals(False, json['invoice_email']) + + +class TestCreateNewUser(ApiTestCase): + def test_existingusername(self): + json = self.postJsonResponse('api.create_new_user', + data=dict(username=READ_ACCESS_USER, + password='password', + email='test@example.com'), + expected_code=400) + + self.assertEquals('The username already exists', json['message']) + + def test_createuser(self): + data = self.postResponse('api.create_new_user', + data=NEW_USER_DETAILS, + expected_code=201) + self.assertEquals('Created', data) + + +class TestSignout(ApiTestCase): + def test_signout(self): + self.login(READ_ACCESS_USER) + + json = self.getJsonResponse('api.get_logged_in_user') + assert json['username'] == READ_ACCESS_USER + + self.postResponse('api.logout') + + json = self.getJsonResponse('api.get_logged_in_user') + assert json['anonymous'] == True + + +class TestGetMatchingEntities(ApiTestCase): + def test_notinorg(self): + self.login(NO_ACCESS_USER) + + json = self.getJsonResponse('api.get_matching_entities', + params=dict(prefix='o', namespace=ORGANIZATION, includeTeams=True)) + + names = set([r['name'] for r in json['results']]) + assert 'outsideorg' in names + assert not 'owners' in names + + def test_inorg(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_matching_entities', + params=dict(prefix='o', namespace=ORGANIZATION, includeTeams=True)) + + names = set([r['name'] for r in json['results']]) + assert 'outsideorg' in names + assert 'owners' in names + + +class TestCreateOrganization(ApiTestCase): + def test_existinguser(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization', data=dict(name=ADMIN_ACCESS_USER), + expected_code=400) + + self.assertEquals('A user or organization with this name already exists', json['message']) + + def test_existingorg(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization', data=dict(name=ORGANIZATION), + expected_code=400) + + self.assertEquals('A user or organization with this name already exists', json['message']) + + def test_createorg(self): + self.login(ADMIN_ACCESS_USER) + + data = self.postResponse('api.create_organization', + data=dict(name='neworg', email='test@example.com'), + expected_code=201) + + self.assertEquals('Created', data) + + # Ensure the org was created. + organization = model.get_organization('neworg') + assert organization is not None + + +class TestGetOrganization(ApiTestCase): + def test_unknownorg(self): + self.login(ADMIN_ACCESS_USER) + self.getResponse('api.get_organization', params=dict(orgname='notvalid'), + expected_code=403) + + def test_cannotaccess(self): + self.login(NO_ACCESS_USER) + self.getResponse('api.get_organization', params=dict(orgname=ORGANIZATION), + expected_code=403) + + def test_getorganization(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) + + self.assertEquals(ORGANIZATION, json['name']) + self.assertEquals(False, json['is_admin']) + + def test_getorganization_asadmin(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) + + self.assertEquals(ORGANIZATION, json['name']) + self.assertEquals(True, json['is_admin']) + +class TestChangeOrganizationDetails(ApiTestCase): + def test_changeinvoiceemail(self): + self.login(ADMIN_ACCESS_USER) + + json = self.putJsonResponse('api.change_organization_details', + params=dict(orgname=ORGANIZATION), + data=dict(invoice_email=True)) + + self.assertEquals(True, json['invoice_email']) + + json = self.putJsonResponse('api.change_organization_details', + params=dict(orgname=ORGANIZATION), + data=dict(invoice_email=False)) + self.assertEquals(False, json['invoice_email']) + + + def test_changemail(self): + self.login(ADMIN_ACCESS_USER) + + json = self.putJsonResponse('api.change_organization_details', + params=dict(orgname=ORGANIZATION), + data=dict(email='newemail@example.com')) + + self.assertEquals('newemail@example.com', json['email']) + + if __name__ == '__main__': unittest.main() From 36d37e839bfe367d63e58bf953669638322f16fe Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 31 Jan 2014 18:54:31 -0500 Subject: [PATCH 3/6] Continue on API unit tests: Now 50% (or so) coverage --- endpoints/api.py | 8 +- test/test_api_usage.py | 262 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 265 insertions(+), 5 deletions(-) diff --git a/endpoints/api.py b/endpoints/api.py index a1b3ba51a..6dfc52dc1 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -610,9 +610,9 @@ def create_organization_prototype_permission(orgname): 'name' in details['activating_user']): activating_username = details['activating_user']['name'] - delegate = details['delegate'] - delegate_kind = delegate['kind'] - delegate_name = delegate['name'] + delegate = details['delegate'] if 'delegate' in details else {} + delegate_kind = delegate.get('kind', None) + delegate_name = delegate.get('name', None) delegate_username = delegate_name if delegate_kind == 'user' else None delegate_teamname = delegate_name if delegate_kind == 'team' else None @@ -628,7 +628,7 @@ def create_organization_prototype_permission(orgname): return request_error(message='Unknown activating user') if not delegate_user and not delegate_team: - return request_error(message='Missing delagate user or team') + return request_error(message='Missing delegate user or team') role_name = details['role'] diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 67d8deb1b..81837d590 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -50,6 +50,11 @@ class ApiTestCase(unittest.TestCase): self.assertEquals(rv.status_code, expected_code) return rv.data + def deleteResponse(self, method_name, params={}, expected_code=204): + rv = self.app.delete(url_for(method_name, **params)) + self.assertEquals(rv.status_code, expected_code) + return rv.data + def postJsonResponse(self, method_name, params={}, data={}, expected_code=200): rv = self.app.post(url_for(method_name, **params), data=json.dumps(data), headers={"Content-Type": "application/json"}) @@ -158,6 +163,13 @@ class TestConvertToOrganization(ApiTestCase): # Verify the organization exists. organization = model.get_organization(READ_ACCESS_USER) assert organization is not None + + # Verify the admin user is the org's admin. + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_organization', params=dict(orgname=READ_ACCESS_USER)) + + self.assertEquals(READ_ACCESS_USER, json['name']) + self.assertEquals(True, json['is_admin']) class TestChangeUserDetails(ApiTestCase): @@ -257,7 +269,12 @@ class TestCreateOrganization(ApiTestCase): # Ensure the org was created. organization = model.get_organization('neworg') assert organization is not None - + + # Verify the admin user is the org's admin. + json = self.getJsonResponse('api.get_organization', params=dict(orgname='neworg')) + self.assertEquals('neworg', json['name']) + self.assertEquals(True, json['is_admin']) + class TestGetOrganization(ApiTestCase): def test_unknownorg(self): @@ -284,6 +301,7 @@ class TestGetOrganization(ApiTestCase): self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(True, json['is_admin']) + class TestChangeOrganizationDetails(ApiTestCase): def test_changeinvoiceemail(self): self.login(ADMIN_ACCESS_USER) @@ -310,5 +328,247 @@ class TestChangeOrganizationDetails(ApiTestCase): self.assertEquals('newemail@example.com', json['email']) +class TestGetOrganizationPrototypes(ApiTestCase): + def test_getprototypes(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + assert len(json['prototypes']) > 0 + + +class TestCreateOrganizationPrototypes(ApiTestCase): + def test_invaliduser(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization_prototype_permission', + params=dict(orgname=ORGANIZATION), + data=dict(activating_user={'name': 'unknownuser'}, + role='read', + delegate={'kind': 'team', 'name': 'owners'}), + expected_code=400) + + self.assertEquals('Unknown activating user', json['message']) + + + def test_missingdelegate(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization_prototype_permission', + params=dict(orgname=ORGANIZATION), + data=dict(role='read'), + expected_code=400) + + self.assertEquals('Missing delegate user or team', json['message']) + + def test_createprototype(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization_prototype_permission', + params=dict(orgname=ORGANIZATION), + data=dict(role='read', delegate={'kind': 'team', 'name': 'readers'})) + + self.assertEquals('read', json['role']) + pid = json['id'] + + # Verify the prototype exists. + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + ids = set([p['id'] for p in json['prototypes']]) + assert pid in ids + + +class TestDeleteOrganizationPrototypes(ApiTestCase): + def test_deleteprototype(self): + self.login(ADMIN_ACCESS_USER) + + # Get the existing prototypes + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + ids = [p['id'] for p in json['prototypes']] + pid = ids[0] + + # Delete a prototype. + self.deleteResponse('api.delete_organization_prototype_permission', + params=dict(orgname=ORGANIZATION, prototypeid=pid)) + + # Verify the prototype no longer exists. + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + newids = [p['id'] for p in json['prototypes']] + assert not pid in newids + + +class TestUpdateOrganizationPrototypes(ApiTestCase): + def test_updateprototype(self): + self.login(ADMIN_ACCESS_USER) + + # Get the existing prototypes + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + ids = [p['id'] for p in json['prototypes']] + pid = ids[0] + + # Update a prototype. + json = self.putJsonResponse('api.delete_organization_prototype_permission', + params=dict(orgname=ORGANIZATION, prototypeid=pid), + data=dict(role='admin')) + + self.assertEquals('admin', json['role']) + + + +class TestGetOrganiaztionMembers(ApiTestCase): + def test_getmembers(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_organization_members', + params=dict(orgname=ORGANIZATION)) + + assert ADMIN_ACCESS_USER in json['members'] + assert READ_ACCESS_USER in json['members'] + assert not NO_ACCESS_USER in json['members'] + + def test_getspecificmember(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_organization_member', + params=dict(orgname=ORGANIZATION, membername=ADMIN_ACCESS_USER)) + + self.assertEquals(ADMIN_ACCESS_USER, json['member']['name']) + self.assertEquals('user', json['member']['kind']) + + assert 'owners' in json['member']['teams'] + + +class TestGetOrganizationPrivateAllowed(ApiTestCase): + def test_existingorg(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_organization_private_allowed', + params=dict(orgname=ORGANIZATION)) + + self.assertEquals(True, json['privateAllowed']) + assert not 'reposAllowed' in json + + + def test_neworg(self): + self.login(ADMIN_ACCESS_USER) + + data = self.postResponse('api.create_organization', + data=dict(name='neworg', email='test@example.com'), + expected_code=201) + + json = self.getJsonResponse('api.get_organization_private_allowed', + params=dict(orgname='neworg')) + + self.assertEquals(False, json['privateAllowed']) + + +class TestUpdateOrganizationTeam(ApiTestCase): + def test_updateexisting(self): + self.login(ADMIN_ACCESS_USER) + + data = self.postJsonResponse('api.update_organization_team', + params=dict(orgname=ORGANIZATION, teamname='readers'), + data=dict(description = 'My cool team', role = 'creator')) + + self.assertEquals('My cool team', data['description']) + self.assertEquals('creator', data['role']) + + def test_attemptchangeroleonowners(self): + self.login(ADMIN_ACCESS_USER) + + self.postResponse('api.update_organization_team', + params=dict(orgname=ORGANIZATION, teamname='owners'), + data=dict(role = 'creator'), + expected_code=400) + + def test_createnewteam(self): + self.login(ADMIN_ACCESS_USER) + + data = self.putJsonResponse('api.update_organization_team', + params=dict(orgname=ORGANIZATION, teamname='newteam'), + data=dict(description = 'My cool team', role = 'member'), + expected_code=201) + + self.assertEquals('My cool team', data['description']) + self.assertEquals('member', data['role']) + + # Verify the team was created. + json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) + assert 'newteam' in json['teams'] + + +class TestDeleteOrganizationTeam(ApiTestCase): + def test_deleteteam(self): + self.login(ADMIN_ACCESS_USER) + + self.deleteResponse('api.delete_organization_team', + params=dict(orgname=ORGANIZATION, teamname='readers')) + + # Make sure the team was deleted + json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) + assert not 'readers' in json['teams'] + + def test_attemptdeleteowners(self): + self.login(ADMIN_ACCESS_USER) + + self.deleteResponse('api.delete_organization_team', + params=dict(orgname=ORGANIZATION, teamname='owners'), + expected_code=400) + + +class TestGetOrganizationTeamMembers(ApiTestCase): + def test_invalidteam(self): + self.login(ADMIN_ACCESS_USER) + + self.getResponse('api.get_organization_team_members', + params=dict(orgname=ORGANIZATION, teamname='notvalid'), + expected_code=404) + + def test_getmembers(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_organization_team_members', + params=dict(orgname=ORGANIZATION, teamname='readers')) + + assert READ_ACCESS_USER in json['members'] + + +class TestUpdateOrganizationTeamMember(ApiTestCase): + def test_addmember(self): + self.login(ADMIN_ACCESS_USER) + + self.postJsonResponse('api.update_organization_team_member', + params=dict(orgname=ORGANIZATION, teamname='readers', membername=NO_ACCESS_USER)) + + + # Verify the user was added to the team. + json = self.getJsonResponse('api.get_organization_team_members', + params=dict(orgname=ORGANIZATION, teamname='readers')) + + assert NO_ACCESS_USER in json['members'] + + +class TestDeleteOrganizationTeamMember(ApiTestCase): + def test_deletemember(self): + self.login(ADMIN_ACCESS_USER) + + self.deleteResponse('api.delete_organization_team_member', + params=dict(orgname=ORGANIZATION, teamname='readers', membername=READ_ACCESS_USER)) + + + # Verify the user was removed from the team. + json = self.getJsonResponse('api.get_organization_team_members', + params=dict(orgname=ORGANIZATION, teamname='readers')) + + assert not READ_ACCESS_USER in json['members'] + + if __name__ == '__main__': unittest.main() From e3eee958a4cd91a89ae5b76a85f18b4e7c6b5907 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Fri, 31 Jan 2014 19:45:44 -0500 Subject: [PATCH 4/6] Continue on API unit tests: Now 60% (or so) coverage --- test/test_api_usage.py | 138 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 81837d590..205828c91 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -570,5 +570,143 @@ class TestDeleteOrganizationTeamMember(ApiTestCase): assert not READ_ACCESS_USER in json['members'] +class TestCreateRepo(ApiTestCase): + def test_duplicaterepo(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_repo', + data=dict(repository='simple', visibility='public'), + expected_code=400) + + self.assertEquals('Repository already exists', json['message']) + + + def test_createrepo(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_repo', + data=dict(repository='newrepo', visibility='public', description='')) + + + self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) + self.assertEquals('newrepo', json['name']) + + + def test_createrepo_underorg(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_repo', + data=dict(namespace=ORGANIZATION, + repository='newrepo', + visibility='private', + description='')) + + self.assertEquals(ORGANIZATION, json['namespace']) + self.assertEquals('newrepo', json['name']) + + +class TestFindRepos(ApiTestCase): + def test_findrepos_asguest(self): + json = self.getJsonResponse('api.find_repos', params=dict(query='p')) + assert len(json['repositories']) == 1 + + self.assertEquals(json['repositories'][0]['namespace'], 'public') + self.assertEquals(json['repositories'][0]['name'], 'publicrepo') + + def test_findrepos_asuser(self): + self.login(NO_ACCESS_USER) + + json = self.getJsonResponse('api.find_repos', params=dict(query='p')) + assert len(json['repositories']) == 1 + + self.assertEquals(json['repositories'][0]['namespace'], 'public') + self.assertEquals(json['repositories'][0]['name'], 'publicrepo') + + def test_findrepos_orgmember(self): + self.login(READ_ACCESS_USER) + + json = self.getJsonResponse('api.find_repos', params=dict(query='p')) + assert len(json['repositories']) > 1 + + +class TestListRepos(ApiTestCase): + def test_listrepos_asguest(self): + json = self.getJsonResponse('api.list_repos', params=dict(public=True)) + assert len(json['repositories']) == 0 + + def test_listrepos_orgmember(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.list_repos', params=dict(public=True)) + assert len(json['repositories']) > 1 + + def test_listrepos_filter(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.list_repos', params=dict(namespace=ORGANIZATION, public=False)) + + for repo in json['repositories']: + self.assertEquals(ORGANIZATION, repo['namespace']) + + def test_listrepos_limit(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.list_repos', params=dict(limit=2)) + + assert len(json['repositories']) == 2 + + +class TestUpdateRepo(ApiTestCase): + def test_updatedescription(self): + self.login(ADMIN_ACCESS_USER) + + self.putJsonResponse('api.update_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(description='Some cool repo')) + + # Verify the repo description was updated. + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + self.assertEquals('Some cool repo', json['description']) + + +class TestChangeRepoVisibility(ApiTestCase): + def test_changevisibility(self): + self.login(ADMIN_ACCESS_USER) + + # Make public. + self.postJsonResponse('api.change_repo_visibility', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(visibility='public')) + + # Verify the visibility. + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + self.assertEquals(True, json['is_public']) + + # Make private. + self.postJsonResponse('api.change_repo_visibility', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(visibility='private')) + + # Verify the visibility. + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + self.assertEquals(False, json['is_public']) + + +class TestDeleteRepository(ApiTestCase): + def test_deleterepo(self): + self.login(ADMIN_ACCESS_USER) + + self.deleteResponse('api.delete_repository', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + # Verify the repo was deleted. + self.getResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + expected_code=404) + + if __name__ == '__main__': unittest.main() From 08160afddeaf357fea80e072d4d99e8a68b1b248 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 3 Feb 2014 18:18:33 -0500 Subject: [PATCH 5/6] Finish API endpoint unit tests --- endpoints/api.py | 14 +- static/js/app.js | 7 +- test/data/test.db | Bin 141312 -> 143360 bytes test/test_api_usage.py | 569 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 586 insertions(+), 4 deletions(-) diff --git a/endpoints/api.py b/endpoints/api.py index 6dfc52dc1..e1c8710fa 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -1248,7 +1248,11 @@ def create_webhook(namespace, repository): def get_webhook(namespace, repository, public_id): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): - webhook = model.get_webhook(namespace, repository, public_id) + try: + webhook = model.get_webhook(namespace, repository, public_id) + except model.InvalidWebhookException: + abort(404) + return jsonify(webhook_view(webhook)) abort(403) # Permission denied @@ -1643,7 +1647,11 @@ def list_repo_tokens(namespace, repository): def get_tokens(namespace, repository, code): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): - perm = model.get_repo_delegate_token(namespace, repository, code) + try: + perm = model.get_repo_delegate_token(namespace, repository, code) + except model.InvalidTokenException: + abort(404) + return jsonify(token_view(perm)) abort(403) # Permission denied @@ -1780,6 +1788,8 @@ def set_card(user, token): cus.save() except stripe.CardError as e: return carderror_response(e) + except stripe.InvalidRequestError as e: + return carderror_response(e) return get_card(user) diff --git a/static/js/app.js b/static/js/app.js index 8d7d08231..8c2d6a4b4 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -427,6 +427,8 @@ quayApp = angular.module('quay', ['ngRoute', 'chieffancypants.loadingBar', 'rest var planService = {}; var listeners = []; + var previousSubscribeFailure = false; + planService.getFreePlan = function() { return 'free'; }; @@ -616,12 +618,15 @@ quayApp = angular.module('quay', ['ngRoute', 'chieffancypants.loadingBar', 'rest if (orgname && !planService.isOrgCompatible(plan)) { return; } planService.getCardInfo(orgname, function(cardInfo) { - if (plan.price > 0 && !cardInfo.last4) { + if (plan.price > 0 && (previousSubscribeFailure || !cardInfo.last4)) { planService.showSubscribeDialog($scope, orgname, planId, callbacks); return; } + previousSubscribeFailure = false; + planService.setSubscription(orgname, planId, callbacks['success'], function(resp) { + previousSubscribeFailure = true; planService.handleCardError(resp); callbacks['failure'](resp); }); diff --git a/test/data/test.db b/test/data/test.db index be6291cdaf3680fcaa53c5ac81823dcc8be9fd92..ef37436e49c58b2480d56f94c74fafc38a630553 100644 GIT binary patch delta 696 zcmZvZPiPZC6vpT6w;|bdGex{9)MXJG(GB}2yK7gdmehI?di1O$1;G}z^i+!V;6*Da zdq@w0A|6XA)pnr~+9FnHz&{|0prZAlR*O<79@2xeMnX+wV3>LDec$&RX8d_-{6p%- z-ozwhY~rSNR2+=8{g;b#MY`3=6QW9}iJeWL7V#`?Ag*0&n?FRdtZ_^;ETD!h{J>Xy z!aKai3smtKi+F(P`@7Vu2!Gy|GfAERQ8FV#A+@}GguHO;NxLwJm=LUB8^5rL&sf7U z9wNebyu~Y2g+YjL>6_ffW947@icj5~W0w7q#pAP)dYJB@T?QjD$>K>?4=RX|N_}z@ z8~7q9-{TEl;u%&2=MpL+q>SB7()b!at$AsVnfQ%!n2A!y=x!^`Vu6lpT(Y?aFES2F<*7bOpRx_p(JVi3oxn14o@aJ>4(~t@*`}2@%tqS| zTtDafj_LTB|F)fc&h{P8$*Xa@eBl7?iB|i_`#7H!pOXe$h6+aEittWc>g8#skiuBc zzOH0b)(#ZQ_H(9K)~J5|-02}b(DXAS!^U8#cy?q^AM4IET#ROy9mK6JC-s4grIJ2Y UP=$d~$7rIkgT|9u_5V=(2BER3MgRZ+ delta 260 zcmZp8z|nAlV}dm6R0alyu89ivK!)xk&Bm0iDU5T}w`nl3uqs}({?@w#zhDN4m&eCN^vu-VPJm5e3*F&b1QQi lvpcf}Gbht4rqfJofClt2P4{ 0 class TestConvertToOrganization(ApiTestCase): def test_sameadminuser(self): @@ -707,6 +711,569 @@ class TestDeleteRepository(ApiTestCase): params=dict(repository=ADMIN_ACCESS_USER + '/simple'), expected_code=404) +class TestGetRepository(ApiTestCase): + def test_getrepo_public_asguest(self): + json = self.getJsonResponse('api.get_repo', + params=dict(repository=PUBLIC_USER + '/publicrepo')) + + self.assertEquals(PUBLIC_USER, json['namespace']) + self.assertEquals('publicrepo', json['name']) + + self.assertEquals(True, json['is_public']) + self.assertEquals(False, json['is_organization']) + self.assertEquals(False, json['is_building']) + + self.assertEquals(False, json['can_write']) + self.assertEquals(False, json['can_admin']) + + assert 'latest' in json['tags'] + + def test_getrepo_public_asowner(self): + self.login(PUBLIC_USER) + + json = self.getJsonResponse('api.get_repo', + params=dict(repository=PUBLIC_USER + '/publicrepo')) + + self.assertEquals(False, json['is_organization']) + self.assertEquals(True, json['can_write']) + self.assertEquals(True, json['can_admin']) + + def test_getrepo_building(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/building')) + + self.assertEquals(True, json['can_write']) + self.assertEquals(True, json['can_admin']) + self.assertEquals(True, json['is_building']) + self.assertEquals(False, json['is_organization']) + + def test_getrepo_org_asnonmember(self): + self.getResponse('api.get_repo', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO), + expected_code=403) + + def test_getrepo_org_asreader(self): + self.login(READ_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) + + self.assertEquals(ORGANIZATION, json['namespace']) + self.assertEquals(ORG_REPO, json['name']) + + self.assertEquals(False, json['can_write']) + self.assertEquals(False, json['can_admin']) + + self.assertEquals(True, json['is_organization']) + + def test_getrepo_org_asadmin(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) + + self.assertEquals(True, json['can_write']) + self.assertEquals(True, json['can_admin']) + + self.assertEquals(True, json['is_organization']) + + +class TestGetRepoBuilds(ApiTestCase): + def test_getrepo_nobuilds(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo_builds', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + assert len(json['builds']) == 0 + + def test_getrepobuilds(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo_builds', + params=dict(repository=ADMIN_ACCESS_USER + '/building')) + + assert len(json['builds']) > 0 + build = json['builds'][0] + + assert 'id' in build + assert 'status' in build + assert 'message' in build + + +class TestRequearRepoBuild(ApiTestCase): + def test_requestrepobuild(self): + self.login(ADMIN_ACCESS_USER) + + # Ensure where not yet building. + json = self.getJsonResponse('api.get_repo_builds', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + assert len(json['builds']) == 0 + + # Request a (fake) build. + self.postResponse('api.request_repo_build', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(file_id = 'foobarbaz'), + expected_code=201) + + # Check for the build. + json = self.getJsonResponse('api.get_repo_builds', + params=dict(repository=ADMIN_ACCESS_USER + '/building')) + + assert len(json['builds']) > 0 + + +class TestWebhooks(ApiTestCase): + def test_webhooks(self): + self.login(ADMIN_ACCESS_USER) + + # Add a webhook. + json = self.postJsonResponse('api.create_webhook', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(url='http://example.com')) + + self.assertEquals('http://example.com', json['parameters']['url']) + wid = json['public_id'] + + # Get the webhook. + json = self.getJsonResponse('api.get_webhook', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid)) + + self.assertEquals(wid, json['public_id']) + self.assertEquals('http://example.com', json['parameters']['url']) + + # Verify the webhook is listed. + json = self.getJsonResponse('api.list_webhooks', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + ids = [w['public_id'] for w in json['webhooks']] + assert wid in ids + + # Delete the webhook. + self.deleteResponse('api.delete_webhook', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid), + expected_code=204) + + # Verify the webhook is gone. + self.getResponse('api.get_webhook', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid), + expected_code=404) + + +class TestListAndGetImage(ApiTestCase): + def test_listandgetimages(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.list_repository_images', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + assert len(json['images']) > 0 + for image in json['images']: + assert 'id' in image + assert 'tags' in image + assert 'created' in image + assert 'comment' in image + assert 'command' in image + assert 'ancestors' in image + assert 'dbid' in image + assert 'size' in image + + ijson = self.getJsonResponse('api.get_image', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', + image_id=image['id'])) + + self.assertEquals(image['id'], ijson['id']) + +class TestGetImageChanges(ApiTestCase): + def test_getimagechanges(self): + self.login(ADMIN_ACCESS_USER) + + # Find an image to check. + json = self.getJsonResponse('api.list_repository_images', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + image_id = json['images'][0]['id'] + + # Lookup the image's changes. + # TODO: Fix me once we can get fake changes into the test data + #self.getJsonResponse('api.get_image_changes', + # params=dict(repository=ADMIN_ACCESS_USER + '/simple', + # image_id=image_id)) + + +class TestListAndDeleteTag(ApiTestCase): + def test_listtagimagesanddeletetag(self): + self.login(ADMIN_ACCESS_USER) + + # List the images for prod. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) + + prod_images = json['images'] + assert len(prod_images) > 0 + + # List the images for staging. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) + + staging_images = json['images'] + assert len(prod_images) == len(staging_images) + 1 + + # Delete prod. + self.deleteResponse('api.delete_full_tag', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), + expected_code=204) + + # Make sure the tag is gone. + self.getResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), + expected_code=404) + + # Make the sure the staging images are still there. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) + + self.assertEquals(staging_images, json['images']) + + + def test_deletesubtag(self): + self.login(ADMIN_ACCESS_USER) + + # List the images for prod. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) + + prod_images = json['images'] + assert len(prod_images) > 0 + + # Delete staging. + self.deleteResponse('api.delete_full_tag', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'), + expected_code=204) + + # Make sure the prod images are still around. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) + + self.assertEquals(prod_images, json['images']) + + +class TestRepoPermissions(ApiTestCase): + def listUserPermissions(self): + return self.getJsonResponse('api.list_repo_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['permissions'] + + def listTeamPermissions(self): + return self.getJsonResponse('api.list_repo_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions'] + + def test_userpermissions(self): + self.login(ADMIN_ACCESS_USER) + + # The repo should start with just the admin as a user perm. + permissions = self.listUserPermissions() + + self.assertEquals(1, len(permissions)) + assert ADMIN_ACCESS_USER in permissions + self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role']) + + # Add another user. + self.putJsonResponse('api.change_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), + data=dict(role='read')) + + # Verify the user is present. + permissions = self.listUserPermissions() + + self.assertEquals(2, len(permissions)) + assert NO_ACCESS_USER in permissions + self.assertEquals('read', permissions[NO_ACCESS_USER]['role']) + + json = self.getJsonResponse('api.get_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) + self.assertEquals('read', json['role']) + + # Change the user's permissions. + self.putJsonResponse('api.change_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), + data=dict(role='admin')) + + # Verify. + permissions = self.listUserPermissions() + + self.assertEquals(2, len(permissions)) + assert NO_ACCESS_USER in permissions + self.assertEquals('admin', permissions[NO_ACCESS_USER]['role']) + + # Delete the user's permission. + self.deleteResponse('api.delete_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) + + # Verify. + permissions = self.listUserPermissions() + + self.assertEquals(1, len(permissions)) + assert not NO_ACCESS_USER in permissions + + + def test_teampermissions(self): + self.login(ADMIN_ACCESS_USER) + + # The repo should start with just the readers as a team perm. + permissions = self.listTeamPermissions() + + self.assertEquals(1, len(permissions)) + assert 'readers' in permissions + self.assertEquals('read', permissions['readers']['role']) + + # Add another team. + self.putJsonResponse('api.change_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), + data=dict(role='write')) + + # Verify the team is present. + permissions = self.listTeamPermissions() + + self.assertEquals(2, len(permissions)) + assert 'owners' in permissions + self.assertEquals('write', permissions['owners']['role']) + + json = self.getJsonResponse('api.get_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) + self.assertEquals('write', json['role']) + + # Change the team's permissions. + self.putJsonResponse('api.change_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), + data=dict(role='admin')) + + # Verify. + permissions = self.listTeamPermissions() + + self.assertEquals(2, len(permissions)) + assert 'owners' in permissions + self.assertEquals('admin', permissions['owners']['role']) + + # Delete the team's permission. + self.deleteResponse('api.delete_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) + + # Verify. + permissions = self.listTeamPermissions() + + self.assertEquals(1, len(permissions)) + assert not 'owners' in permissions + +class TestApiTokens(ApiTestCase): + def listTokens(self): + return self.getJsonResponse('api.list_repo_tokens', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens'] + + def test_tokens(self): + self.login(ADMIN_ACCESS_USER) + + # Create a new token. + json = self.postJsonResponse('api.create_token', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(role='read', friendlyName='mytoken'), + expected_code=201) + + self.assertEquals('mytoken', json['friendlyName']) + self.assertEquals('read', json['role']) + token_code = json['code'] + + # Verify. + tokens = self.listTokens() + assert token_code in tokens + self.assertEquals('mytoken', tokens[token_code]['friendlyName']) + + json = self.getJsonResponse('api.get_tokens', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) + self.assertEquals(tokens[token_code], json) + + # Change the token's permission. + self.putJsonResponse('api.change_token', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), + data=dict(role='write')) + + # Verify. + json = self.getJsonResponse('api.get_tokens', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) + self.assertEquals('write', json['role']) + + # Delete the token. + self.deleteResponse('api.delete_token', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) + + # Verify. + self.getResponse('api.get_tokens', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), + expected_code=404) + + +class TestUserCard(ApiTestCase): + def test_getusercard(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_user_card') + + self.assertEquals('4242', json['card']['last4']) + self.assertEquals('Visa', json['card']['type']) + + def test_setusercard_error(self): + self.login(ADMIN_ACCESS_USER) + json = self.postJsonResponse('api.set_user_card', + data=dict(token='sometoken'), + expected_code=402) + assert 'carderror' in json + + +class TestOrgCard(ApiTestCase): + def test_getorgcard(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_org_card', + params=dict(orgname=ORGANIZATION)) + + self.assertEquals('4242', json['card']['last4']) + self.assertEquals('Visa', json['card']['type']) + + +class TestUserSubscription(ApiTestCase): + def getSubscription(self): + return self.getJsonResponse('api.get_user_subscription') + + def test_updateplan(self): + self.login(ADMIN_ACCESS_USER) + + # Change the plan. + self.putJsonResponse('api.update_user_subscription', + data=dict(plan='free')) + + # Verify + sub = self.getSubscription() + self.assertEquals('free', sub['plan']) + + # Change the plan. + self.putJsonResponse('api.update_user_subscription', + data=dict(plan='bus-large')) + + # Verify + sub = self.getSubscription() + self.assertEquals('bus-large', sub['plan']) + + +class TestOrgSubscription(ApiTestCase): + def getSubscription(self): + return self.getJsonResponse('api.get_org_subscription', params=dict(orgname=ORGANIZATION)) + + def test_updateplan(self): + self.login(ADMIN_ACCESS_USER) + + # Change the plan. + self.putJsonResponse('api.update_org_subscription', + params=dict(orgname=ORGANIZATION), + data=dict(plan='free')) + + # Verify + sub = self.getSubscription() + self.assertEquals('free', sub['plan']) + + # Change the plan. + self.putJsonResponse('api.update_org_subscription', + params=dict(orgname=ORGANIZATION), + data=dict(plan='bus-large')) + + # Verify + sub = self.getSubscription() + self.assertEquals('bus-large', sub['plan']) + + +class TestUserRobots(ApiTestCase): + def getRobotNames(self): + return [r['name'] for r in self.getJsonResponse('api.get_user_robots')['robots']] + + def test_robots(self): + self.login(NO_ACCESS_USER) + + # Create a robot. + json = self.putJsonResponse('api.create_user_robot', + params=dict(robot_shortname='bender'), + expected_code=201) + + self.assertEquals(NO_ACCESS_USER + '+bender', json['name']) + + # Verify. + robots = self.getRobotNames() + assert NO_ACCESS_USER + '+bender' in robots + + # Delete the robot. + self.deleteResponse('api.delete_user_robot', + params=dict(robot_shortname='bender')) + + # Verify. + robots = self.getRobotNames() + assert not NO_ACCESS_USER + '+bender' in robots + + +class TestOrgRobots(ApiTestCase): + def getRobotNames(self): + return [r['name'] for r in self.getJsonResponse('api.get_org_robots', + params=dict(orgname=ORGANIZATION))['robots']] + + def test_robots(self): + self.login(ADMIN_ACCESS_USER) + + # Create a robot. + json = self.putJsonResponse('api.create_org_robot', + params=dict(orgname=ORGANIZATION, robot_shortname='bender'), + expected_code=201) + + self.assertEquals(ORGANIZATION + '+bender', json['name']) + + # Verify. + robots = self.getRobotNames() + assert ORGANIZATION + '+bender' in robots + + # Delete the robot. + self.deleteResponse('api.delete_org_robot', + params=dict(orgname=ORGANIZATION, robot_shortname='bender')) + + # Verify. + robots = self.getRobotNames() + assert not ORGANIZATION + '+bender' in robots + + +class TestLogs(ApiTestCase): + def test_user_logs(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.list_user_logs') + assert 'logs' in json + assert 'start_time' in json + assert 'end_time' in json + + def test_org_logs(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.list_org_logs', params=dict(orgname=ORGANIZATION)) + assert 'logs' in json + assert 'start_time' in json + assert 'end_time' in json + + def test_performer(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.list_org_logs', params=dict(orgname=ORGANIZATION)) + all_logs = json['logs'] + + json = self.getJsonResponse('api.list_org_logs', + params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION)) + + assert len(json['logs']) < len(all_logs) + for log in json['logs']: + self.assertEquals(READ_ACCESS_USER, log['performer']['name']) if __name__ == '__main__': unittest.main() From 83a34c0ef3be44248c096a7bfcab8af3965163f1 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 3 Feb 2014 18:30:06 -0500 Subject: [PATCH 6/6] Fix the few broken security tests after the error code fixes --- test/specs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/specs.py b/test/specs.py index e0a037655..0c44903b3 100644 --- a/test/specs.py +++ b/test/specs.py @@ -231,9 +231,9 @@ def build_specs(): TestSpec(url_for('api.get_webhook', repository=PUBLIC_REPO, public_id=FAKE_WEBHOOK), admin_code=403), TestSpec(url_for('api.get_webhook', repository=ORG_REPO, - public_id=FAKE_WEBHOOK), admin_code=400), + public_id=FAKE_WEBHOOK), admin_code=404), TestSpec(url_for('api.get_webhook', repository=PRIVATE_REPO, - public_id=FAKE_WEBHOOK), admin_code=400), + public_id=FAKE_WEBHOOK), admin_code=404), TestSpec(url_for('api.list_webhooks', repository=PUBLIC_REPO), admin_code=403), @@ -382,9 +382,9 @@ def build_specs(): TestSpec(url_for('api.get_tokens', repository=PUBLIC_REPO, code=FAKE_TOKEN), admin_code=403), TestSpec(url_for('api.get_tokens', repository=ORG_REPO, code=FAKE_TOKEN), - admin_code=400), + admin_code=404), TestSpec(url_for('api.get_tokens', repository=PRIVATE_REPO, - code=FAKE_TOKEN), admin_code=400), + code=FAKE_TOKEN), admin_code=404), TestSpec(url_for('api.create_token', repository=PUBLIC_REPO), admin_code=403).set_method('POST'),