import unittest import json from flask import url_for from endpoints.api import api from app import app from initdb import setup_database_for_testing, finished_database_for_testing from specs import build_specs from data import model app.register_blueprint(api, url_prefix='/api') NO_ACCESS_USER = 'freshuser' READ_ACCESS_USER = 'reader' ADMIN_ACCESS_USER = 'devtable' PUBLIC_USER = 'public' ORG_REPO = 'orgrepo' ORGANIZATION = 'buynlarge' NEW_USER_DETAILS = { 'username': 'bobby', 'password': 'password', 'email': 'bobby@tables.com', } class ApiTestCase(unittest.TestCase): def setUp(self): setup_database_for_testing(self) self.app = app.test_client() self.ctx = app.test_request_context() self.ctx.__enter__() def tearDown(self): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) def getJsonResponse(self, method_name, params={}): rv = self.app.get(url_for(method_name, **params)) self.assertEquals(200, rv.status_code) data = rv.data parsed = json.loads(data) return parsed def postResponse(self, method_name, params={}, data={}, expected_code=200): rv = self.app.post(url_for(method_name, **params), data=json.dumps(data), headers={"Content-Type": "application/json"}) self.assertEquals(rv.status_code, expected_code) return rv.data def getResponse(self, method_name, params={}, expected_code=200): rv = self.app.get(url_for(method_name, **params)) self.assertEquals(rv.status_code, expected_code) return rv.data def deleteResponse(self, method_name, params={}, expected_code=204): rv = self.app.delete(url_for(method_name, **params)) self.assertEquals(rv.status_code, expected_code) return rv.data def postJsonResponse(self, method_name, params={}, data={}, expected_code=200): rv = self.app.post(url_for(method_name, **params), data=json.dumps(data), headers={"Content-Type": "application/json"}) if rv.status_code != expected_code: print 'Mismatch data for method %s: %s' % (method_name, rv.data) self.assertEquals(rv.status_code, expected_code) data = rv.data parsed = json.loads(data) return parsed def putJsonResponse(self, method_name, params={}, data={}, expected_code=200): rv = self.app.put(url_for(method_name, **params), data=json.dumps(data), headers={"Content-Type": "application/json"}) if rv.status_code != expected_code: print 'Mismatch data for method %s: %s' % (method_name, rv.data) self.assertEquals(rv.status_code, expected_code) data = rv.data parsed = json.loads(data) return parsed def login(self, username, password='password'): return self.postJsonResponse('api.signin_user', data=dict(username=username, password=password)) class TestDiscovery(ApiTestCase): def test_discovery(self): """ Basic sanity check that discovery returns valid JSON in the expected format. """ json = self.getJsonResponse('api.discovery') found = set([]) for method_info in json['endpoints']: found.add(method_info['name']) assert 'discovery' in found class TestPlans(ApiTestCase): def test_plans(self): """ Basic sanity check that the plans are returned in the expected format. """ json = self.getJsonResponse('api.list_plans') found = set([]) for method_info in json['plans']: found.add(method_info['stripeId']) assert 'free' in found class TestLoggedInUser(ApiTestCase): def test_guest(self): json = self.getJsonResponse('api.get_logged_in_user') assert json['anonymous'] == True def test_user(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.get_logged_in_user') assert json['anonymous'] == False assert json['username'] == READ_ACCESS_USER class TestGetUserPrivateCount(ApiTestCase): def test_nonallowed(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.get_user_private_count') assert json['privateCount'] == 0 assert json['reposAllowed'] == 0 def test_allowed(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_user_private_count') assert json['privateCount'] == 6 assert json['reposAllowed'] > 0 class TestConvertToOrganization(ApiTestCase): def test_sameadminuser(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse('api.convert_user_to_organization', data={'adminUser': READ_ACCESS_USER, 'adminPassword': 'password'}, expected_code=400) self.assertEqual('The admin user is not valid', json['message']) def test_invalidadminuser(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse('api.convert_user_to_organization', data={'adminUser': 'unknownuser', 'adminPassword': 'password'}, expected_code=400) self.assertEqual('The admin user credentials are not valid', json['message']) def test_invalidadminpassword(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse('api.convert_user_to_organization', data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'invalidpass'}, expected_code=400) self.assertEqual('The admin user credentials are not valid', json['message']) def test_convert(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse('api.convert_user_to_organization', data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'password', 'plan': 'free'}) self.assertEqual(True, json['success']) # Verify the organization exists. organization = model.get_organization(READ_ACCESS_USER) assert organization is not None # Verify the admin user is the org's admin. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_organization', params=dict(orgname=READ_ACCESS_USER)) self.assertEquals(READ_ACCESS_USER, json['name']) self.assertEquals(True, json['is_admin']) class TestChangeUserDetails(ApiTestCase): def test_changepassword(self): self.login(READ_ACCESS_USER) self.putJsonResponse('api.change_user_details', data=dict(password='newpasswordiscool')) self.login(READ_ACCESS_USER, password='newpasswordiscool') def test_changeinvoiceemail(self): self.login(READ_ACCESS_USER) json = self.putJsonResponse('api.change_user_details', data=dict(invoice_email=True)) self.assertEquals(True, json['invoice_email']) json = self.putJsonResponse('api.change_user_details', data=dict(invoice_email=False)) self.assertEquals(False, json['invoice_email']) class TestCreateNewUser(ApiTestCase): def test_existingusername(self): json = self.postJsonResponse('api.create_new_user', data=dict(username=READ_ACCESS_USER, password='password', email='test@example.com'), expected_code=400) self.assertEquals('The username already exists', json['message']) def test_createuser(self): data = self.postResponse('api.create_new_user', data=NEW_USER_DETAILS, expected_code=201) self.assertEquals('Created', data) class TestSignout(ApiTestCase): def test_signout(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.get_logged_in_user') assert json['username'] == READ_ACCESS_USER self.postResponse('api.logout') json = self.getJsonResponse('api.get_logged_in_user') assert json['anonymous'] == True class TestGetMatchingEntities(ApiTestCase): def test_notinorg(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse('api.get_matching_entities', params=dict(prefix='o', namespace=ORGANIZATION, includeTeams=True)) names = set([r['name'] for r in json['results']]) assert 'outsideorg' in names assert not 'owners' in names def test_inorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_matching_entities', params=dict(prefix='o', namespace=ORGANIZATION, includeTeams=True)) names = set([r['name'] for r in json['results']]) assert 'outsideorg' in names assert 'owners' in names class TestCreateOrganization(ApiTestCase): def test_existinguser(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.create_organization', data=dict(name=ADMIN_ACCESS_USER), expected_code=400) self.assertEquals('A user or organization with this name already exists', json['message']) def test_existingorg(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.create_organization', data=dict(name=ORGANIZATION), expected_code=400) self.assertEquals('A user or organization with this name already exists', json['message']) def test_createorg(self): self.login(ADMIN_ACCESS_USER) data = self.postResponse('api.create_organization', data=dict(name='neworg', email='test@example.com'), expected_code=201) self.assertEquals('Created', data) # Ensure the org was created. organization = model.get_organization('neworg') assert organization is not None # Verify the admin user is the org's admin. json = self.getJsonResponse('api.get_organization', params=dict(orgname='neworg')) self.assertEquals('neworg', json['name']) self.assertEquals(True, json['is_admin']) class TestGetOrganization(ApiTestCase): def test_unknownorg(self): self.login(ADMIN_ACCESS_USER) self.getResponse('api.get_organization', params=dict(orgname='notvalid'), expected_code=403) def test_cannotaccess(self): self.login(NO_ACCESS_USER) self.getResponse('api.get_organization', params=dict(orgname=ORGANIZATION), expected_code=403) def test_getorganization(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(False, json['is_admin']) def test_getorganization_asadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(True, json['is_admin']) class TestChangeOrganizationDetails(ApiTestCase): def test_changeinvoiceemail(self): self.login(ADMIN_ACCESS_USER) json = self.putJsonResponse('api.change_organization_details', params=dict(orgname=ORGANIZATION), data=dict(invoice_email=True)) self.assertEquals(True, json['invoice_email']) json = self.putJsonResponse('api.change_organization_details', params=dict(orgname=ORGANIZATION), data=dict(invoice_email=False)) self.assertEquals(False, json['invoice_email']) def test_changemail(self): self.login(ADMIN_ACCESS_USER) json = self.putJsonResponse('api.change_organization_details', params=dict(orgname=ORGANIZATION), data=dict(email='newemail@example.com')) self.assertEquals('newemail@example.com', json['email']) class TestGetOrganizationPrototypes(ApiTestCase): def test_getprototypes(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_organization_prototype_permissions', params=dict(orgname=ORGANIZATION)) assert len(json['prototypes']) > 0 class TestCreateOrganizationPrototypes(ApiTestCase): def test_invaliduser(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.create_organization_prototype_permission', params=dict(orgname=ORGANIZATION), data=dict(activating_user={'name': 'unknownuser'}, role='read', delegate={'kind': 'team', 'name': 'owners'}), expected_code=400) self.assertEquals('Unknown activating user', json['message']) def test_missingdelegate(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.create_organization_prototype_permission', params=dict(orgname=ORGANIZATION), data=dict(role='read'), expected_code=400) self.assertEquals('Missing delegate user or team', json['message']) def test_createprototype(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.create_organization_prototype_permission', params=dict(orgname=ORGANIZATION), data=dict(role='read', delegate={'kind': 'team', 'name': 'readers'})) self.assertEquals('read', json['role']) pid = json['id'] # Verify the prototype exists. json = self.getJsonResponse('api.get_organization_prototype_permissions', params=dict(orgname=ORGANIZATION)) ids = set([p['id'] for p in json['prototypes']]) assert pid in ids class TestDeleteOrganizationPrototypes(ApiTestCase): def test_deleteprototype(self): self.login(ADMIN_ACCESS_USER) # Get the existing prototypes json = self.getJsonResponse('api.get_organization_prototype_permissions', params=dict(orgname=ORGANIZATION)) ids = [p['id'] for p in json['prototypes']] pid = ids[0] # Delete a prototype. self.deleteResponse('api.delete_organization_prototype_permission', params=dict(orgname=ORGANIZATION, prototypeid=pid)) # Verify the prototype no longer exists. json = self.getJsonResponse('api.get_organization_prototype_permissions', params=dict(orgname=ORGANIZATION)) newids = [p['id'] for p in json['prototypes']] assert not pid in newids class TestUpdateOrganizationPrototypes(ApiTestCase): def test_updateprototype(self): self.login(ADMIN_ACCESS_USER) # Get the existing prototypes json = self.getJsonResponse('api.get_organization_prototype_permissions', params=dict(orgname=ORGANIZATION)) ids = [p['id'] for p in json['prototypes']] pid = ids[0] # Update a prototype. json = self.putJsonResponse('api.delete_organization_prototype_permission', params=dict(orgname=ORGANIZATION, prototypeid=pid), data=dict(role='admin')) self.assertEquals('admin', json['role']) class TestGetOrganiaztionMembers(ApiTestCase): def test_getmembers(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_organization_members', params=dict(orgname=ORGANIZATION)) assert ADMIN_ACCESS_USER in json['members'] assert READ_ACCESS_USER in json['members'] assert not NO_ACCESS_USER in json['members'] def test_getspecificmember(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_organization_member', params=dict(orgname=ORGANIZATION, membername=ADMIN_ACCESS_USER)) self.assertEquals(ADMIN_ACCESS_USER, json['member']['name']) self.assertEquals('user', json['member']['kind']) assert 'owners' in json['member']['teams'] class TestGetOrganizationPrivateAllowed(ApiTestCase): def test_existingorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_organization_private_allowed', params=dict(orgname=ORGANIZATION)) self.assertEquals(True, json['privateAllowed']) assert not 'reposAllowed' in json def test_neworg(self): self.login(ADMIN_ACCESS_USER) data = self.postResponse('api.create_organization', data=dict(name='neworg', email='test@example.com'), expected_code=201) json = self.getJsonResponse('api.get_organization_private_allowed', params=dict(orgname='neworg')) self.assertEquals(False, json['privateAllowed']) class TestUpdateOrganizationTeam(ApiTestCase): def test_updateexisting(self): self.login(ADMIN_ACCESS_USER) data = self.postJsonResponse('api.update_organization_team', params=dict(orgname=ORGANIZATION, teamname='readers'), data=dict(description = 'My cool team', role = 'creator')) self.assertEquals('My cool team', data['description']) self.assertEquals('creator', data['role']) def test_attemptchangeroleonowners(self): self.login(ADMIN_ACCESS_USER) self.postResponse('api.update_organization_team', params=dict(orgname=ORGANIZATION, teamname='owners'), data=dict(role = 'creator'), expected_code=400) def test_createnewteam(self): self.login(ADMIN_ACCESS_USER) data = self.putJsonResponse('api.update_organization_team', params=dict(orgname=ORGANIZATION, teamname='newteam'), data=dict(description = 'My cool team', role = 'member'), expected_code=201) self.assertEquals('My cool team', data['description']) self.assertEquals('member', data['role']) # Verify the team was created. json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) assert 'newteam' in json['teams'] class TestDeleteOrganizationTeam(ApiTestCase): def test_deleteteam(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse('api.delete_organization_team', params=dict(orgname=ORGANIZATION, teamname='readers')) # Make sure the team was deleted json = self.getJsonResponse('api.get_organization', params=dict(orgname=ORGANIZATION)) assert not 'readers' in json['teams'] def test_attemptdeleteowners(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse('api.delete_organization_team', params=dict(orgname=ORGANIZATION, teamname='owners'), expected_code=400) class TestGetOrganizationTeamMembers(ApiTestCase): def test_invalidteam(self): self.login(ADMIN_ACCESS_USER) self.getResponse('api.get_organization_team_members', params=dict(orgname=ORGANIZATION, teamname='notvalid'), expected_code=404) def test_getmembers(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_organization_team_members', params=dict(orgname=ORGANIZATION, teamname='readers')) assert READ_ACCESS_USER in json['members'] class TestUpdateOrganizationTeamMember(ApiTestCase): def test_addmember(self): self.login(ADMIN_ACCESS_USER) self.postJsonResponse('api.update_organization_team_member', params=dict(orgname=ORGANIZATION, teamname='readers', membername=NO_ACCESS_USER)) # Verify the user was added to the team. json = self.getJsonResponse('api.get_organization_team_members', params=dict(orgname=ORGANIZATION, teamname='readers')) assert NO_ACCESS_USER in json['members'] class TestDeleteOrganizationTeamMember(ApiTestCase): def test_deletemember(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse('api.delete_organization_team_member', params=dict(orgname=ORGANIZATION, teamname='readers', membername=READ_ACCESS_USER)) # Verify the user was removed from the team. json = self.getJsonResponse('api.get_organization_team_members', params=dict(orgname=ORGANIZATION, teamname='readers')) assert not READ_ACCESS_USER in json['members'] class TestCreateRepo(ApiTestCase): def test_duplicaterepo(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.create_repo', data=dict(repository='simple', visibility='public'), expected_code=400) self.assertEquals('Repository already exists', json['message']) def test_createrepo(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.create_repo', data=dict(repository='newrepo', visibility='public', description='')) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('newrepo', json['name']) def test_createrepo_underorg(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.create_repo', data=dict(namespace=ORGANIZATION, repository='newrepo', visibility='private', description='')) self.assertEquals(ORGANIZATION, json['namespace']) self.assertEquals('newrepo', json['name']) class TestFindRepos(ApiTestCase): def test_findrepos_asguest(self): json = self.getJsonResponse('api.find_repos', params=dict(query='p')) assert len(json['repositories']) == 1 self.assertEquals(json['repositories'][0]['namespace'], 'public') self.assertEquals(json['repositories'][0]['name'], 'publicrepo') def test_findrepos_asuser(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse('api.find_repos', params=dict(query='p')) assert len(json['repositories']) == 1 self.assertEquals(json['repositories'][0]['namespace'], 'public') self.assertEquals(json['repositories'][0]['name'], 'publicrepo') def test_findrepos_orgmember(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.find_repos', params=dict(query='p')) assert len(json['repositories']) > 1 class TestListRepos(ApiTestCase): def test_listrepos_asguest(self): json = self.getJsonResponse('api.list_repos', params=dict(public=True)) assert len(json['repositories']) == 0 def test_listrepos_orgmember(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.list_repos', params=dict(public=True)) assert len(json['repositories']) > 1 def test_listrepos_filter(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.list_repos', params=dict(namespace=ORGANIZATION, public=False)) for repo in json['repositories']: self.assertEquals(ORGANIZATION, repo['namespace']) def test_listrepos_limit(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.list_repos', params=dict(limit=2)) assert len(json['repositories']) == 2 class TestUpdateRepo(ApiTestCase): def test_updatedescription(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse('api.update_repo', params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(description='Some cool repo')) # Verify the repo description was updated. json = self.getJsonResponse('api.get_repo', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals('Some cool repo', json['description']) class TestChangeRepoVisibility(ApiTestCase): def test_changevisibility(self): self.login(ADMIN_ACCESS_USER) # Make public. self.postJsonResponse('api.change_repo_visibility', params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(visibility='public')) # Verify the visibility. json = self.getJsonResponse('api.get_repo', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(True, json['is_public']) # Make private. self.postJsonResponse('api.change_repo_visibility', params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(visibility='private')) # Verify the visibility. json = self.getJsonResponse('api.get_repo', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(False, json['is_public']) class TestDeleteRepository(ApiTestCase): def test_deleterepo(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse('api.delete_repository', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) # Verify the repo was deleted. self.getResponse('api.get_repo', params=dict(repository=ADMIN_ACCESS_USER + '/simple'), expected_code=404) class TestGetRepository(ApiTestCase): def test_getrepo_public_asguest(self): json = self.getJsonResponse('api.get_repo', params=dict(repository=PUBLIC_USER + '/publicrepo')) self.assertEquals(PUBLIC_USER, json['namespace']) self.assertEquals('publicrepo', json['name']) self.assertEquals(True, json['is_public']) self.assertEquals(False, json['is_organization']) self.assertEquals(False, json['is_building']) self.assertEquals(False, json['can_write']) self.assertEquals(False, json['can_admin']) assert 'latest' in json['tags'] def test_getrepo_public_asowner(self): self.login(PUBLIC_USER) json = self.getJsonResponse('api.get_repo', params=dict(repository=PUBLIC_USER + '/publicrepo')) self.assertEquals(False, json['is_organization']) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) def test_getrepo_building(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_repo', params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) self.assertEquals(True, json['is_building']) self.assertEquals(False, json['is_organization']) def test_getrepo_org_asnonmember(self): self.getResponse('api.get_repo', params=dict(repository=ORGANIZATION + '/' + ORG_REPO), expected_code=403) def test_getrepo_org_asreader(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse('api.get_repo', params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) self.assertEquals(ORGANIZATION, json['namespace']) self.assertEquals(ORG_REPO, json['name']) self.assertEquals(False, json['can_write']) self.assertEquals(False, json['can_admin']) self.assertEquals(True, json['is_organization']) def test_getrepo_org_asadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_repo', params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) self.assertEquals(True, json['is_organization']) class TestGetRepoBuilds(ApiTestCase): def test_getrepo_nobuilds(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_repo_builds', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 def test_getrepobuilds(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_repo_builds', params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 build = json['builds'][0] assert 'id' in build assert 'status' in build assert 'message' in build class TestRequearRepoBuild(ApiTestCase): def test_requestrepobuild(self): self.login(ADMIN_ACCESS_USER) # Ensure where not yet building. json = self.getJsonResponse('api.get_repo_builds', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 # Request a (fake) build. self.postResponse('api.request_repo_build', params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id = 'foobarbaz'), expected_code=201) # Check for the build. json = self.getJsonResponse('api.get_repo_builds', params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 class TestWebhooks(ApiTestCase): def test_webhooks(self): self.login(ADMIN_ACCESS_USER) # Add a webhook. json = self.postJsonResponse('api.create_webhook', params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(url='http://example.com')) self.assertEquals('http://example.com', json['parameters']['url']) wid = json['public_id'] # Get the webhook. json = self.getJsonResponse('api.get_webhook', params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid)) self.assertEquals(wid, json['public_id']) self.assertEquals('http://example.com', json['parameters']['url']) # Verify the webhook is listed. json = self.getJsonResponse('api.list_webhooks', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) ids = [w['public_id'] for w in json['webhooks']] assert wid in ids # Delete the webhook. self.deleteResponse('api.delete_webhook', params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid), expected_code=204) # Verify the webhook is gone. self.getResponse('api.get_webhook', params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid), expected_code=404) class TestListAndGetImage(ApiTestCase): def test_listandgetimages(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.list_repository_images', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['images']) > 0 for image in json['images']: assert 'id' in image assert 'tags' in image assert 'created' in image assert 'comment' in image assert 'command' in image assert 'ancestors' in image assert 'dbid' in image assert 'size' in image ijson = self.getJsonResponse('api.get_image', params=dict(repository=ADMIN_ACCESS_USER + '/simple', image_id=image['id'])) self.assertEquals(image['id'], ijson['id']) class TestGetImageChanges(ApiTestCase): def test_getimagechanges(self): self.login(ADMIN_ACCESS_USER) # Find an image to check. json = self.getJsonResponse('api.list_repository_images', params=dict(repository=ADMIN_ACCESS_USER + '/simple')) image_id = json['images'][0]['id'] # Lookup the image's changes. # TODO: Fix me once we can get fake changes into the test data #self.getJsonResponse('api.get_image_changes', # params=dict(repository=ADMIN_ACCESS_USER + '/simple', # image_id=image_id)) class TestListAndDeleteTag(ApiTestCase): def test_listtagimagesanddeletetag(self): self.login(ADMIN_ACCESS_USER) # List the images for prod. json = self.getJsonResponse('api.list_tag_images', params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) prod_images = json['images'] assert len(prod_images) > 0 # List the images for staging. json = self.getJsonResponse('api.list_tag_images', params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) staging_images = json['images'] assert len(prod_images) == len(staging_images) + 1 # Delete prod. self.deleteResponse('api.delete_full_tag', params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), expected_code=204) # Make sure the tag is gone. self.getResponse('api.list_tag_images', params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), expected_code=404) # Make the sure the staging images are still there. json = self.getJsonResponse('api.list_tag_images', params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) self.assertEquals(staging_images, json['images']) def test_deletesubtag(self): self.login(ADMIN_ACCESS_USER) # List the images for prod. json = self.getJsonResponse('api.list_tag_images', params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) prod_images = json['images'] assert len(prod_images) > 0 # Delete staging. self.deleteResponse('api.delete_full_tag', params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'), expected_code=204) # Make sure the prod images are still around. json = self.getJsonResponse('api.list_tag_images', params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) self.assertEquals(prod_images, json['images']) class TestRepoPermissions(ApiTestCase): def listUserPermissions(self): return self.getJsonResponse('api.list_repo_user_permissions', params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['permissions'] def listTeamPermissions(self): return self.getJsonResponse('api.list_repo_team_permissions', params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions'] def test_userpermissions(self): self.login(ADMIN_ACCESS_USER) # The repo should start with just the admin as a user perm. permissions = self.listUserPermissions() self.assertEquals(1, len(permissions)) assert ADMIN_ACCESS_USER in permissions self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role']) # Add another user. self.putJsonResponse('api.change_user_permissions', params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), data=dict(role='read')) # Verify the user is present. permissions = self.listUserPermissions() self.assertEquals(2, len(permissions)) assert NO_ACCESS_USER in permissions self.assertEquals('read', permissions[NO_ACCESS_USER]['role']) json = self.getJsonResponse('api.get_user_permissions', params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) self.assertEquals('read', json['role']) # Change the user's permissions. self.putJsonResponse('api.change_user_permissions', params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), data=dict(role='admin')) # Verify. permissions = self.listUserPermissions() self.assertEquals(2, len(permissions)) assert NO_ACCESS_USER in permissions self.assertEquals('admin', permissions[NO_ACCESS_USER]['role']) # Delete the user's permission. self.deleteResponse('api.delete_user_permissions', params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) # Verify. permissions = self.listUserPermissions() self.assertEquals(1, len(permissions)) assert not NO_ACCESS_USER in permissions def test_teampermissions(self): self.login(ADMIN_ACCESS_USER) # The repo should start with just the readers as a team perm. permissions = self.listTeamPermissions() self.assertEquals(1, len(permissions)) assert 'readers' in permissions self.assertEquals('read', permissions['readers']['role']) # Add another team. self.putJsonResponse('api.change_team_permissions', params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), data=dict(role='write')) # Verify the team is present. permissions = self.listTeamPermissions() self.assertEquals(2, len(permissions)) assert 'owners' in permissions self.assertEquals('write', permissions['owners']['role']) json = self.getJsonResponse('api.get_team_permissions', params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) self.assertEquals('write', json['role']) # Change the team's permissions. self.putJsonResponse('api.change_team_permissions', params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), data=dict(role='admin')) # Verify. permissions = self.listTeamPermissions() self.assertEquals(2, len(permissions)) assert 'owners' in permissions self.assertEquals('admin', permissions['owners']['role']) # Delete the team's permission. self.deleteResponse('api.delete_team_permissions', params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) # Verify. permissions = self.listTeamPermissions() self.assertEquals(1, len(permissions)) assert not 'owners' in permissions class TestApiTokens(ApiTestCase): def listTokens(self): return self.getJsonResponse('api.list_repo_tokens', params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens'] def test_tokens(self): self.login(ADMIN_ACCESS_USER) # Create a new token. json = self.postJsonResponse('api.create_token', params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(role='read', friendlyName='mytoken'), expected_code=201) self.assertEquals('mytoken', json['friendlyName']) self.assertEquals('read', json['role']) token_code = json['code'] # Verify. tokens = self.listTokens() assert token_code in tokens self.assertEquals('mytoken', tokens[token_code]['friendlyName']) json = self.getJsonResponse('api.get_tokens', params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) self.assertEquals(tokens[token_code], json) # Change the token's permission. self.putJsonResponse('api.change_token', params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), data=dict(role='write')) # Verify. json = self.getJsonResponse('api.get_tokens', params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) self.assertEquals('write', json['role']) # Delete the token. self.deleteResponse('api.delete_token', params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) # Verify. self.getResponse('api.get_tokens', params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), expected_code=404) class TestUserCard(ApiTestCase): def test_getusercard(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_user_card') self.assertEquals('4242', json['card']['last4']) self.assertEquals('Visa', json['card']['type']) def test_setusercard_error(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse('api.set_user_card', data=dict(token='sometoken'), expected_code=402) assert 'carderror' in json class TestOrgCard(ApiTestCase): def test_getorgcard(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.get_org_card', params=dict(orgname=ORGANIZATION)) self.assertEquals('4242', json['card']['last4']) self.assertEquals('Visa', json['card']['type']) class TestUserSubscription(ApiTestCase): def getSubscription(self): return self.getJsonResponse('api.get_user_subscription') def test_updateplan(self): self.login(ADMIN_ACCESS_USER) # Change the plan. self.putJsonResponse('api.update_user_subscription', data=dict(plan='free')) # Verify sub = self.getSubscription() self.assertEquals('free', sub['plan']) # Change the plan. self.putJsonResponse('api.update_user_subscription', data=dict(plan='bus-large')) # Verify sub = self.getSubscription() self.assertEquals('bus-large', sub['plan']) class TestOrgSubscription(ApiTestCase): def getSubscription(self): return self.getJsonResponse('api.get_org_subscription', params=dict(orgname=ORGANIZATION)) def test_updateplan(self): self.login(ADMIN_ACCESS_USER) # Change the plan. self.putJsonResponse('api.update_org_subscription', params=dict(orgname=ORGANIZATION), data=dict(plan='free')) # Verify sub = self.getSubscription() self.assertEquals('free', sub['plan']) # Change the plan. self.putJsonResponse('api.update_org_subscription', params=dict(orgname=ORGANIZATION), data=dict(plan='bus-large')) # Verify sub = self.getSubscription() self.assertEquals('bus-large', sub['plan']) class TestUserRobots(ApiTestCase): def getRobotNames(self): return [r['name'] for r in self.getJsonResponse('api.get_user_robots')['robots']] def test_robots(self): self.login(NO_ACCESS_USER) # Create a robot. json = self.putJsonResponse('api.create_user_robot', params=dict(robot_shortname='bender'), expected_code=201) self.assertEquals(NO_ACCESS_USER + '+bender', json['name']) # Verify. robots = self.getRobotNames() assert NO_ACCESS_USER + '+bender' in robots # Delete the robot. self.deleteResponse('api.delete_user_robot', params=dict(robot_shortname='bender')) # Verify. robots = self.getRobotNames() assert not NO_ACCESS_USER + '+bender' in robots class TestOrgRobots(ApiTestCase): def getRobotNames(self): return [r['name'] for r in self.getJsonResponse('api.get_org_robots', params=dict(orgname=ORGANIZATION))['robots']] def test_robots(self): self.login(ADMIN_ACCESS_USER) # Create a robot. json = self.putJsonResponse('api.create_org_robot', params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=201) self.assertEquals(ORGANIZATION + '+bender', json['name']) # Verify. robots = self.getRobotNames() assert ORGANIZATION + '+bender' in robots # Delete the robot. self.deleteResponse('api.delete_org_robot', params=dict(orgname=ORGANIZATION, robot_shortname='bender')) # Verify. robots = self.getRobotNames() assert not ORGANIZATION + '+bender' in robots class TestLogs(ApiTestCase): def test_user_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.list_user_logs') assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_org_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.list_org_logs', params=dict(orgname=ORGANIZATION)) assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_performer(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse('api.list_org_logs', params=dict(orgname=ORGANIZATION)) all_logs = json['logs'] json = self.getJsonResponse('api.list_org_logs', params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION)) assert len(json['logs']) < len(all_logs) for log in json['logs']: self.assertEquals(READ_ACCESS_USER, log['performer']['name']) if __name__ == '__main__': unittest.main()