diff --git a/endpoints/api.py b/endpoints/api.py index c3f9cf0ad..e1c8710fa 100644 --- a/endpoints/api.py +++ b/endpoints/api.py @@ -261,7 +261,6 @@ def convert_user_to_organization(): @internal_api_call def change_user_details(): user = current_user.db_user() - user_data = request.get_json() try: @@ -314,6 +313,8 @@ def create_new_user(): @internal_api_call def signin_user(): signin_data = request.get_json() + if not signin_data: + abort(404) username = signin_data['username'] password = signin_data['password'] @@ -420,6 +421,7 @@ def get_matching_entities(prefix): team_data = [entity_team_view(team) for team in teams] user_data = [user_view(user) for user in users] + return jsonify({ 'results': team_data + user_data }) @@ -445,11 +447,16 @@ def create_organization(): existing = None try: - existing = (model.get_organization(org_data['name']) or - model.get_user(org_data['name'])) + existing = model.get_organization(org_data['name']) except model.InvalidOrganizationException: pass + if not existing: + try: + existing = model.get_user(org_data['name']) + except model.InvalidUserException: + pass + if existing: msg = 'A user or organization with this name already exists' return request_error(message=msg) @@ -603,9 +610,9 @@ def create_organization_prototype_permission(orgname): 'name' in details['activating_user']): activating_username = details['activating_user']['name'] - delegate = details['delegate'] - delegate_kind = delegate['kind'] - delegate_name = delegate['name'] + delegate = details['delegate'] if 'delegate' in details else {} + delegate_kind = delegate.get('kind', None) + delegate_name = delegate.get('name', None) delegate_username = delegate_name if delegate_kind == 'user' else None delegate_teamname = delegate_name if delegate_kind == 'team' else None @@ -621,7 +628,7 @@ def create_organization_prototype_permission(orgname): return request_error(message='Unknown activating user') if not delegate_user and not delegate_team: - return request_error(message='Missing delagate user or team') + return request_error(message='Missing delegate user or team') role_name = details['role'] @@ -1241,7 +1248,11 @@ def create_webhook(namespace, repository): def get_webhook(namespace, repository, public_id): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): - webhook = model.get_webhook(namespace, repository, public_id) + try: + webhook = model.get_webhook(namespace, repository, public_id) + except model.InvalidWebhookException: + abort(404) + return jsonify(webhook_view(webhook)) abort(403) # Permission denied @@ -1636,7 +1647,11 @@ def list_repo_tokens(namespace, repository): def get_tokens(namespace, repository, code): permission = AdministerRepositoryPermission(namespace, repository) if permission.can(): - perm = model.get_repo_delegate_token(namespace, repository, code) + try: + perm = model.get_repo_delegate_token(namespace, repository, code) + except model.InvalidTokenException: + abort(404) + return jsonify(token_view(perm)) abort(403) # Permission denied @@ -1773,6 +1788,8 @@ def set_card(user, token): cus.save() except stripe.CardError as e: return carderror_response(e) + except stripe.InvalidRequestError as e: + return carderror_response(e) return get_card(user) diff --git a/static/js/app.js b/static/js/app.js index cf48945bd..31f75158c 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -427,6 +427,8 @@ quayApp = angular.module('quay', ['ngRoute', 'chieffancypants.loadingBar', 'rest var planService = {}; var listeners = []; + var previousSubscribeFailure = false; + planService.getFreePlan = function() { return 'free'; }; @@ -616,12 +618,15 @@ quayApp = angular.module('quay', ['ngRoute', 'chieffancypants.loadingBar', 'rest if (orgname && !planService.isOrgCompatible(plan)) { return; } planService.getCardInfo(orgname, function(cardInfo) { - if (plan.price > 0 && !cardInfo.last4) { + if (plan.price > 0 && (previousSubscribeFailure || !cardInfo.last4)) { planService.showSubscribeDialog($scope, orgname, planId, callbacks); return; } + previousSubscribeFailure = false; + planService.setSubscription(orgname, planId, callbacks['success'], function(resp) { + previousSubscribeFailure = true; planService.handleCardError(resp); callbacks['failure'](resp); }); diff --git a/test/data/test.db b/test/data/test.db index be6291cda..ef37436e4 100644 Binary files a/test/data/test.db and b/test/data/test.db differ diff --git a/test/specs.py b/test/specs.py index 0a523f0fe..65fb4332b 100644 --- a/test/specs.py +++ b/test/specs.py @@ -231,9 +231,9 @@ def build_specs(): TestSpec(url_for('api.get_webhook', repository=PUBLIC_REPO, public_id=FAKE_WEBHOOK), admin_code=403), TestSpec(url_for('api.get_webhook', repository=ORG_REPO, - public_id=FAKE_WEBHOOK), admin_code=400), + public_id=FAKE_WEBHOOK), admin_code=404), TestSpec(url_for('api.get_webhook', repository=PRIVATE_REPO, - public_id=FAKE_WEBHOOK), admin_code=400), + public_id=FAKE_WEBHOOK), admin_code=404), TestSpec(url_for('api.list_webhooks', repository=PUBLIC_REPO), admin_code=403), @@ -382,9 +382,9 @@ def build_specs(): TestSpec(url_for('api.get_tokens', repository=PUBLIC_REPO, code=FAKE_TOKEN), admin_code=403), TestSpec(url_for('api.get_tokens', repository=ORG_REPO, code=FAKE_TOKEN), - admin_code=400), + admin_code=404), TestSpec(url_for('api.get_tokens', repository=PRIVATE_REPO, - code=FAKE_TOKEN), admin_code=400), + code=FAKE_TOKEN), admin_code=404), TestSpec(url_for('api.create_token', repository=PUBLIC_REPO), admin_code=403).set_method('POST'), diff --git a/test/test_api_usage.py b/test/test_api_usage.py new file mode 100644 index 000000000..7f586a614 --- /dev/null +++ b/test/test_api_usage.py @@ -0,0 +1,1279 @@ +import unittest +import json + +from flask import url_for +from endpoints.api import api +from app import app +from initdb import setup_database_for_testing, finished_database_for_testing +from specs import build_specs +from data import model + +app.register_blueprint(api, url_prefix='/api') + +NO_ACCESS_USER = 'freshuser' +READ_ACCESS_USER = 'reader' +ADMIN_ACCESS_USER = 'devtable' +PUBLIC_USER = 'public' + +ORG_REPO = 'orgrepo' + +ORGANIZATION = 'buynlarge' + +NEW_USER_DETAILS = { + 'username': 'bobby', + 'password': 'password', + 'email': 'bobby@tables.com', +} + +class ApiTestCase(unittest.TestCase): + def setUp(self): + setup_database_for_testing(self) + self.app = app.test_client() + self.ctx = app.test_request_context() + self.ctx.__enter__() + + def tearDown(self): + finished_database_for_testing(self) + self.ctx.__exit__(True, None, None) + + def getJsonResponse(self, method_name, params={}): + rv = self.app.get(url_for(method_name, **params)) + self.assertEquals(200, rv.status_code) + data = rv.data + parsed = json.loads(data) + return parsed + + def postResponse(self, method_name, params={}, data={}, expected_code=200): + rv = self.app.post(url_for(method_name, **params), data=json.dumps(data), + headers={"Content-Type": "application/json"}) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def getResponse(self, method_name, params={}, expected_code=200): + rv = self.app.get(url_for(method_name, **params)) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def deleteResponse(self, method_name, params={}, expected_code=204): + rv = self.app.delete(url_for(method_name, **params)) + self.assertEquals(rv.status_code, expected_code) + return rv.data + + def postJsonResponse(self, method_name, params={}, data={}, expected_code=200): + rv = self.app.post(url_for(method_name, **params), data=json.dumps(data), + headers={"Content-Type": "application/json"}) + + if rv.status_code != expected_code: + print 'Mismatch data for method %s: %s' % (method_name, rv.data) + + self.assertEquals(rv.status_code, expected_code) + data = rv.data + parsed = json.loads(data) + return parsed + + def putJsonResponse(self, method_name, params={}, data={}, expected_code=200): + rv = self.app.put(url_for(method_name, **params), data=json.dumps(data), + headers={"Content-Type": "application/json"}) + + if rv.status_code != expected_code: + print 'Mismatch data for method %s: %s' % (method_name, rv.data) + + self.assertEquals(rv.status_code, expected_code) + data = rv.data + parsed = json.loads(data) + return parsed + + def login(self, username, password='password'): + return self.postJsonResponse('api.signin_user', data=dict(username=username, password=password)) + +class TestDiscovery(ApiTestCase): + def test_discovery(self): + """ Basic sanity check that discovery returns valid JSON in the expected format. """ + json = self.getJsonResponse('api.discovery') + found = set([]) + for method_info in json['endpoints']: + found.add(method_info['name']) + + assert 'discovery' in found + +class TestPlans(ApiTestCase): + def test_plans(self): + """ Basic sanity check that the plans are returned in the expected format. """ + json = self.getJsonResponse('api.list_plans') + found = set([]) + for method_info in json['plans']: + found.add(method_info['stripeId']) + + assert 'free' in found + +class TestLoggedInUser(ApiTestCase): + def test_guest(self): + json = self.getJsonResponse('api.get_logged_in_user') + assert json['anonymous'] == True + + def test_user(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.get_logged_in_user') + assert json['anonymous'] == False + assert json['username'] == READ_ACCESS_USER + +class TestGetUserPrivateCount(ApiTestCase): + def test_nonallowed(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.get_user_private_count') + assert json['privateCount'] == 0 + assert json['reposAllowed'] == 0 + + def test_allowed(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_user_private_count') + assert json['privateCount'] == 6 + assert json['reposAllowed'] > 0 + +class TestConvertToOrganization(ApiTestCase): + def test_sameadminuser(self): + self.login(READ_ACCESS_USER) + json = self.postJsonResponse('api.convert_user_to_organization', + data={'adminUser': READ_ACCESS_USER, 'adminPassword': 'password'}, + expected_code=400) + + self.assertEqual('The admin user is not valid', json['message']) + + def test_invalidadminuser(self): + self.login(READ_ACCESS_USER) + json = self.postJsonResponse('api.convert_user_to_organization', + data={'adminUser': 'unknownuser', 'adminPassword': 'password'}, + expected_code=400) + + self.assertEqual('The admin user credentials are not valid', json['message']) + + def test_invalidadminpassword(self): + self.login(READ_ACCESS_USER) + json = self.postJsonResponse('api.convert_user_to_organization', + data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'invalidpass'}, + expected_code=400) + + self.assertEqual('The admin user credentials are not valid', json['message']) + + def test_convert(self): + self.login(READ_ACCESS_USER) + json = self.postJsonResponse('api.convert_user_to_organization', + data={'adminUser': ADMIN_ACCESS_USER, + 'adminPassword': 'password', + 'plan': 'free'}) + + self.assertEqual(True, json['success']) + + # Verify the organization exists. + organization = model.get_organization(READ_ACCESS_USER) + assert organization is not None + + # Verify the admin user is the org's admin. + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_organization', params=dict(orgname=READ_ACCESS_USER)) + + self.assertEquals(READ_ACCESS_USER, json['name']) + self.assertEquals(True, json['is_admin']) + + +class TestChangeUserDetails(ApiTestCase): + def test_changepassword(self): + self.login(READ_ACCESS_USER) + self.putJsonResponse('api.change_user_details', data=dict(password='newpasswordiscool')) + self.login(READ_ACCESS_USER, password='newpasswordiscool') + + def test_changeinvoiceemail(self): + self.login(READ_ACCESS_USER) + + json = self.putJsonResponse('api.change_user_details', data=dict(invoice_email=True)) + self.assertEquals(True, json['invoice_email']) + + json = self.putJsonResponse('api.change_user_details', data=dict(invoice_email=False)) + self.assertEquals(False, json['invoice_email']) + + +class TestCreateNewUser(ApiTestCase): + def test_existingusername(self): + json = self.postJsonResponse('api.create_new_user', + data=dict(username=READ_ACCESS_USER, + password='password', + email='test@example.com'), + expected_code=400) + + self.assertEquals('The username already exists', json['message']) + + def test_createuser(self): + data = self.postResponse('api.create_new_user', + data=NEW_USER_DETAILS, + expected_code=201) + self.assertEquals('Created', data) + + +class TestSignout(ApiTestCase): + def test_signout(self): + self.login(READ_ACCESS_USER) + + json = self.getJsonResponse('api.get_logged_in_user') + assert json['username'] == READ_ACCESS_USER + + self.postResponse('api.logout') + + json = self.getJsonResponse('api.get_logged_in_user') + assert json['anonymous'] == True + + +class TestGetMatchingEntities(ApiTestCase): + def test_notinorg(self): + self.login(NO_ACCESS_USER) + + json = self.getJsonResponse('api.get_matching_entities', + params=dict(prefix='o', namespace=ORGANIZATION, includeTeams=True)) + + names = set([r['name'] for r in json['results']]) + assert 'outsideorg' in names + assert not 'owners' in names + + def test_inorg(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_matching_entities', + params=dict(prefix='o', namespace=ORGANIZATION, includeTeams=True)) + + names = set([r['name'] for r in json['results']]) + assert 'outsideorg' in names + assert 'owners' in names + + +class TestCreateOrganization(ApiTestCase): + def test_existinguser(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization', data=dict(name=ADMIN_ACCESS_USER), + expected_code=400) + + self.assertEquals('A user or organization with this name already exists', json['message']) + + def test_existingorg(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization', data=dict(name=ORGANIZATION), + expected_code=400) + + self.assertEquals('A user or organization with this name already exists', json['message']) + + def test_createorg(self): + self.login(ADMIN_ACCESS_USER) + + data = self.postResponse('api.create_organization', + data=dict(name='neworg', email='test@example.com'), + expected_code=201) + + self.assertEquals('Created', data) + + # Ensure the org was created. + organization = model.get_organization('neworg') + assert organization is not None + + # Verify the admin user is the org's admin. + json = self.getJsonResponse('api.get_organization', params=dict(orgname='neworg')) + self.assertEquals('neworg', json['name']) + self.assertEquals(True, json['is_admin']) + + +class TestGetOrganization(ApiTestCase): + def test_unknownorg(self): + self.login(ADMIN_ACCESS_USER) + self.getResponse('api.get_organization', params=dict(orgname='notvalid'), + expected_code=403) + + def test_cannotaccess(self): + self.login(NO_ACCESS_USER) + self.getResponse('api.get_organization', params=dict(orgname=ORGANIZATION), + expected_code=403) + + def test_getorganization(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) + + self.assertEquals(ORGANIZATION, json['name']) + self.assertEquals(False, json['is_admin']) + + def test_getorganization_asadmin(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) + + self.assertEquals(ORGANIZATION, json['name']) + self.assertEquals(True, json['is_admin']) + + +class TestChangeOrganizationDetails(ApiTestCase): + def test_changeinvoiceemail(self): + self.login(ADMIN_ACCESS_USER) + + json = self.putJsonResponse('api.change_organization_details', + params=dict(orgname=ORGANIZATION), + data=dict(invoice_email=True)) + + self.assertEquals(True, json['invoice_email']) + + json = self.putJsonResponse('api.change_organization_details', + params=dict(orgname=ORGANIZATION), + data=dict(invoice_email=False)) + self.assertEquals(False, json['invoice_email']) + + + def test_changemail(self): + self.login(ADMIN_ACCESS_USER) + + json = self.putJsonResponse('api.change_organization_details', + params=dict(orgname=ORGANIZATION), + data=dict(email='newemail@example.com')) + + self.assertEquals('newemail@example.com', json['email']) + + +class TestGetOrganizationPrototypes(ApiTestCase): + def test_getprototypes(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + assert len(json['prototypes']) > 0 + + +class TestCreateOrganizationPrototypes(ApiTestCase): + def test_invaliduser(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization_prototype_permission', + params=dict(orgname=ORGANIZATION), + data=dict(activating_user={'name': 'unknownuser'}, + role='read', + delegate={'kind': 'team', 'name': 'owners'}), + expected_code=400) + + self.assertEquals('Unknown activating user', json['message']) + + + def test_missingdelegate(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization_prototype_permission', + params=dict(orgname=ORGANIZATION), + data=dict(role='read'), + expected_code=400) + + self.assertEquals('Missing delegate user or team', json['message']) + + def test_createprototype(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_organization_prototype_permission', + params=dict(orgname=ORGANIZATION), + data=dict(role='read', delegate={'kind': 'team', 'name': 'readers'})) + + self.assertEquals('read', json['role']) + pid = json['id'] + + # Verify the prototype exists. + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + ids = set([p['id'] for p in json['prototypes']]) + assert pid in ids + + +class TestDeleteOrganizationPrototypes(ApiTestCase): + def test_deleteprototype(self): + self.login(ADMIN_ACCESS_USER) + + # Get the existing prototypes + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + ids = [p['id'] for p in json['prototypes']] + pid = ids[0] + + # Delete a prototype. + self.deleteResponse('api.delete_organization_prototype_permission', + params=dict(orgname=ORGANIZATION, prototypeid=pid)) + + # Verify the prototype no longer exists. + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + newids = [p['id'] for p in json['prototypes']] + assert not pid in newids + + +class TestUpdateOrganizationPrototypes(ApiTestCase): + def test_updateprototype(self): + self.login(ADMIN_ACCESS_USER) + + # Get the existing prototypes + json = self.getJsonResponse('api.get_organization_prototype_permissions', + params=dict(orgname=ORGANIZATION)) + + ids = [p['id'] for p in json['prototypes']] + pid = ids[0] + + # Update a prototype. + json = self.putJsonResponse('api.delete_organization_prototype_permission', + params=dict(orgname=ORGANIZATION, prototypeid=pid), + data=dict(role='admin')) + + self.assertEquals('admin', json['role']) + + + +class TestGetOrganiaztionMembers(ApiTestCase): + def test_getmembers(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_organization_members', + params=dict(orgname=ORGANIZATION)) + + assert ADMIN_ACCESS_USER in json['members'] + assert READ_ACCESS_USER in json['members'] + assert not NO_ACCESS_USER in json['members'] + + def test_getspecificmember(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_organization_member', + params=dict(orgname=ORGANIZATION, membername=ADMIN_ACCESS_USER)) + + self.assertEquals(ADMIN_ACCESS_USER, json['member']['name']) + self.assertEquals('user', json['member']['kind']) + + assert 'owners' in json['member']['teams'] + + +class TestGetOrganizationPrivateAllowed(ApiTestCase): + def test_existingorg(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_organization_private_allowed', + params=dict(orgname=ORGANIZATION)) + + self.assertEquals(True, json['privateAllowed']) + assert not 'reposAllowed' in json + + + def test_neworg(self): + self.login(ADMIN_ACCESS_USER) + + data = self.postResponse('api.create_organization', + data=dict(name='neworg', email='test@example.com'), + expected_code=201) + + json = self.getJsonResponse('api.get_organization_private_allowed', + params=dict(orgname='neworg')) + + self.assertEquals(False, json['privateAllowed']) + + +class TestUpdateOrganizationTeam(ApiTestCase): + def test_updateexisting(self): + self.login(ADMIN_ACCESS_USER) + + data = self.postJsonResponse('api.update_organization_team', + params=dict(orgname=ORGANIZATION, teamname='readers'), + data=dict(description = 'My cool team', role = 'creator')) + + self.assertEquals('My cool team', data['description']) + self.assertEquals('creator', data['role']) + + def test_attemptchangeroleonowners(self): + self.login(ADMIN_ACCESS_USER) + + self.postResponse('api.update_organization_team', + params=dict(orgname=ORGANIZATION, teamname='owners'), + data=dict(role = 'creator'), + expected_code=400) + + def test_createnewteam(self): + self.login(ADMIN_ACCESS_USER) + + data = self.putJsonResponse('api.update_organization_team', + params=dict(orgname=ORGANIZATION, teamname='newteam'), + data=dict(description = 'My cool team', role = 'member'), + expected_code=201) + + self.assertEquals('My cool team', data['description']) + self.assertEquals('member', data['role']) + + # Verify the team was created. + json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) + assert 'newteam' in json['teams'] + + +class TestDeleteOrganizationTeam(ApiTestCase): + def test_deleteteam(self): + self.login(ADMIN_ACCESS_USER) + + self.deleteResponse('api.delete_organization_team', + params=dict(orgname=ORGANIZATION, teamname='readers')) + + # Make sure the team was deleted + json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) + assert not 'readers' in json['teams'] + + def test_attemptdeleteowners(self): + self.login(ADMIN_ACCESS_USER) + + self.deleteResponse('api.delete_organization_team', + params=dict(orgname=ORGANIZATION, teamname='owners'), + expected_code=400) + + +class TestGetOrganizationTeamMembers(ApiTestCase): + def test_invalidteam(self): + self.login(ADMIN_ACCESS_USER) + + self.getResponse('api.get_organization_team_members', + params=dict(orgname=ORGANIZATION, teamname='notvalid'), + expected_code=404) + + def test_getmembers(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_organization_team_members', + params=dict(orgname=ORGANIZATION, teamname='readers')) + + assert READ_ACCESS_USER in json['members'] + + +class TestUpdateOrganizationTeamMember(ApiTestCase): + def test_addmember(self): + self.login(ADMIN_ACCESS_USER) + + self.postJsonResponse('api.update_organization_team_member', + params=dict(orgname=ORGANIZATION, teamname='readers', membername=NO_ACCESS_USER)) + + + # Verify the user was added to the team. + json = self.getJsonResponse('api.get_organization_team_members', + params=dict(orgname=ORGANIZATION, teamname='readers')) + + assert NO_ACCESS_USER in json['members'] + + +class TestDeleteOrganizationTeamMember(ApiTestCase): + def test_deletemember(self): + self.login(ADMIN_ACCESS_USER) + + self.deleteResponse('api.delete_organization_team_member', + params=dict(orgname=ORGANIZATION, teamname='readers', membername=READ_ACCESS_USER)) + + + # Verify the user was removed from the team. + json = self.getJsonResponse('api.get_organization_team_members', + params=dict(orgname=ORGANIZATION, teamname='readers')) + + assert not READ_ACCESS_USER in json['members'] + + +class TestCreateRepo(ApiTestCase): + def test_duplicaterepo(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_repo', + data=dict(repository='simple', visibility='public'), + expected_code=400) + + self.assertEquals('Repository already exists', json['message']) + + + def test_createrepo(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_repo', + data=dict(repository='newrepo', visibility='public', description='')) + + + self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) + self.assertEquals('newrepo', json['name']) + + + def test_createrepo_underorg(self): + self.login(ADMIN_ACCESS_USER) + + json = self.postJsonResponse('api.create_repo', + data=dict(namespace=ORGANIZATION, + repository='newrepo', + visibility='private', + description='')) + + self.assertEquals(ORGANIZATION, json['namespace']) + self.assertEquals('newrepo', json['name']) + + +class TestFindRepos(ApiTestCase): + def test_findrepos_asguest(self): + json = self.getJsonResponse('api.find_repos', params=dict(query='p')) + assert len(json['repositories']) == 1 + + self.assertEquals(json['repositories'][0]['namespace'], 'public') + self.assertEquals(json['repositories'][0]['name'], 'publicrepo') + + def test_findrepos_asuser(self): + self.login(NO_ACCESS_USER) + + json = self.getJsonResponse('api.find_repos', params=dict(query='p')) + assert len(json['repositories']) == 1 + + self.assertEquals(json['repositories'][0]['namespace'], 'public') + self.assertEquals(json['repositories'][0]['name'], 'publicrepo') + + def test_findrepos_orgmember(self): + self.login(READ_ACCESS_USER) + + json = self.getJsonResponse('api.find_repos', params=dict(query='p')) + assert len(json['repositories']) > 1 + + +class TestListRepos(ApiTestCase): + def test_listrepos_asguest(self): + json = self.getJsonResponse('api.list_repos', params=dict(public=True)) + assert len(json['repositories']) == 0 + + def test_listrepos_orgmember(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.list_repos', params=dict(public=True)) + assert len(json['repositories']) > 1 + + def test_listrepos_filter(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.list_repos', params=dict(namespace=ORGANIZATION, public=False)) + + for repo in json['repositories']: + self.assertEquals(ORGANIZATION, repo['namespace']) + + def test_listrepos_limit(self): + self.login(READ_ACCESS_USER) + json = self.getJsonResponse('api.list_repos', params=dict(limit=2)) + + assert len(json['repositories']) == 2 + + +class TestUpdateRepo(ApiTestCase): + def test_updatedescription(self): + self.login(ADMIN_ACCESS_USER) + + self.putJsonResponse('api.update_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(description='Some cool repo')) + + # Verify the repo description was updated. + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + self.assertEquals('Some cool repo', json['description']) + + +class TestChangeRepoVisibility(ApiTestCase): + def test_changevisibility(self): + self.login(ADMIN_ACCESS_USER) + + # Make public. + self.postJsonResponse('api.change_repo_visibility', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(visibility='public')) + + # Verify the visibility. + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + self.assertEquals(True, json['is_public']) + + # Make private. + self.postJsonResponse('api.change_repo_visibility', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(visibility='private')) + + # Verify the visibility. + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + self.assertEquals(False, json['is_public']) + + +class TestDeleteRepository(ApiTestCase): + def test_deleterepo(self): + self.login(ADMIN_ACCESS_USER) + + self.deleteResponse('api.delete_repository', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + # Verify the repo was deleted. + self.getResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + expected_code=404) + +class TestGetRepository(ApiTestCase): + def test_getrepo_public_asguest(self): + json = self.getJsonResponse('api.get_repo', + params=dict(repository=PUBLIC_USER + '/publicrepo')) + + self.assertEquals(PUBLIC_USER, json['namespace']) + self.assertEquals('publicrepo', json['name']) + + self.assertEquals(True, json['is_public']) + self.assertEquals(False, json['is_organization']) + self.assertEquals(False, json['is_building']) + + self.assertEquals(False, json['can_write']) + self.assertEquals(False, json['can_admin']) + + assert 'latest' in json['tags'] + + def test_getrepo_public_asowner(self): + self.login(PUBLIC_USER) + + json = self.getJsonResponse('api.get_repo', + params=dict(repository=PUBLIC_USER + '/publicrepo')) + + self.assertEquals(False, json['is_organization']) + self.assertEquals(True, json['can_write']) + self.assertEquals(True, json['can_admin']) + + def test_getrepo_building(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ADMIN_ACCESS_USER + '/building')) + + self.assertEquals(True, json['can_write']) + self.assertEquals(True, json['can_admin']) + self.assertEquals(True, json['is_building']) + self.assertEquals(False, json['is_organization']) + + def test_getrepo_org_asnonmember(self): + self.getResponse('api.get_repo', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO), + expected_code=403) + + def test_getrepo_org_asreader(self): + self.login(READ_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) + + self.assertEquals(ORGANIZATION, json['namespace']) + self.assertEquals(ORG_REPO, json['name']) + + self.assertEquals(False, json['can_write']) + self.assertEquals(False, json['can_admin']) + + self.assertEquals(True, json['is_organization']) + + def test_getrepo_org_asadmin(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) + + self.assertEquals(True, json['can_write']) + self.assertEquals(True, json['can_admin']) + + self.assertEquals(True, json['is_organization']) + + +class TestGetRepoBuilds(ApiTestCase): + def test_getrepo_nobuilds(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo_builds', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + assert len(json['builds']) == 0 + + def test_getrepobuilds(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.get_repo_builds', + params=dict(repository=ADMIN_ACCESS_USER + '/building')) + + assert len(json['builds']) > 0 + build = json['builds'][0] + + assert 'id' in build + assert 'status' in build + assert 'message' in build + + +class TestRequearRepoBuild(ApiTestCase): + def test_requestrepobuild(self): + self.login(ADMIN_ACCESS_USER) + + # Ensure where not yet building. + json = self.getJsonResponse('api.get_repo_builds', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + assert len(json['builds']) == 0 + + # Request a (fake) build. + self.postResponse('api.request_repo_build', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(file_id = 'foobarbaz'), + expected_code=201) + + # Check for the build. + json = self.getJsonResponse('api.get_repo_builds', + params=dict(repository=ADMIN_ACCESS_USER + '/building')) + + assert len(json['builds']) > 0 + + +class TestWebhooks(ApiTestCase): + def test_webhooks(self): + self.login(ADMIN_ACCESS_USER) + + # Add a webhook. + json = self.postJsonResponse('api.create_webhook', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(url='http://example.com')) + + self.assertEquals('http://example.com', json['parameters']['url']) + wid = json['public_id'] + + # Get the webhook. + json = self.getJsonResponse('api.get_webhook', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid)) + + self.assertEquals(wid, json['public_id']) + self.assertEquals('http://example.com', json['parameters']['url']) + + # Verify the webhook is listed. + json = self.getJsonResponse('api.list_webhooks', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + ids = [w['public_id'] for w in json['webhooks']] + assert wid in ids + + # Delete the webhook. + self.deleteResponse('api.delete_webhook', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid), + expected_code=204) + + # Verify the webhook is gone. + self.getResponse('api.get_webhook', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid), + expected_code=404) + + +class TestListAndGetImage(ApiTestCase): + def test_listandgetimages(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.list_repository_images', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + assert len(json['images']) > 0 + for image in json['images']: + assert 'id' in image + assert 'tags' in image + assert 'created' in image + assert 'comment' in image + assert 'command' in image + assert 'ancestors' in image + assert 'dbid' in image + assert 'size' in image + + ijson = self.getJsonResponse('api.get_image', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', + image_id=image['id'])) + + self.assertEquals(image['id'], ijson['id']) + +class TestGetImageChanges(ApiTestCase): + def test_getimagechanges(self): + self.login(ADMIN_ACCESS_USER) + + # Find an image to check. + json = self.getJsonResponse('api.list_repository_images', + params=dict(repository=ADMIN_ACCESS_USER + '/simple')) + + image_id = json['images'][0]['id'] + + # Lookup the image's changes. + # TODO: Fix me once we can get fake changes into the test data + #self.getJsonResponse('api.get_image_changes', + # params=dict(repository=ADMIN_ACCESS_USER + '/simple', + # image_id=image_id)) + + +class TestListAndDeleteTag(ApiTestCase): + def test_listtagimagesanddeletetag(self): + self.login(ADMIN_ACCESS_USER) + + # List the images for prod. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) + + prod_images = json['images'] + assert len(prod_images) > 0 + + # List the images for staging. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) + + staging_images = json['images'] + assert len(prod_images) == len(staging_images) + 1 + + # Delete prod. + self.deleteResponse('api.delete_full_tag', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), + expected_code=204) + + # Make sure the tag is gone. + self.getResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), + expected_code=404) + + # Make the sure the staging images are still there. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) + + self.assertEquals(staging_images, json['images']) + + + def test_deletesubtag(self): + self.login(ADMIN_ACCESS_USER) + + # List the images for prod. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) + + prod_images = json['images'] + assert len(prod_images) > 0 + + # Delete staging. + self.deleteResponse('api.delete_full_tag', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'), + expected_code=204) + + # Make sure the prod images are still around. + json = self.getJsonResponse('api.list_tag_images', + params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) + + self.assertEquals(prod_images, json['images']) + + +class TestRepoPermissions(ApiTestCase): + def listUserPermissions(self): + return self.getJsonResponse('api.list_repo_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['permissions'] + + def listTeamPermissions(self): + return self.getJsonResponse('api.list_repo_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions'] + + def test_userpermissions(self): + self.login(ADMIN_ACCESS_USER) + + # The repo should start with just the admin as a user perm. + permissions = self.listUserPermissions() + + self.assertEquals(1, len(permissions)) + assert ADMIN_ACCESS_USER in permissions + self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role']) + + # Add another user. + self.putJsonResponse('api.change_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), + data=dict(role='read')) + + # Verify the user is present. + permissions = self.listUserPermissions() + + self.assertEquals(2, len(permissions)) + assert NO_ACCESS_USER in permissions + self.assertEquals('read', permissions[NO_ACCESS_USER]['role']) + + json = self.getJsonResponse('api.get_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) + self.assertEquals('read', json['role']) + + # Change the user's permissions. + self.putJsonResponse('api.change_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), + data=dict(role='admin')) + + # Verify. + permissions = self.listUserPermissions() + + self.assertEquals(2, len(permissions)) + assert NO_ACCESS_USER in permissions + self.assertEquals('admin', permissions[NO_ACCESS_USER]['role']) + + # Delete the user's permission. + self.deleteResponse('api.delete_user_permissions', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) + + # Verify. + permissions = self.listUserPermissions() + + self.assertEquals(1, len(permissions)) + assert not NO_ACCESS_USER in permissions + + + def test_teampermissions(self): + self.login(ADMIN_ACCESS_USER) + + # The repo should start with just the readers as a team perm. + permissions = self.listTeamPermissions() + + self.assertEquals(1, len(permissions)) + assert 'readers' in permissions + self.assertEquals('read', permissions['readers']['role']) + + # Add another team. + self.putJsonResponse('api.change_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), + data=dict(role='write')) + + # Verify the team is present. + permissions = self.listTeamPermissions() + + self.assertEquals(2, len(permissions)) + assert 'owners' in permissions + self.assertEquals('write', permissions['owners']['role']) + + json = self.getJsonResponse('api.get_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) + self.assertEquals('write', json['role']) + + # Change the team's permissions. + self.putJsonResponse('api.change_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), + data=dict(role='admin')) + + # Verify. + permissions = self.listTeamPermissions() + + self.assertEquals(2, len(permissions)) + assert 'owners' in permissions + self.assertEquals('admin', permissions['owners']['role']) + + # Delete the team's permission. + self.deleteResponse('api.delete_team_permissions', + params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) + + # Verify. + permissions = self.listTeamPermissions() + + self.assertEquals(1, len(permissions)) + assert not 'owners' in permissions + +class TestApiTokens(ApiTestCase): + def listTokens(self): + return self.getJsonResponse('api.list_repo_tokens', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens'] + + def test_tokens(self): + self.login(ADMIN_ACCESS_USER) + + # Create a new token. + json = self.postJsonResponse('api.create_token', + params=dict(repository=ADMIN_ACCESS_USER + '/simple'), + data=dict(role='read', friendlyName='mytoken'), + expected_code=201) + + self.assertEquals('mytoken', json['friendlyName']) + self.assertEquals('read', json['role']) + token_code = json['code'] + + # Verify. + tokens = self.listTokens() + assert token_code in tokens + self.assertEquals('mytoken', tokens[token_code]['friendlyName']) + + json = self.getJsonResponse('api.get_tokens', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) + self.assertEquals(tokens[token_code], json) + + # Change the token's permission. + self.putJsonResponse('api.change_token', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), + data=dict(role='write')) + + # Verify. + json = self.getJsonResponse('api.get_tokens', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) + self.assertEquals('write', json['role']) + + # Delete the token. + self.deleteResponse('api.delete_token', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) + + # Verify. + self.getResponse('api.get_tokens', + params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), + expected_code=404) + + +class TestUserCard(ApiTestCase): + def test_getusercard(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_user_card') + + self.assertEquals('4242', json['card']['last4']) + self.assertEquals('Visa', json['card']['type']) + + def test_setusercard_error(self): + self.login(ADMIN_ACCESS_USER) + json = self.postJsonResponse('api.set_user_card', + data=dict(token='sometoken'), + expected_code=402) + assert 'carderror' in json + + +class TestOrgCard(ApiTestCase): + def test_getorgcard(self): + self.login(ADMIN_ACCESS_USER) + json = self.getJsonResponse('api.get_org_card', + params=dict(orgname=ORGANIZATION)) + + self.assertEquals('4242', json['card']['last4']) + self.assertEquals('Visa', json['card']['type']) + + +class TestUserSubscription(ApiTestCase): + def getSubscription(self): + return self.getJsonResponse('api.get_user_subscription') + + def test_updateplan(self): + self.login(ADMIN_ACCESS_USER) + + # Change the plan. + self.putJsonResponse('api.update_user_subscription', + data=dict(plan='free')) + + # Verify + sub = self.getSubscription() + self.assertEquals('free', sub['plan']) + + # Change the plan. + self.putJsonResponse('api.update_user_subscription', + data=dict(plan='bus-large')) + + # Verify + sub = self.getSubscription() + self.assertEquals('bus-large', sub['plan']) + + +class TestOrgSubscription(ApiTestCase): + def getSubscription(self): + return self.getJsonResponse('api.get_org_subscription', params=dict(orgname=ORGANIZATION)) + + def test_updateplan(self): + self.login(ADMIN_ACCESS_USER) + + # Change the plan. + self.putJsonResponse('api.update_org_subscription', + params=dict(orgname=ORGANIZATION), + data=dict(plan='free')) + + # Verify + sub = self.getSubscription() + self.assertEquals('free', sub['plan']) + + # Change the plan. + self.putJsonResponse('api.update_org_subscription', + params=dict(orgname=ORGANIZATION), + data=dict(plan='bus-large')) + + # Verify + sub = self.getSubscription() + self.assertEquals('bus-large', sub['plan']) + + +class TestUserRobots(ApiTestCase): + def getRobotNames(self): + return [r['name'] for r in self.getJsonResponse('api.get_user_robots')['robots']] + + def test_robots(self): + self.login(NO_ACCESS_USER) + + # Create a robot. + json = self.putJsonResponse('api.create_user_robot', + params=dict(robot_shortname='bender'), + expected_code=201) + + self.assertEquals(NO_ACCESS_USER + '+bender', json['name']) + + # Verify. + robots = self.getRobotNames() + assert NO_ACCESS_USER + '+bender' in robots + + # Delete the robot. + self.deleteResponse('api.delete_user_robot', + params=dict(robot_shortname='bender')) + + # Verify. + robots = self.getRobotNames() + assert not NO_ACCESS_USER + '+bender' in robots + + +class TestOrgRobots(ApiTestCase): + def getRobotNames(self): + return [r['name'] for r in self.getJsonResponse('api.get_org_robots', + params=dict(orgname=ORGANIZATION))['robots']] + + def test_robots(self): + self.login(ADMIN_ACCESS_USER) + + # Create a robot. + json = self.putJsonResponse('api.create_org_robot', + params=dict(orgname=ORGANIZATION, robot_shortname='bender'), + expected_code=201) + + self.assertEquals(ORGANIZATION + '+bender', json['name']) + + # Verify. + robots = self.getRobotNames() + assert ORGANIZATION + '+bender' in robots + + # Delete the robot. + self.deleteResponse('api.delete_org_robot', + params=dict(orgname=ORGANIZATION, robot_shortname='bender')) + + # Verify. + robots = self.getRobotNames() + assert not ORGANIZATION + '+bender' in robots + + +class TestLogs(ApiTestCase): + def test_user_logs(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.list_user_logs') + assert 'logs' in json + assert 'start_time' in json + assert 'end_time' in json + + def test_org_logs(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.list_org_logs', params=dict(orgname=ORGANIZATION)) + assert 'logs' in json + assert 'start_time' in json + assert 'end_time' in json + + def test_performer(self): + self.login(ADMIN_ACCESS_USER) + + json = self.getJsonResponse('api.list_org_logs', params=dict(orgname=ORGANIZATION)) + all_logs = json['logs'] + + json = self.getJsonResponse('api.list_org_logs', + params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION)) + + assert len(json['logs']) < len(all_logs) + for log in json['logs']: + self.assertEquals(READ_ACCESS_USER, log['performer']['name']) + +if __name__ == '__main__': + unittest.main()