This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_api_usage.py
jakedt 3f42d15335 Merge remote-tracking branch 'origin/master' into tagyourit
Conflicts:
	static/css/quay.css
	static/js/graphing.js
	static/partials/view-repo.html
	test/data/test.db
2014-04-15 15:58:30 -04:00

1941 lines
71 KiB
Python

import unittest
import json as py_json
from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
from endpoints.api import api_bp, api
from endpoints.webhooks import webhooks
from endpoints.trigger import BuildTrigger as BuildTriggerBase
from app import app
from initdb import setup_database_for_testing, finished_database_for_testing
from data import model, database
from endpoints.api.team import TeamMember, TeamMemberList, OrganizationTeam
from endpoints.api.tag import RepositoryTagImages, RepositoryTag
from endpoints.api.search import FindRepositories, EntitySearch
from endpoints.api.image import RepositoryImage, RepositoryImageList
from endpoints.api.build import RepositoryBuildStatus, RepositoryBuildLogs, RepositoryBuildList
from endpoints.api.robot import UserRobotList, OrgRobot, OrgRobotList, UserRobot
from endpoints.api.trigger import (BuildTriggerActivate, BuildTriggerSources, BuildTriggerSubdirs,
TriggerBuildList, ActivateBuildTrigger, BuildTrigger,
BuildTriggerList, BuildTriggerAnalyze)
from endpoints.api.webhook import Webhook, WebhookList
from endpoints.api.user import (PrivateRepositories, ConvertToOrganization, Signout, Signin, User,
UserAuthorizationList, UserAuthorization)
from endpoints.api.repotoken import RepositoryToken, RepositoryTokenList
from endpoints.api.prototype import PermissionPrototype, PermissionPrototypeList
from endpoints.api.logs import UserLogs, OrgLogs
from endpoints.api.billing import (UserCard, UserPlan, ListPlans, OrganizationCard,
OrganizationPlan)
from endpoints.api.discovery import DiscoveryResource
from endpoints.api.organization import (OrganizationList, OrganizationMember,
OrgPrivateRepositories, OrgnaizationMemberList,
Organization, ApplicationInformation,
OrganizationApplications, OrganizationApplicationResource,
OrganizationApplicationResetClientSecret)
from endpoints.api.repository import RepositoryList, RepositoryVisibility, Repository
from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission,
RepositoryTeamPermissionList, RepositoryUserPermissionList)
try:
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# This blueprint was already registered
pass
app.register_blueprint(webhooks, url_prefix='/webhooks')
NO_ACCESS_USER = 'freshuser'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
PUBLIC_USER = 'public'
ORG_REPO = 'orgrepo'
ORGANIZATION = 'buynlarge'
NEW_USER_DETAILS = {
'username': 'bobby',
'password': 'password',
'email': 'bobby@tables.com',
}
FAKE_APPLICATION_CLIENT_ID = 'deadbeef'
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
class ApiTestCase(unittest.TestCase):
@staticmethod
def _add_csrf(without_csrf):
parts = urlparse(without_csrf)
query = parse_qs(parts[4])
query[CSRF_TOKEN_KEY] = CSRF_TOKEN
return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:]))
def url_for(self, resource_name, params={}):
url = api.url_for(resource_name, **params)
url = ApiTestCase._add_csrf(url)
return url
def setUp(self):
setup_database_for_testing(self)
self.app = app.test_client()
self.ctx = app.test_request_context()
self.ctx.__enter__()
self.setCsrfToken(CSRF_TOKEN)
def tearDown(self):
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def setCsrfToken(self, token):
with self.app.session_transaction() as sess:
sess[CSRF_TOKEN_KEY] = token
def getJsonResponse(self, resource_name, params={}, expected_code=200):
rv = self.app.get(api.url_for(resource_name, **params))
self.assertEquals(expected_code, rv.status_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def postResponse(self, resource_name, params={}, data={}, expected_code=200):
rv = self.app.post(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
self.assertEquals(rv.status_code, expected_code)
return rv.data
def getResponse(self, resource_name, params={}, expected_code=200):
rv = self.app.get(api.url_for(resource_name, **params))
self.assertEquals(rv.status_code, expected_code)
return rv.data
def putResponse(self, resource_name, params={}, data={}, expected_code=200):
rv = self.app.put(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
self.assertEquals(rv.status_code, expected_code)
return rv.data
def deleteResponse(self, resource_name, params={}, expected_code=204):
rv = self.app.delete(self.url_for(resource_name, params))
self.assertEquals(rv.status_code, expected_code)
return rv.data
def postJsonResponse(self, resource_name, params={}, data={},
expected_code=200):
rv = self.app.post(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
if rv.status_code != expected_code:
print 'Mismatch data for resource POST %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def putJsonResponse(self, resource_name, params={}, data={},
expected_code=200):
rv = self.app.put(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
if rv.status_code != expected_code:
print 'Mismatch data for resource PUT %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def login(self, username, password='password'):
return self.postJsonResponse(Signin, data=dict(username=username, password=password))
class TestCSRFFailure(ApiTestCase):
def test_csrf_failure(self):
self.login(READ_ACCESS_USER)
# Make sure a simple post call succeeds.
self.putJsonResponse(User,
data=dict(password='newpasswordiscool'))
# Change the session's CSRF token.
self.setCsrfToken('someinvalidtoken')
# Verify that the call now fails.
self.putJsonResponse(User,
data=dict(password='newpasswordiscool'),
expected_code=403)
class TestDiscovery(ApiTestCase):
def test_discovery(self):
json = self.getJsonResponse(DiscoveryResource)
assert 'apis' in json
class TestPlans(ApiTestCase):
def test_plans(self):
json = self.getJsonResponse(ListPlans)
found = set([])
for method_info in json['plans']:
found.add(method_info['stripeId'])
assert 'free' in found
class TestLoggedInUser(ApiTestCase):
def test_guest(self):
self.getJsonResponse(User, expected_code=401)
def test_user(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(User)
assert json['anonymous'] == False
assert json['username'] == READ_ACCESS_USER
class TestGetUserPrivateAllowed(ApiTestCase):
def test_nonallowed(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(PrivateRepositories)
assert json['privateCount'] == 0
assert not json['privateAllowed']
def test_allowed(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(PrivateRepositories)
assert json['privateCount'] == 6
assert json['privateAllowed']
class TestConvertToOrganization(ApiTestCase):
def test_sameadminuser(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': READ_ACCESS_USER,
'adminPassword': 'password',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user is not valid', json['message'])
def test_invalidadminuser(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': 'unknownuser',
'adminPassword': 'password',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user credentials are not valid',
json['message'])
def test_invalidadminpassword(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': ADMIN_ACCESS_USER,
'adminPassword': 'invalidpass',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user credentials are not valid',
json['message'])
def test_convert(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': ADMIN_ACCESS_USER,
'adminPassword': 'password',
'plan': 'free'})
self.assertEqual(True, json['success'])
# Verify the organization exists.
organization = model.get_organization(READ_ACCESS_USER)
assert organization is not None
# Verify the admin user is the org's admin.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=READ_ACCESS_USER))
self.assertEquals(READ_ACCESS_USER, json['name'])
self.assertEquals(True, json['is_admin'])
class TestChangeUserDetails(ApiTestCase):
def test_changepassword(self):
self.login(READ_ACCESS_USER)
self.putJsonResponse(User,
data=dict(password='newpasswordiscool'))
self.login(READ_ACCESS_USER, password='newpasswordiscool')
def test_changeinvoiceemail(self):
self.login(READ_ACCESS_USER)
json = self.putJsonResponse(User,
data=dict(invoice_email=True))
self.assertEquals(True, json['invoice_email'])
json = self.putJsonResponse(User,
data=dict(invoice_email=False))
self.assertEquals(False, json['invoice_email'])
class TestCreateNewUser(ApiTestCase):
def test_existingusername(self):
json = self.postJsonResponse(User,
data=dict(username=READ_ACCESS_USER,
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('The username already exists', json['message'])
def test_trycreatetooshort(self):
json = self.postJsonResponse(User,
data=dict(username='a',
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('Invalid username a: Username must be between 4 and 30 characters in length', json['error_description'])
def test_trycreateregexmismatch(self):
json = self.postJsonResponse(User,
data=dict(username='auserName',
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('Invalid username auserName: Username must match expression [a-z0-9_]+', json['error_description'])
def test_createuser(self):
data = self.postResponse(User,
data=NEW_USER_DETAILS,
expected_code=201)
self.assertEquals('"Created"', data)
class TestSignout(ApiTestCase):
def test_signout(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(User)
assert json['username'] == READ_ACCESS_USER
self.postResponse(Signout)
# Make sure we're now signed out.
self.getJsonResponse(User, expected_code=401)
class TestGetMatchingEntities(ApiTestCase):
def test_notinorg(self):
self.login(NO_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix='o', namespace=ORGANIZATION,
includeTeams='true'))
names = set([r['name'] for r in json['results']])
assert 'outsideorg' in names
assert not 'owners' in names
def test_inorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix='o', namespace=ORGANIZATION,
includeTeams='true'))
names = set([r['name'] for r in json['results']])
assert 'outsideorg' in names
assert 'owners' in names
class TestCreateOrganization(ApiTestCase):
def test_existinguser(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(OrganizationList,
data=dict(name=ADMIN_ACCESS_USER, email='testorg@example.com'),
expected_code=400)
self.assertEquals('A user or organization with this name already exists',
json['message'])
def test_existingorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(OrganizationList,
data=dict(name=ORGANIZATION, email='testorg@example.com'),
expected_code=400)
self.assertEquals('A user or organization with this name already exists',
json['message'])
def test_createorg(self):
self.login(ADMIN_ACCESS_USER)
data = self.postResponse(OrganizationList,
data=dict(name='neworg',
email='testorg@example.com'),
expected_code=201)
self.assertEquals('"Created"', data)
# Ensure the org was created.
organization = model.get_organization('neworg')
assert organization is not None
# Verify the admin user is the org's admin.
json = self.getJsonResponse(Organization,
params=dict(orgname='neworg'))
self.assertEquals('neworg', json['name'])
self.assertEquals(True, json['is_admin'])
class TestGetOrganization(ApiTestCase):
def test_unknownorg(self):
self.login(ADMIN_ACCESS_USER)
self.getResponse(Organization, params=dict(orgname='notvalid'),
expected_code=403)
def test_cannotaccess(self):
self.login(NO_ACCESS_USER)
self.getResponse(Organization, params=dict(orgname=ORGANIZATION),
expected_code=403)
def test_getorganization(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
self.assertEquals(ORGANIZATION, json['name'])
self.assertEquals(False, json['is_admin'])
def test_getorganization_asadmin(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
self.assertEquals(ORGANIZATION, json['name'])
self.assertEquals(True, json['is_admin'])
class TestChangeOrganizationDetails(ApiTestCase):
def test_changeinvoiceemail(self):
self.login(ADMIN_ACCESS_USER)
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(invoice_email=True))
self.assertEquals(True, json['invoice_email'])
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(invoice_email=False))
self.assertEquals(False, json['invoice_email'])
def test_changemail(self):
self.login(ADMIN_ACCESS_USER)
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(email='newemail@example.com'))
self.assertEquals('newemail@example.com', json['email'])
class TestGetOrganizationPrototypes(ApiTestCase):
def test_getprototypes(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
assert len(json['prototypes']) > 0
class TestCreateOrganizationPrototypes(ApiTestCase):
def test_invaliduser(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(activating_user={'name': 'unknownuser'},
role='read',
delegate={'kind': 'team', 'name': 'owners'}),
expected_code=400)
self.assertEquals('Unknown activating user', json['message'])
def test_missingdelegate(self):
self.login(ADMIN_ACCESS_USER)
self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(role='read'),
expected_code=400)
def test_createprototype(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(role='read',
delegate={'kind': 'team',
'name': 'readers'}))
self.assertEquals('read', json['role'])
pid = json['id']
# Verify the prototype exists.
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = set([p['id'] for p in json['prototypes']])
assert pid in ids
class TestDeleteOrganizationPrototypes(ApiTestCase):
def test_deleteprototype(self):
self.login(ADMIN_ACCESS_USER)
# Get the existing prototypes
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = [p['id'] for p in json['prototypes']]
pid = ids[0]
# Delete a prototype.
self.deleteResponse(PermissionPrototype,
params=dict(orgname=ORGANIZATION, prototypeid=pid))
# Verify the prototype no longer exists.
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
newids = [p['id'] for p in json['prototypes']]
assert not pid in newids
class TestUpdateOrganizationPrototypes(ApiTestCase):
def test_updateprototype(self):
self.login(ADMIN_ACCESS_USER)
# Get the existing prototypes
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = [p['id'] for p in json['prototypes']]
pid = ids[0]
# Update a prototype.
json = self.putJsonResponse(PermissionPrototype,
params=dict(orgname=ORGANIZATION,
prototypeid=pid), data=dict(role='admin'))
self.assertEquals('admin', json['role'])
class TestGetOrganiaztionMembers(ApiTestCase):
def test_getmembers(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgnaizationMemberList,
params=dict(orgname=ORGANIZATION))
assert ADMIN_ACCESS_USER in json['members']
assert READ_ACCESS_USER in json['members']
assert not NO_ACCESS_USER in json['members']
def test_getspecificmember(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationMember,
params=dict(orgname=ORGANIZATION,
membername=ADMIN_ACCESS_USER))
self.assertEquals(ADMIN_ACCESS_USER, json['member']['name'])
self.assertEquals('user', json['member']['kind'])
assert 'owners' in json['member']['teams']
class TestGetOrganizationPrivateAllowed(ApiTestCase):
def test_existingorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgPrivateRepositories,
params=dict(orgname=ORGANIZATION))
self.assertEquals(True, json['privateAllowed'])
assert not 'reposAllowed' in json
def test_neworg(self):
self.login(ADMIN_ACCESS_USER)
data = self.postResponse(OrganizationList,
data=dict(name='neworg',
email='test@example.com'),
expected_code=201)
json = self.getJsonResponse(OrgPrivateRepositories,
params=dict(orgname='neworg'))
self.assertEquals(False, json['privateAllowed'])
class TestUpdateOrganizationTeam(ApiTestCase):
def test_updateexisting(self):
self.login(ADMIN_ACCESS_USER)
data = self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION,
teamname='readers'),
data=dict(description='My cool team',
role='creator'))
self.assertEquals('My cool team', data['description'])
self.assertEquals('creator', data['role'])
def test_attemptchangeroleonowners(self):
self.login(ADMIN_ACCESS_USER)
self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='owners'),
data=dict(role = 'creator'),
expected_code=400)
def test_createnewteam(self):
self.login(ADMIN_ACCESS_USER)
data = self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION,
teamname='newteam'),
data=dict(description='My cool team',
role='member'))
self.assertEquals('My cool team', data['description'])
self.assertEquals('member', data['role'])
# Verify the team was created.
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
assert 'newteam' in json['teams']
class TestDeleteOrganizationTeam(ApiTestCase):
def test_deleteteam(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='readers'))
# Make sure the team was deleted
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
assert not 'readers' in json['teams']
def test_attemptdeleteowners(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='owners'),
expected_code=400)
class TestGetOrganizationTeamMembers(ApiTestCase):
def test_invalidteam(self):
self.login(ADMIN_ACCESS_USER)
self.getResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION, teamname='notvalid'),
expected_code=404)
def test_getmembers(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
assert READ_ACCESS_USER in json['members']
class TestUpdateOrganizationTeamMember(ApiTestCase):
def test_addmember(self):
self.login(ADMIN_ACCESS_USER)
self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=NO_ACCESS_USER))
# Verify the user was added to the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
assert NO_ACCESS_USER in json['members']
class TestDeleteOrganizationTeamMember(ApiTestCase):
def test_deletemember(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=READ_ACCESS_USER))
# Verify the user was removed from the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
assert not READ_ACCESS_USER in json['members']
class TestCreateRepo(ApiTestCase):
def test_duplicaterepo(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(repository='simple',
visibility='public',
description=''),
expected_code=400)
self.assertEquals('Repository already exists', json['message'])
def test_createrepo(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(repository='newrepo',
visibility='public',
description=''),
expected_code=201)
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('newrepo', json['name'])
def test_createrepo_underorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(namespace=ORGANIZATION,
repository='newrepo',
visibility='private',
description=''),
expected_code=201)
self.assertEquals(ORGANIZATION, json['namespace'])
self.assertEquals('newrepo', json['name'])
class TestFindRepos(ApiTestCase):
def test_findrepos_asguest(self):
json = self.getJsonResponse(FindRepositories, params=dict(query='p'))
self.assertEquals(len(json['repositories']), 1)
self.assertEquals(json['repositories'][0]['namespace'], 'public')
self.assertEquals(json['repositories'][0]['name'], 'publicrepo')
def test_findrepos_asuser(self):
self.login(NO_ACCESS_USER)
json = self.getJsonResponse(FindRepositories, params=dict(query='p'))
self.assertEquals(len(json['repositories']), 1)
self.assertEquals(json['repositories'][0]['namespace'], 'public')
self.assertEquals(json['repositories'][0]['name'], 'publicrepo')
def test_findrepos_orgmember(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(FindRepositories, params=dict(query='p'))
self.assertGreater(len(json['repositories']), 1)
class TestListRepos(ApiTestCase):
def test_listrepos_asguest(self):
json = self.getJsonResponse(RepositoryList, params=dict(public=True))
self.assertEquals(len(json['repositories']), 1)
def test_listrepos_orgmember(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(RepositoryList, params=dict(public=True))
self.assertGreater(len(json['repositories']), 1)
def test_listrepos_filter(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(RepositoryList,
params=dict(namespace=ORGANIZATION,
public=False))
for repo in json['repositories']:
self.assertEquals(ORGANIZATION, repo['namespace'])
def test_listrepos_limit(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(RepositoryList, params=dict(limit=2))
self.assertEquals(len(json['repositories']), 2)
class TestUpdateRepo(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
def test_updatedescription(self):
self.login(ADMIN_ACCESS_USER)
self.putJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO),
data=dict(description='Some cool repo'))
# Verify the repo description was updated.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals('Some cool repo', json['description'])
class TestChangeRepoVisibility(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
def test_changevisibility(self):
self.login(ADMIN_ACCESS_USER)
# Make public.
self.postJsonResponse(RepositoryVisibility,
params=dict(repository=self.SIMPLE_REPO),
data=dict(visibility='public'))
# Verify the visibility.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals(True, json['is_public'])
# Make private.
self.postJsonResponse(RepositoryVisibility,
params=dict(repository=self.SIMPLE_REPO),
data=dict(visibility='private'))
# Verify the visibility.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals(False, json['is_public'])
class TestDeleteRepository(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
def test_deleterepo(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
# Verify the repo was deleted.
self.getResponse(Repository,
params=dict(repository=self.SIMPLE_REPO),
expected_code=404)
class TestGetRepository(ApiTestCase):
PUBLIC_REPO = PUBLIC_USER + '/publicrepo'
def test_getrepo_badnames(self):
self.login(ADMIN_ACCESS_USER)
bad_names = ['logs', 'build', 'tokens', 'foo.bar', 'foo-bar', 'foo_bar']
# For each bad name, create the repo.
for bad_name in bad_names:
json = self.postJsonResponse(RepositoryList, expected_code=201,
data=dict(repository=bad_name, visibility='public',
description=''))
# Make sure we can retrieve its information.
json = self.getJsonResponse(Repository,
params=dict(repository=ADMIN_ACCESS_USER + '/' + bad_name))
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals(bad_name, json['name'])
self.assertEquals(True, json['is_public'])
def test_getrepo_public_asguest(self):
json = self.getJsonResponse(Repository,
params=dict(repository=self.PUBLIC_REPO))
self.assertEquals(PUBLIC_USER, json['namespace'])
self.assertEquals('publicrepo', json['name'])
self.assertEquals(True, json['is_public'])
self.assertEquals(False, json['is_organization'])
self.assertEquals(False, json['is_building'])
self.assertEquals(False, json['can_write'])
self.assertEquals(False, json['can_admin'])
assert 'latest' in json['tags']
def test_getrepo_public_asowner(self):
self.login(PUBLIC_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=self.PUBLIC_REPO))
self.assertEquals(False, json['is_organization'])
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
def test_getrepo_building(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
self.assertEquals(True, json['is_building'])
self.assertEquals(False, json['is_organization'])
def test_getrepo_org_asnonmember(self):
self.getResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO),
expected_code=401)
def test_getrepo_org_asreader(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
self.assertEquals(ORGANIZATION, json['namespace'])
self.assertEquals(ORG_REPO, json['name'])
self.assertEquals(False, json['can_write'])
self.assertEquals(False, json['can_admin'])
self.assertEquals(True, json['is_organization'])
def test_getrepo_org_asadmin(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
self.assertEquals(True, json['is_organization'])
class TestRepoBuilds(ApiTestCase):
def test_getrepo_nobuilds(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
def test_getrepobuilds(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
assert len(json['builds']) > 0
build = json['builds'][0]
assert 'id' in build
assert 'status' in build
# Check the status endpoint.
status_json = self.getJsonResponse(RepositoryBuildStatus,
params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id']))
self.assertEquals(status_json, build)
# Check the logs.
logs_json = self.getJsonResponse(RepositoryBuildLogs,
params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id']))
assert 'logs' in logs_json
class TestRequestRepoBuild(ApiTestCase):
def test_requestrepobuild(self):
self.login(ADMIN_ACCESS_USER)
# Ensure we are not yet building.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
# Request a (fake) build.
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
assert len(json['builds']) > 0
def test_requestrepobuild_with_robot(self):
self.login(ADMIN_ACCESS_USER)
# Ensure we are not yet building.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
# Request a (fake) build.
pull_robot = ADMIN_ACCESS_USER + '+dtrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
assert len(json['builds']) > 0
def test_requestrepobuild_with_invalid_robot(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
pull_robot = ADMIN_ACCESS_USER + '+invalidrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=404)
def test_requestrepobuild_with_unauthorized_robot(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
pull_robot = 'freshuser+anotherrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=403)
class TestWebhooks(ApiTestCase):
def test_webhooks(self):
self.login(ADMIN_ACCESS_USER)
# Add a webhook.
json = self.postJsonResponse(WebhookList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(url='http://example.com'),
expected_code=201)
self.assertEquals('http://example.com', json['parameters']['url'])
wid = json['public_id']
# Get the webhook.
json = self.getJsonResponse(Webhook,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid))
self.assertEquals(wid, json['public_id'])
self.assertEquals('http://example.com', json['parameters']['url'])
# Verify the webhook is listed.
json = self.getJsonResponse(WebhookList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
ids = [w['public_id'] for w in json['webhooks']]
assert wid in ids
# Delete the webhook.
self.deleteResponse(Webhook,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid),
expected_code=204)
# Verify the webhook is gone.
self.getResponse(Webhook,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', public_id=wid),
expected_code=404)
class TestListAndGetImage(ApiTestCase):
def test_listandgetimages(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryImageList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['images']) > 0
for image in json['images']:
assert 'id' in image
assert 'tags' in image
assert 'created' in image
assert 'comment' in image
assert 'command' in image
assert 'ancestors' in image
assert 'dbid' in image
assert 'size' in image
ijson = self.getJsonResponse(RepositoryImage,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
image_id=image['id']))
self.assertEquals(image['id'], ijson['id'])
class TestGetImageChanges(ApiTestCase):
def test_getimagechanges(self):
self.login(ADMIN_ACCESS_USER)
# Find an image to check.
json = self.getJsonResponse(RepositoryImageList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
image_id = json['images'][0]['id']
# Lookup the image's changes.
# TODO: Fix me once we can get fake changes into the test data
#self.getJsonResponse(RepositoryImageChanges,
# params=dict(repository=ADMIN_ACCESS_USER + '/simple',
# image_id=image_id))
class TestListAndDeleteTag(ApiTestCase):
def test_listdeletecreateandmovetag(self):
self.login(ADMIN_ACCESS_USER)
# List the images for prod.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
prod_images = json['images']
assert len(prod_images) > 0
# List the images for staging.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'))
staging_images = json['images']
assert len(prod_images) == len(staging_images) + 1
# Delete prod.
self.deleteResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
expected_code=204)
# Make sure the tag is gone.
self.getResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
expected_code=404)
# Make the sure the staging images are still there.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'))
self.assertEquals(staging_images, json['images'])
# Add a new tag to the staging image.
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'),
data=dict(image=staging_images[0]['id']),
expected_code=201)
# Make sure the tag is present.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'))
sometag_images = json['images']
self.assertEquals(sometag_images, staging_images)
# Move the tag.
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'),
data=dict(image=staging_images[-1]['id']),
expected_code=201)
# Make sure the tag has moved.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'))
sometag_new_images = json['images']
self.assertEquals(1, len(sometag_new_images))
self.assertEquals(staging_images[-1], sometag_new_images[0])
def test_deletesubtag(self):
self.login(ADMIN_ACCESS_USER)
# List the images for prod.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
prod_images = json['images']
assert len(prod_images) > 0
# Delete staging.
self.deleteResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'),
expected_code=204)
# Make sure the prod images are still around.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
self.assertEquals(prod_images, json['images'])
class TestRepoPermissions(ApiTestCase):
def listUserPermissions(self):
return self.getJsonResponse(RepositoryUserPermissionList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['permissions']
def listTeamPermissions(self):
return self.getJsonResponse(RepositoryTeamPermissionList,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))['permissions']
def test_userpermissions(self):
self.login(ADMIN_ACCESS_USER)
# The repo should start with just the admin as a user perm.
permissions = self.listUserPermissions()
self.assertEquals(1, len(permissions))
assert ADMIN_ACCESS_USER in permissions
self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role'])
# Add another user.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER),
data=dict(role='read'))
# Verify the user is present.
permissions = self.listUserPermissions()
self.assertEquals(2, len(permissions))
assert NO_ACCESS_USER in permissions
self.assertEquals('read', permissions[NO_ACCESS_USER]['role'])
json = self.getJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER))
self.assertEquals('read', json['role'])
# Change the user's permissions.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER),
data=dict(role='admin'))
# Verify.
permissions = self.listUserPermissions()
self.assertEquals(2, len(permissions))
assert NO_ACCESS_USER in permissions
self.assertEquals('admin', permissions[NO_ACCESS_USER]['role'])
# Delete the user's permission.
self.deleteResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER))
# Verify.
permissions = self.listUserPermissions()
self.assertEquals(1, len(permissions))
assert not NO_ACCESS_USER in permissions
def test_teampermissions(self):
self.login(ADMIN_ACCESS_USER)
# The repo should start with just the readers as a team perm.
permissions = self.listTeamPermissions()
self.assertEquals(1, len(permissions))
assert 'readers' in permissions
self.assertEquals('read', permissions['readers']['role'])
# Add another team.
self.putJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
data=dict(role='write'))
# Verify the team is present.
permissions = self.listTeamPermissions()
self.assertEquals(2, len(permissions))
assert 'owners' in permissions
self.assertEquals('write', permissions['owners']['role'])
json = self.getJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))
self.assertEquals('write', json['role'])
# Change the team's permissions.
self.putJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
data=dict(role='admin'))
# Verify.
permissions = self.listTeamPermissions()
self.assertEquals(2, len(permissions))
assert 'owners' in permissions
self.assertEquals('admin', permissions['owners']['role'])
# Delete the team's permission.
self.deleteResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))
# Verify.
permissions = self.listTeamPermissions()
self.assertEquals(1, len(permissions))
assert not 'owners' in permissions
class TestApiTokens(ApiTestCase):
def listTokens(self):
return self.getJsonResponse(RepositoryTokenList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens']
def test_tokens(self):
self.login(ADMIN_ACCESS_USER)
# Create a new token.
json = self.postJsonResponse(RepositoryTokenList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(role='read', friendlyName='mytoken'),
expected_code=201)
self.assertEquals('mytoken', json['friendlyName'])
self.assertEquals('read', json['role'])
token_code = json['code']
# Verify.
tokens = self.listTokens()
assert token_code in tokens
self.assertEquals('mytoken', tokens[token_code]['friendlyName'])
json = self.getJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
self.assertEquals(tokens[token_code], json)
# Change the token's permission.
self.putJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
data=dict(role='write'))
# Verify.
json = self.getJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
self.assertEquals('write', json['role'])
# Delete the token.
self.deleteResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code))
# Verify.
self.getResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
expected_code=404)
class TestUserCard(ApiTestCase):
def test_getusercard(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserCard)
self.assertEquals('4242', json['card']['last4'])
self.assertEquals('Visa', json['card']['type'])
def test_setusercard_error(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(UserCard,
data=dict(token='sometoken'),
expected_code=402)
assert 'carderror' in json
class TestOrgCard(ApiTestCase):
def test_getorgcard(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationCard,
params=dict(orgname=ORGANIZATION))
self.assertEquals('4242', json['card']['last4'])
self.assertEquals('Visa', json['card']['type'])
class TestUserSubscription(ApiTestCase):
def getSubscription(self):
return self.getJsonResponse(UserPlan)
def test_updateplan(self):
self.login(ADMIN_ACCESS_USER)
# Change the plan.
self.putJsonResponse(UserPlan,
data=dict(plan='free'))
# Verify
sub = self.getSubscription()
self.assertEquals('free', sub['plan'])
# Change the plan.
self.putJsonResponse(UserPlan,
data=dict(plan='bus-large'))
# Verify
sub = self.getSubscription()
self.assertEquals('bus-large', sub['plan'])
class TestOrgSubscription(ApiTestCase):
def getSubscription(self):
return self.getJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION))
def test_updateplan(self):
self.login(ADMIN_ACCESS_USER)
# Change the plan.
self.putJsonResponse(OrganizationPlan,
params=dict(orgname=ORGANIZATION),
data=dict(plan='free'))
# Verify
sub = self.getSubscription()
self.assertEquals('free', sub['plan'])
# Change the plan.
self.putJsonResponse(OrganizationPlan,
params=dict(orgname=ORGANIZATION),
data=dict(plan='bus-large'))
# Verify
sub = self.getSubscription()
self.assertEquals('bus-large', sub['plan'])
class TestUserRobots(ApiTestCase):
def getRobotNames(self):
return [r['name'] for r in self.getJsonResponse(UserRobotList)['robots']]
def test_robots(self):
self.login(NO_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(UserRobot,
params=dict(robot_shortname='bender'),
expected_code=201)
self.assertEquals(NO_ACCESS_USER + '+bender', json['name'])
# Verify.
robots = self.getRobotNames()
assert NO_ACCESS_USER + '+bender' in robots
# Delete the robot.
self.deleteResponse(UserRobot,
params=dict(robot_shortname='bender'))
# Verify.
robots = self.getRobotNames()
assert not NO_ACCESS_USER + '+bender' in robots
class TestOrgRobots(ApiTestCase):
def getRobotNames(self):
return [r['name'] for r in self.getJsonResponse(OrgRobotList,
params=dict(orgname=ORGANIZATION))['robots']]
def test_robots(self):
self.login(ADMIN_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=201)
self.assertEquals(ORGANIZATION + '+bender', json['name'])
# Verify.
robots = self.getRobotNames()
assert ORGANIZATION + '+bender' in robots
# Delete the robot.
self.deleteResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'))
# Verify.
robots = self.getRobotNames()
assert not ORGANIZATION + '+bender' in robots
class TestLogs(ApiTestCase):
def test_user_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserLogs)
assert 'logs' in json
assert 'start_time' in json
assert 'end_time' in json
def test_org_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION))
assert 'logs' in json
assert 'start_time' in json
assert 'end_time' in json
def test_performer(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION))
all_logs = json['logs']
json = self.getJsonResponse(OrgLogs,
params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION))
assert len(json['logs']) < len(all_logs)
for log in json['logs']:
self.assertEquals(READ_ACCESS_USER, log['performer']['name'])
class TestApplicationInformation(ApiTestCase):
def test_get_info(self):
json = self.getJsonResponse(ApplicationInformation, params=dict(client_id=FAKE_APPLICATION_CLIENT_ID))
assert 'name' in json
assert 'uri' in json
assert 'organization' in json
def test_get_invalid_info(self):
self.getJsonResponse(ApplicationInformation, params=dict(client_id='invalid-code'),
expected_code=404)
class TestOrganizationApplications(ApiTestCase):
def test_list_create_applications(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION))
self.assertEquals(2, len(json['applications']))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['applications'][0]['client_id'])
# Add a new application.
json = self.postJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION),
data=dict(name="Some cool app", description="foo"))
self.assertEquals("Some cool app", json['name'])
self.assertEquals("foo", json['description'])
# Retrieve the apps list again
list_json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION))
self.assertEquals(3, len(list_json['applications']))
self.assertEquals(json, list_json['applications'][2])
class TestOrganizationApplicationResource(ApiTestCase):
def test_get_edit_delete_application(self):
self.login(ADMIN_ACCESS_USER)
# Retrieve the application.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id'])
# Edit the application.
edit_json = self.putJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID),
data=dict(name="Some App", description="foo", application_uri="bar",
redirect_uri="baz", gravatar_email="meh"))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, edit_json['client_id'])
self.assertEquals("Some App", edit_json['name'])
self.assertEquals("foo", edit_json['description'])
self.assertEquals("bar", edit_json['application_uri'])
self.assertEquals("baz", edit_json['redirect_uri'])
self.assertEquals("meh", edit_json['gravatar_email'])
# Retrieve the application again.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(json, edit_json)
# Delete the application.
self.deleteResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
# Make sure the application is gone.
self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID),
expected_code=404)
class TestOrganizationApplicationResetClientSecret(ApiTestCase):
def test_reset_client_secret(self):
self.login(ADMIN_ACCESS_USER)
# Retrieve the application.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id'])
# Reset the client secret.
reset_json = self.postJsonResponse(OrganizationApplicationResetClientSecret,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, reset_json['client_id'])
self.assertNotEquals(reset_json['client_secret'], json['client_secret'])
# Verify it was changed in the DB.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(reset_json['client_secret'], json['client_secret'])
class FakeBuildTrigger(BuildTriggerBase):
@classmethod
def service_name(cls):
return 'fakeservice'
def list_build_sources(self, auth_token):
return [{'first': 'source'}, {'second': auth_token}]
def list_build_subdirs(self, auth_token, config):
return [auth_token, 'foo', 'bar', config['somevalue']]
def handle_trigger_request(self, request, auth_token, config):
return ('foo', ['bar'], 'build-name', 'subdir')
def is_active(self, config):
return 'active' in config and config['active']
def activate(self, trigger_uuid, standard_webhook_url, auth_token, config):
config['active'] = True
return config
def deactivate(self, auth_token, config):
config['active'] = False
return config
def manual_start(self, auth_token, config):
return ('foo', ['bar'], 'build-name', 'subdir')
def dockerfile_url(self, auth_token, config):
return 'http://some/url'
def load_dockerfile_contents(self, auth_token, config):
if not 'dockerfile' in config:
return None
return config['dockerfile']
class TestBuildTriggers(ApiTestCase):
def test_list_build_triggers(self):
self.login(ADMIN_ACCESS_USER)
# Check a repo with no known triggers.
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(0, len(json['triggers']))
# Check a repo with one known trigger.
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(1, len(json['triggers']))
trigger = json['triggers'][0]
assert 'id' in trigger
assert 'is_active' in trigger
assert 'config' in trigger
assert 'service' in trigger
# Verify the get trigger method.
trigger_json = self.getJsonResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
self.assertEquals(trigger, trigger_json)
# Check the recent builds for the trigger.
builds_json = self.getJsonResponse(TriggerBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
assert 'builds' in builds_json
def test_delete_build_trigger(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(1, len(json['triggers']))
trigger = json['triggers'][0]
# Delete the trigger.
self.deleteResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
# Verify it was deleted.
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(0, len(json['triggers']))
def test_analyze_fake_trigger(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Analyze the trigger's dockerfile: First, no dockerfile.
trigger_config = {}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('error', analyze_json['status'])
self.assertEquals('Could not read the Dockerfile for the trigger', analyze_json['message'])
# Analyze the trigger's dockerfile: Second, missing FROM in dockerfile.
trigger_config = {'dockerfile': 'MAINTAINER me'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('warning', analyze_json['status'])
self.assertEquals('No FROM line found in the Dockerfile', analyze_json['message'])
# Analyze the trigger's dockerfile: Third, dockerfile with public repo.
trigger_config = {'dockerfile': 'FROM somerepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('publicbase', analyze_json['status'])
# Analyze the trigger's dockerfile: Fourth, dockerfile with private repo with an invalid path.
trigger_config = {'dockerfile': 'FROM localhost:5000/somepath'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('warning', analyze_json['status'])
self.assertEquals('"localhost:5000/somepath" is not a valid Quay repository path', analyze_json['message'])
# Analyze the trigger's dockerfile: Fifth, dockerfile with private repo that does not exist.
trigger_config = {'dockerfile': 'FROM localhost:5000/nothere/randomrepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('error', analyze_json['status'])
self.assertEquals('Repository "localhost:5000/nothere/randomrepo" was not found', analyze_json['message'])
# Analyze the trigger's dockerfile: Sixth, dockerfile with private repo that the user cannot see.
trigger_config = {'dockerfile': 'FROM localhost:5000/randomuser/randomrepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('error', analyze_json['status'])
self.assertEquals('Repository "localhost:5000/randomuser/randomrepo" was not found', analyze_json['message'])
# Analyze the trigger's dockerfile: Seventh, dockerfile with private repo that the user see.
trigger_config = {'dockerfile': 'FROM localhost:5000/devtable/complex'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('analyzed', analyze_json['status'])
self.assertEquals('devtable', analyze_json['namespace'])
self.assertEquals('complex', analyze_json['name'])
self.assertEquals(False, analyze_json['is_public'])
self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', analyze_json['robots'][0]['name'])
def test_fake_trigger(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Verify the trigger.
json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['triggers']))
self.assertEquals(trigger.uuid, json['triggers'][0]['id'])
self.assertEquals(trigger.service.name, json['triggers'][0]['service'])
self.assertEquals(False, json['triggers'][0]['is_active'])
# List the trigger's sources.
source_json = self.getJsonResponse(BuildTriggerSources, params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid))
self.assertEquals([{'first': 'source'}, {'second': 'sometoken'}], source_json['sources'])
# List the trigger's subdirs.
subdir_json = self.postJsonResponse(BuildTriggerSubdirs,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'somevalue': 'meh'})
self.assertEquals({'status': 'success', 'subdir': ['sometoken', 'foo', 'bar', 'meh']}, subdir_json)
# Activate the trigger.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals(True, activate_json['is_active'])
# Make sure the trigger has a write token.
trigger = model.get_build_trigger(ADMIN_ACCESS_USER, 'simple', trigger.uuid)
self.assertNotEquals(None, trigger.write_token)
self.assertEquals(True, py_json.loads(trigger.config)['active'])
# Make sure we cannot activate again.
self.postResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config},
expected_code=400)
# Start a manual build.
start_json = self.postJsonResponse(ActivateBuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
expected_code=201)
assert 'id' in start_json
self.assertEquals("build-name", start_json['display_name'])
self.assertEquals(['bar'], start_json['job_config']['docker_tags'])
def test_invalid_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with an invalid robot account.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config, 'pull_robot': 'someinvalidrobot'},
expected_code=404)
def test_unauthorized_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with a robot account in the wrong namespace.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config, 'pull_robot': 'freshuser+anotherrobot'},
expected_code=403)
def test_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.get_user(ADMIN_ACCESS_USER)
trigger = model.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with a robot account.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
data={'config': trigger_config, 'pull_robot': ADMIN_ACCESS_USER + '+dtrobot'})
# Verify that the robot was saved.
self.assertEquals(True, activate_json['is_active'])
self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', activate_json['pull_robot']['name'])
# Start a manual build.
start_json = self.postJsonResponse(ActivateBuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid),
expected_code=201)
assert 'id' in start_json
self.assertEquals("build-name", start_json['display_name'])
self.assertEquals(['bar'], start_json['job_config']['docker_tags'])
class TestUserAuthorizations(ApiTestCase):
def test_list_get_delete_user_authorizations(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserAuthorizationList)
self.assertEquals(1, len(json['authorizations']))
authorization = json['authorizations'][0]
assert 'uuid' in authorization
assert 'scopes' in authorization
assert 'application' in authorization
# Retrieve the authorization.
get_json = self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid']))
self.assertEquals(authorization, get_json)
# Delete the authorization.
self.deleteResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid']))
# Verify it has been deleted.
self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid = authorization['uuid']),
expected_code=404)
if __name__ == '__main__':
unittest.main()