# coding=utf-8 import unittest import datetime import logging import time import re import json as py_json from StringIO import StringIO from calendar import timegm from contextlib import contextmanager from httmock import urlmatch, HTTMock, all_requests from urllib import urlencode from urlparse import urlparse, urlunparse, parse_qs from playhouse.test_utils import assert_query_count, _QueryLogHandler from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend from mockldap import MockLdap from endpoints.api import api_bp, api from endpoints.building import PreparedBuild from endpoints.webhooks import webhooks from app import app, config_provider, all_queues, dockerfile_build_queue, notification_queue from buildtrigger.basehandler import BuildTriggerHandler from initdb import setup_database_for_testing, finished_database_for_testing from data import database, model from data.database import RepositoryActionCount, Repository as RepositoryTable from test.helpers import assert_action_logged from util.secscan.fake import fake_security_scanner from endpoints.api.team import (TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam, TeamPermissions, InviteTeamMember) from endpoints.api.tag import RepositoryTagImages, RepositoryTag, RevertTag, ListRepositoryTags from endpoints.api.search import EntitySearch, ConductSearch from endpoints.api.image import RepositoryImage, RepositoryImageList from endpoints.api.build import RepositoryBuildStatus, RepositoryBuildList, RepositoryBuildResource from endpoints.api.robot import (UserRobotList, OrgRobot, OrgRobotList, UserRobot, RegenerateUserRobot, RegenerateOrgRobot) from endpoints.api.trigger import (BuildTriggerActivate, BuildTriggerSources, BuildTriggerSubdirs, TriggerBuildList, ActivateBuildTrigger, BuildTrigger, BuildTriggerList, BuildTriggerAnalyze, BuildTriggerFieldValues) from endpoints.api.repoemail import RepositoryAuthorizedEmail from endpoints.api.repositorynotification import (RepositoryNotification, RepositoryNotificationList, TestRepositoryNotification) from endpoints.api.user import (PrivateRepositories, ConvertToOrganization, Signout, Signin, User, UserAuthorizationList, UserAuthorization, UserNotification, UserNotificationList, StarredRepositoryList, StarredRepository) from endpoints.api.repotoken import RepositoryToken, RepositoryTokenList from endpoints.api.prototype import PermissionPrototype, PermissionPrototypeList from endpoints.api.logs import (UserLogs, OrgLogs, OrgAggregateLogs, UserAggregateLogs, RepositoryLogs, RepositoryAggregateLogs) from endpoints.api.billing import (UserCard, UserPlan, ListPlans, OrganizationCard, OrganizationPlan) from endpoints.api.discovery import DiscoveryResource from endpoints.api.error import Error from endpoints.api.organization import (OrganizationList, OrganizationMember, OrgPrivateRepositories, OrganizationMemberList, Organization, ApplicationInformation, OrganizationApplications, OrganizationApplicationResource, OrganizationApplicationResetClientSecret, Organization) from endpoints.api.repository import (RepositoryList, RepositoryVisibility, Repository, REPOS_PER_PAGE) from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission, RepositoryTeamPermissionList, RepositoryUserPermissionList) from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement, SuperUserServiceKeyManagement, SuperUserServiceKey, SuperUserServiceKeyApproval, SuperUserTakeOwnership, SuperUserCustomCertificates, SuperUserCustomCertificate) from endpoints.api.globalmessages import (GlobalUserMessage, GlobalUserMessages,) from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile, SuperUserCreateInitialSuperUser) from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel from test.test_ssl_util import generate_test_cert try: app.register_blueprint(api_bp, url_prefix='/api') except ValueError: # This blueprint was already registered pass app.register_blueprint(webhooks, url_prefix='/webhooks') # The number of queries we run for guests on API calls. BASE_QUERY_COUNT = 0 # The number of queries we run for logged in users on API calls. BASE_LOGGEDIN_QUERY_COUNT = BASE_QUERY_COUNT + 1 # The number of queries we run for logged in users on API calls that check # access permissions. BASE_PERM_ACCESS_QUERY_COUNT = BASE_LOGGEDIN_QUERY_COUNT + 2 NO_ACCESS_USER = 'freshuser' READ_ACCESS_USER = 'reader' ADMIN_ACCESS_USER = 'devtable' PUBLIC_USER = 'public' ADMIN_ACCESS_EMAIL = 'jschorr@devtable.com' ORG_REPO = 'orgrepo' ORGANIZATION = 'buynlarge' NEW_USER_DETAILS = { 'username': 'bobby', 'password': 'password', 'email': 'bobby@tables.com', } FAKE_APPLICATION_CLIENT_ID = 'deadbeef' CSRF_TOKEN_KEY = '_csrf_token' CSRF_TOKEN = '123csrfforme' class ApiTestCase(unittest.TestCase): maxDiff = None @staticmethod def _add_csrf(without_csrf): parts = urlparse(without_csrf) query = parse_qs(parts[4]) query[CSRF_TOKEN_KEY] = CSRF_TOKEN return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:])) def url_for(self, resource_name, params=None, skip_csrf=False): params = params or {} url = api.url_for(resource_name, **params) if not skip_csrf: url = ApiTestCase._add_csrf(url) return url def setUp(self): setup_database_for_testing(self) self.app = app.test_client() self.ctx = app.test_request_context() self.ctx.__enter__() self.setCsrfToken(CSRF_TOKEN) def tearDown(self): finished_database_for_testing(self) config_provider.clear() self.ctx.__exit__(True, None, None) def setCsrfToken(self, token): with self.app.session_transaction() as sess: sess[CSRF_TOKEN_KEY] = token @contextmanager def toggleFeature(self, name, enabled): import features previous_value = getattr(features, name) setattr(features, name, enabled) yield setattr(features, name, previous_value) def getJsonResponse(self, resource_name, params={}, expected_code=200): rv = self.app.get(api.url_for(resource_name, **params)) self.assertEquals(expected_code, rv.status_code) data = rv.data parsed = py_json.loads(data) return parsed def postResponse(self, resource_name, params={}, data={}, file=None, headers=None, expected_code=200): data = py_json.dumps(data) headers = headers or {} headers.update({"Content-Type": "application/json"}) if file is not None: data = {'file': file} headers = None rv = self.app.post(self.url_for(resource_name, params), data=data, headers=headers) self.assertEquals(rv.status_code, expected_code) return rv.data def getResponse(self, resource_name, params={}, expected_code=200): rv = self.app.get(api.url_for(resource_name, **params)) self.assertEquals(rv.status_code, expected_code) return rv.data def putResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.put(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) self.assertEquals(rv.status_code, expected_code) return rv.data def deleteResponse(self, resource_name, params={}, expected_code=204): rv = self.app.delete(self.url_for(resource_name, params)) if rv.status_code != expected_code: print 'Mismatch data for resource DELETE %s: %s' % (resource_name, rv.data) self.assertEquals(rv.status_code, expected_code) return rv.data def deleteEmptyResponse(self, resource_name, params={}, expected_code=204): rv = self.app.delete(self.url_for(resource_name, params)) self.assertEquals(rv.status_code, expected_code) self.assertEquals(rv.data, '') # ensure response body empty return def postJsonResponse(self, resource_name, params={}, data={}, expected_code=200): rv = self.app.post(self.url_for(resource_name, params), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) if rv.status_code != expected_code: print 'Mismatch data for resource POST %s: %s' % (resource_name, rv.data) self.assertEquals(rv.status_code, expected_code) data = rv.data parsed = py_json.loads(data) return parsed def putJsonResponse(self, resource_name, params={}, data={}, expected_code=200, skip_csrf=False): rv = self.app.put(self.url_for(resource_name, params, skip_csrf), data=py_json.dumps(data), headers={"Content-Type": "application/json"}) if rv.status_code != expected_code: print 'Mismatch data for resource PUT %s: %s' % (resource_name, rv.data) self.assertEquals(rv.status_code, expected_code) data = rv.data parsed = py_json.loads(data) return parsed def assertNotInTeam(self, data, membername): for memberData in data['members']: if memberData['name'] == membername: self.fail(membername + ' found in team: ' + data['name']) def assertInTeam(self, data, membername): for member_data in data['members']: if member_data['name'] == membername: return self.fail(membername + ' not found in team: ' + data['name']) def login(self, username, password='password'): return self.postJsonResponse(Signin, data=dict(username=username, password=password)) class TestCSRFFailure(ApiTestCase): def test_csrf_failure(self): self.login(READ_ACCESS_USER) # Make sure a simple post call succeeds. self.putJsonResponse(User, data=dict(password='newpasswordiscool')) # Change the session's CSRF token. self.setCsrfToken('someinvalidtoken') # Verify that the call now fails. self.putJsonResponse(User, data=dict(password='newpasswordiscool'), expected_code=403) def test_csrf_failure_empty_token(self): self.login(READ_ACCESS_USER) # Change the session's CSRF token to be empty. self.setCsrfToken('') # Verify that the call now fails. self.putJsonResponse(User, data=dict(password='newpasswordiscool'), expected_code=403) def test_csrf_failure_missing_token(self): self.login(READ_ACCESS_USER) # Make sure a simple post call without a token at all fails. self.putJsonResponse(User, data=dict(password='newpasswordiscool'), skip_csrf=True, expected_code=403) # Change the session's CSRF token to be empty. self.setCsrfToken('') # Verify that the call still fails. self.putJsonResponse(User, data=dict(password='newpasswordiscool'), skip_csrf=True, expected_code=403) class TestDiscovery(ApiTestCase): def test_discovery(self): json = self.getJsonResponse(DiscoveryResource) assert 'paths' in json class TestErrorDescription(ApiTestCase): def test_get_error(self): json = self.getJsonResponse(Error, params=dict(error_type='not_found')) assert json['title'] == 'not_found' assert 'type' in json assert 'description' in json class TestPlans(ApiTestCase): def test_plans(self): json = self.getJsonResponse(ListPlans) found = set([]) for method_info in json['plans']: found.add(method_info['stripeId']) assert 'free' in found class TestLoggedInUser(ApiTestCase): def test_guest(self): self.getJsonResponse(User, expected_code=401) def test_user(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(User) assert json['anonymous'] == False assert json['username'] == READ_ACCESS_USER class TestUserStarredRepositoryList(ApiTestCase): def test_get_stars_guest(self): self.getJsonResponse(StarredRepositoryList, expected_code=401) def test_get_stars_user(self): self.login(READ_ACCESS_USER) # Queries: Base + the list query with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1): self.getJsonResponse(StarredRepositoryList, expected_code=200) def test_star_repo_guest(self): self.postJsonResponse(StarredRepositoryList, data={ 'namespace': 'public', 'repository': 'publicrepo', }, expected_code=401) def test_star_and_unstar_repo_user(self): self.login(READ_ACCESS_USER) # Queries: Base + the list query with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1): json = self.getJsonResponse(StarredRepositoryList) assert json['repositories'] == [] json = self.postJsonResponse(StarredRepositoryList, data={ 'namespace': 'public', 'repository': 'publicrepo', }, expected_code=201) assert json['namespace'] == 'public' assert json['repository'] == 'publicrepo' self.deleteEmptyResponse(StarredRepository, params=dict(repository='public/publicrepo'), expected_code=204) json = self.getJsonResponse(StarredRepositoryList) assert json['repositories'] == [] class TestUserNotification(ApiTestCase): def test_get(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserNotificationList) # Make sure each notification can be retrieved. for notification in json['notifications']: njson = self.getJsonResponse(UserNotification, params=dict(uuid=notification['id'])) self.assertEquals(notification['id'], njson['id']) # Update a notification. assert json['notifications'] assert not json['notifications'][0]['dismissed'] notification = json['notifications'][0] pjson = self.putJsonResponse(UserNotification, params=dict(uuid=notification['id']), data=dict(dismissed=True)) self.assertEquals(True, pjson['dismissed']) def test_org_notifications(self): # Create a notification on the organization. org = model.user.get_user_or_org(ORGANIZATION) model.notification.create_notification('test_notification', org, {'org': 'notification'}) # Ensure it is visible to the org admin. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserNotificationList) notification = json['notifications'][0] self.assertEquals(notification['kind'], 'test_notification') self.assertEquals(notification['metadata'], {'org': 'notification'}) # Ensure it is not visible to an org member. self.login(READ_ACCESS_USER) json = self.getJsonResponse(UserNotificationList) self.assertEquals(0, len(json['notifications'])) class TestGetUserPrivateAllowed(ApiTestCase): def test_nonallowed(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(PrivateRepositories) assert json['privateCount'] == 0 assert not json['privateAllowed'] def test_allowed(self): self.login(ADMIN_ACCESS_USER) # Change the subscription of the namespace. self.putJsonResponse(UserPlan, data=dict(plan='personal-30')) json = self.getJsonResponse(PrivateRepositories) assert json['privateCount'] >= 6 assert not json['privateAllowed'] # Change the subscription of the namespace. self.putJsonResponse(UserPlan, data=dict(plan='bus-large-30')) json = self.getJsonResponse(PrivateRepositories) assert json['privateAllowed'] class TestConvertToOrganization(ApiTestCase): def test_sameadminuser(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': READ_ACCESS_USER, 'adminPassword': 'password', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user is not valid', json['detail']) def test_sameadminuser_by_email(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': 'no1@thanks.com', 'adminPassword': 'password', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user is not valid', json['detail']) def test_invalidadminuser(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': 'unknownuser', 'adminPassword': 'password', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user credentials are not valid', json['detail']) def test_invalidadminpassword(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'invalidpass', 'plan': 'free'}, expected_code=400) self.assertEqual('The admin user credentials are not valid', json['detail']) def test_convert(self): self.login(READ_ACCESS_USER) # Add at least one permission for the read-user. read_user = model.user.get_user(READ_ACCESS_USER) simple_repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple') read_role = database.Role.get(name='read') database.RepositoryPermission.create(user=read_user, repository=simple_repo, role=read_role) # Convert the read user into an organization. json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_USER, 'adminPassword': 'password', 'plan': 'free'}) self.assertEqual(True, json['success']) # Verify the organization exists. organization = model.organization.get_organization(READ_ACCESS_USER) assert organization is not None # Verify the admin user is the org's admin. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=READ_ACCESS_USER)) self.assertEquals(READ_ACCESS_USER, json['name']) self.assertEquals(True, json['is_admin']) # Verify the now-org has no permissions. count = (database .RepositoryPermission .select() .where(database.RepositoryPermission.user == organization) .count()) self.assertEquals(0, count) def test_convert_via_email(self): self.login(READ_ACCESS_USER) json = self.postJsonResponse(ConvertToOrganization, data={'adminUser': ADMIN_ACCESS_EMAIL, 'adminPassword': 'password', 'plan': 'free'}) self.assertEqual(True, json['success']) # Verify the organization exists. organization = model.organization.get_organization(READ_ACCESS_USER) assert organization is not None # Verify the admin user is the org's admin. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=READ_ACCESS_USER)) self.assertEquals(READ_ACCESS_USER, json['name']) self.assertEquals(True, json['is_admin']) class TestChangeUserDetails(ApiTestCase): def test_changepassword(self): self.login(READ_ACCESS_USER) self.putJsonResponse(User, data=dict(password='newpasswordiscool')) self.login(READ_ACCESS_USER, password='newpasswordiscool') def test_changepassword_unicode(self): self.login(READ_ACCESS_USER) self.putJsonResponse(User, data=dict(password=u'someunicode北京市pass')) self.login(READ_ACCESS_USER, password=u'someunicode北京市pass') def test_changeeemail(self): self.login(READ_ACCESS_USER) self.putJsonResponse(User, data=dict(email='test+foo@devtable.com')) def test_changeinvoiceemail(self): self.login(READ_ACCESS_USER) json = self.putJsonResponse(User, data=dict(invoice_email=True)) self.assertEquals(True, json['invoice_email']) json = self.putJsonResponse(User, data=dict(invoice_email=False)) self.assertEquals(False, json['invoice_email']) def test_changeusername_temp(self): self.login(READ_ACCESS_USER) user = model.user.get_user(READ_ACCESS_USER) model.user.create_user_prompt(user, 'confirm_username') self.assertTrue(model.user.has_user_prompt(user, 'confirm_username')) # Add a robot under the user's namespace. model.user.create_robot('somebot', user) # Rename the user. json = self.putJsonResponse(User, data=dict(username='someotherusername')) # Ensure the username was changed. self.assertEquals('someotherusername', json['username']) self.assertFalse(model.user.has_user_prompt(user, 'confirm_username')) # Ensure the robot was changed. self.assertIsNone(model.user.get_user(READ_ACCESS_USER + '+somebot')) self.assertIsNotNone(model.user.get_user('someotherusername+somebot')) def test_changeusername_temp_samename(self): self.login(READ_ACCESS_USER) user = model.user.get_user(READ_ACCESS_USER) model.user.create_user_prompt(user, 'confirm_username') self.assertTrue(model.user.has_user_prompt(user, 'confirm_username')) json = self.putJsonResponse(User, data=dict(username=READ_ACCESS_USER)) # Ensure the username was not changed but they are no longer temporarily named. self.assertEquals(READ_ACCESS_USER, json['username']) self.assertFalse(model.user.has_user_prompt(user, 'confirm_username')) def test_changeusername_notallowed(self): with self.toggleFeature('USER_RENAME', False): self.login(ADMIN_ACCESS_USER) user = model.user.get_user(ADMIN_ACCESS_USER) self.assertFalse(model.user.has_user_prompt(user, 'confirm_username')) json = self.putJsonResponse(User, data=dict(username='someotherusername')) self.assertEquals(ADMIN_ACCESS_USER, json['username']) self.assertTrue('prompts' in json) self.assertIsNone(model.user.get_user('someotherusername')) self.assertIsNotNone(model.user.get_user(ADMIN_ACCESS_USER)) def test_changeusername_allowed(self): with self.toggleFeature('USER_RENAME', True): self.login(ADMIN_ACCESS_USER) user = model.user.get_user(ADMIN_ACCESS_USER) self.assertFalse(model.user.has_user_prompt(user, 'confirm_username')) json = self.putJsonResponse(User, data=dict(username='someotherusername')) self.assertEquals('someotherusername', json['username']) self.assertTrue('prompts' in json) self.assertIsNotNone(model.user.get_user('someotherusername')) self.assertIsNone(model.user.get_user(ADMIN_ACCESS_USER)) def test_changeusername_already_used(self): self.login(READ_ACCESS_USER) user = model.user.get_user(READ_ACCESS_USER) model.user.create_user_prompt(user, 'confirm_username') self.assertTrue(model.user.has_user_prompt(user, 'confirm_username')) # Try to change to a used username. self.putJsonResponse(User, data=dict(username=ADMIN_ACCESS_USER), expected_code=400) # Change to a new username. self.putJsonResponse(User, data=dict(username='unusedusername')) class TestCreateNewUser(ApiTestCase): def test_existingusername(self): json = self.postJsonResponse(User, data=dict(username=READ_ACCESS_USER, password='password', email='test@example.com'), expected_code=400) self.assertEquals('The username already exists', json['detail']) def test_trycreatetooshort(self): json = self.postJsonResponse(User, data=dict(username='a', password='password', email='test@example.com'), expected_code=400) self.assertEquals('Invalid namespace a: Namespace must be between 2 and 255 characters in length', json['detail']) def test_trycreateregexmismatch(self): json = self.postJsonResponse(User, data=dict(username='auserName', password='password', email='test@example.com'), expected_code=400) self.assertEquals('Invalid namespace auserName: Namespace must match expression ^([a-z0-9]+(?:[._-][a-z0-9]+)*)$', json['detail']) def test_createuser(self): data = self.postJsonResponse(User, data=NEW_USER_DETAILS, expected_code=200) self.assertEquals(True, data['awaiting_verification']) def test_createuser_captcha(self): @urlmatch(netloc=r'(.*\.)?google.com', path='/recaptcha/api/siteverify') def captcha_endpoint(url, request): if url.query.find('response=somecode') > 0: return {'status_code': 200, 'content': py_json.dumps({'success': True})} else: return {'status_code': 400, 'content': py_json.dumps({'success': False})} with HTTMock(captcha_endpoint): with self.toggleFeature('RECAPTCHA', True): # Try with a missing captcha. self.postResponse(User, data=NEW_USER_DETAILS, expected_code=400) # Try with an invalid captcha. details = dict(NEW_USER_DETAILS) details['recaptcha_response'] = 'someinvalidcode' self.postResponse(User, data=details, expected_code=400) # Try with a valid captcha. details = dict(NEW_USER_DETAILS) details['recaptcha_response'] = 'somecode' self.postResponse(User, data=details, expected_code=200) def test_createuser_withteaminvite(self): inviter = model.user.get_user(ADMIN_ACCESS_USER) team = model.team.get_organization_team(ORGANIZATION, 'owners') invite = model.team.add_or_invite_to_team(inviter, team, None, NEW_USER_DETAILS['email']) details = { 'invite_code': invite.invite_token } details.update(NEW_USER_DETAILS) data = self.postJsonResponse(User, data=details, expected_code=200) # Make sure the user is verified since the email address of the user matches # that of the team invite. self.assertFalse('awaiting_verification' in data) # Make sure the user was not (yet) added to the team. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertNotInTeam(json, NEW_USER_DETAILS['username']) def test_createuser_withteaminvite_differentemails(self): inviter = model.user.get_user(ADMIN_ACCESS_USER) team = model.team.get_organization_team(ORGANIZATION, 'owners') invite = model.team.add_or_invite_to_team(inviter, team, None, 'differentemail@example.com') details = { 'invite_code': invite.invite_token } details.update(NEW_USER_DETAILS) data = self.postJsonResponse(User, data=details, expected_code=200) # Make sure the user is *not* verified since the email address of the user # does not match that of the team invite. self.assertTrue(data['awaiting_verification']) # Make sure the user was not (yet) added to the team. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertNotInTeam(json, NEW_USER_DETAILS['username']) def test_createuser_withmultipleteaminvites(self): inviter = model.user.get_user(ADMIN_ACCESS_USER) owners_team = model.team.get_organization_team(ORGANIZATION, 'owners') readers_team = model.team.get_organization_team(ORGANIZATION, 'readers') other_owners_team = model.team.get_organization_team('library', 'owners') owners_invite = model.team.add_or_invite_to_team(inviter, owners_team, None, NEW_USER_DETAILS['email']) readers_invite = model.team.add_or_invite_to_team(inviter, readers_team, None, NEW_USER_DETAILS['email']) other_owners_invite = model.team.add_or_invite_to_team(inviter, other_owners_team, None, NEW_USER_DETAILS['email']) # Create the user and ensure they have a verified email address. details = { 'invite_code': owners_invite.invite_token } details.update(NEW_USER_DETAILS) data = self.postJsonResponse(User, data=details, expected_code=200) # Make sure the user is verified since the email address of the user matches # that of the team invite. self.assertFalse('awaiting_verification' in data) # Make sure the user was not (yet) added to the teams. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertNotInTeam(json, NEW_USER_DETAILS['username']) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) self.assertNotInTeam(json, NEW_USER_DETAILS['username']) json = self.getJsonResponse(TeamMemberList, params=dict(orgname='library', teamname='owners')) self.assertNotInTeam(json, NEW_USER_DETAILS['username']) # Accept the first invitation. self.login(NEW_USER_DETAILS['username']) self.putJsonResponse(TeamMemberInvite, params=dict(code=owners_invite.invite_token)) # Make sure both codes are now invalid. self.putResponse(TeamMemberInvite, params=dict(code=owners_invite.invite_token), expected_code=400) self.putResponse(TeamMemberInvite, params=dict(code=readers_invite.invite_token), expected_code=400) # Make sure the user is now in the two invited teams under the organization, but not # in the other org's team. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertInTeam(json, NEW_USER_DETAILS['username']) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) self.assertInTeam(json, NEW_USER_DETAILS['username']) json = self.getJsonResponse(TeamMemberList, params=dict(orgname='library', teamname='owners')) self.assertNotInTeam(json, NEW_USER_DETAILS['username']) # Accept the second invitation. self.login(NEW_USER_DETAILS['username']) self.putJsonResponse(TeamMemberInvite, params=dict(code=other_owners_invite.invite_token)) # Make sure the user was added to the other organization. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname='library', teamname='owners')) self.assertInTeam(json, NEW_USER_DETAILS['username']) # Make sure the invitation codes are now invalid. self.putResponse(TeamMemberInvite, params=dict(code=other_owners_invite.invite_token), expected_code=400) class TestDeleteNamespace(ApiTestCase): def test_deletenamespaces(self): self.login(ADMIN_ACCESS_USER) # Try to first delete the user. Since they are the sole admin of two orgs, it should fail. with check_transitive_deletes(): self.deleteResponse(User, expected_code=400) # Delete the two orgs, checking in between. with check_transitive_deletes(): self.deleteEmptyResponse(Organization, params=dict(orgname=ORGANIZATION), expected_code=204) self.deleteResponse(User, expected_code=400) # Should still fail. self.deleteEmptyResponse(Organization, params=dict(orgname='library'), expected_code=204) # Add some queue items for the user. notification_queue.put([ADMIN_ACCESS_USER, 'somerepo', 'somename'], '{}') dockerfile_build_queue.put([ADMIN_ACCESS_USER, 'anotherrepo'], '{}') # Now delete the user. with check_transitive_deletes(): self.deleteEmptyResponse(User, expected_code=204) # Ensure the queue items are gone. self.assertIsNone(notification_queue.get()) self.assertIsNone(dockerfile_build_queue.get()) def test_delete_federateduser(self): self.login(PUBLIC_USER) # Add some federated logins. user = model.user.get_user(PUBLIC_USER) model.user.attach_federated_login(user, 'github', 'something', {}) with check_transitive_deletes(): self.deleteEmptyResponse(User, expected_code=204) def test_delete_prompted_user(self): self.login('randomuser') with check_transitive_deletes(): self.deleteEmptyResponse(User, expected_code=204) class TestSignin(ApiTestCase): def test_signin_unicode(self): self.postResponse(Signin, data=dict(username=u'\xe5\x8c\x97\xe4\xba\xac\xe5\xb8\x82', password='password'), expected_code=403) def test_signin_invitecode(self): # Create a new user (unverified) data = self.postJsonResponse(User, data=NEW_USER_DETAILS, expected_code=200) self.assertTrue(data['awaiting_verification']) # Try to sign in without an invite code. data = self.postJsonResponse(Signin, data=NEW_USER_DETAILS, expected_code=403) self.assertTrue(data['needsEmailVerification']) # Try to sign in with an invalid invite code. details = { 'invite_code': 'someinvalidcode' } details.update(NEW_USER_DETAILS) data = self.postJsonResponse(Signin, data=details, expected_code=403) self.assertTrue(data['needsEmailVerification']) # Sign in with an invite code and ensure the user becomes verified. inviter = model.user.get_user(ADMIN_ACCESS_USER) team = model.team.get_organization_team(ORGANIZATION, 'owners') invite = model.team.add_or_invite_to_team(inviter, team, None, NEW_USER_DETAILS['email']) details = { 'invite_code': invite.invite_token } details.update(NEW_USER_DETAILS) data = self.postJsonResponse(Signin, data=details, expected_code=200) self.assertFalse('needsEmailVerification' in data) class TestSignout(ApiTestCase): def test_signout(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(User) assert json['username'] == READ_ACCESS_USER self.postResponse(Signout) # Make sure we're now signed out. self.getJsonResponse(User, expected_code=401) class TestConductSearch(ApiTestCase): def test_noaccess(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse(ConductSearch, params=dict(query='read')) self.assertEquals(0, len(json['results'])) json = self.getJsonResponse(ConductSearch, params=dict(query='owners')) self.assertEquals(0, len(json['results'])) def test_nouser(self): json = self.getJsonResponse(ConductSearch, params=dict(query='read')) self.assertEquals(0, len(json['results'])) json = self.getJsonResponse(ConductSearch, params=dict(query='public')) self.assertEquals(2, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'user') self.assertEquals(json['results'][0]['name'], 'public') self.assertEquals(json['results'][1]['kind'], 'repository') self.assertEquals(json['results'][1]['name'], 'publicrepo') json = self.getJsonResponse(ConductSearch, params=dict(query='owners')) self.assertEquals(0, len(json['results'])) def test_orgmember(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(ConductSearch, params=dict(query='owners')) self.assertEquals(0, len(json['results'])) json = self.getJsonResponse(ConductSearch, params=dict(query='readers')) self.assertEquals(1, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'team') self.assertEquals(json['results'][0]['name'], 'readers') def test_orgadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(ConductSearch, params=dict(query='owners')) self.assertEquals(2, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'team') self.assertEquals(json['results'][0]['name'], 'owners') json = self.getJsonResponse(ConductSearch, params=dict(query='readers')) self.assertEquals(1, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'team') self.assertEquals(json['results'][0]['name'], 'readers') def test_explicit_permission(self): self.login('reader') json = self.getJsonResponse(ConductSearch, params=dict(query='shared')) self.assertEquals(1, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'repository') self.assertEquals(json['results'][0]['name'], 'shared') def test_full_text(self): self.login(ADMIN_ACCESS_USER) # Make sure the repository is found via `full` and `text search`. json = self.getJsonResponse(ConductSearch, params=dict(query='full')) self.assertEquals(1, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'repository') self.assertEquals(json['results'][0]['name'], 'text-full-repo') json = self.getJsonResponse(ConductSearch, params=dict(query='text search')) self.assertEquals(1, len(json['results'])) self.assertEquals(json['results'][0]['kind'], 'repository') self.assertEquals(json['results'][0]['name'], 'text-full-repo') class TestGetMatchingEntities(ApiTestCase): def test_simple_lookup(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix=ADMIN_ACCESS_USER, namespace=ORGANIZATION, includeTeams='true')) self.assertEquals(1, len(json['results'])) def test_simple_lookup_noorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix=ADMIN_ACCESS_USER)) self.assertEquals(1, len(json['results'])) def test_unicode_search(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix='北京市', namespace=ORGANIZATION, includeTeams='true')) self.assertEquals(0, len(json['results'])) def test_notinorg(self): self.login(NO_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix='o', namespace=ORGANIZATION, includeTeams='true')) names = set([r['name'] for r in json['results']]) assert 'outsideorg' in names assert not 'owners' in names def test_inorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix='o', namespace=ORGANIZATION, includeTeams='true')) names = set([r['name'] for r in json['results']]) assert 'outsideorg' in names assert 'owners' in names def test_inorg_withorgs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(EntitySearch, params=dict(prefix=ORGANIZATION[0], namespace=ORGANIZATION, includeOrgs='true')) names = set([r['name'] for r in json['results']]) assert ORGANIZATION in names class TestCreateOrganization(ApiTestCase): def test_existinguser(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(OrganizationList, data=dict(name=ADMIN_ACCESS_USER, email='testorg@example.com'), expected_code=400) self.assertEquals('A user or organization with this name already exists', json['detail']) def test_existingorg(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(OrganizationList, data=dict(name=ORGANIZATION, email='testorg@example.com'), expected_code=400) self.assertEquals('A user or organization with this name already exists', json['detail']) def test_createorg(self): self.login(ADMIN_ACCESS_USER) data = self.postResponse(OrganizationList, data=dict(name='neworg', email='testorg@example.com'), expected_code=201) self.assertEquals('"Created"', data.strip()) # Ensure the org was created. organization = model.organization.get_organization('neworg') assert organization is not None # Verify the admin user is the org's admin. json = self.getJsonResponse(Organization, params=dict(orgname='neworg')) self.assertEquals('neworg', json['name']) self.assertEquals(True, json['is_admin']) def test_createorg_viaoauth(self): # Attempt with no auth. self.postResponse(OrganizationList, data=dict(name='neworg', email='testorg@example.com'), expected_code=401) # Attempt with auth with invalid scope. dt_user = model.user.get_user(ADMIN_ACCESS_USER) token = model.oauth.create_access_token_for_testing(dt_user, 'deadbeef', 'repo:read', access_token='foo') self.postResponse(OrganizationList, data=dict(name='neworg', email='testorg@example.com'), headers=dict(Authorization='Bearer ' + token.access_token), expected_code=403) # Create OAuth token with user:admin scope. token = model.oauth.create_access_token_for_testing(dt_user, 'deadbeef', 'user:admin', access_token='bar') data = self.postResponse(OrganizationList, data=dict(name='neworg', email='testorg@example.com'), headers=dict(Authorization='Bearer ' + token.access_token), expected_code=201) self.assertEquals('"Created"', data.strip()) class TestGetOrganization(ApiTestCase): def test_unknownorg(self): self.login(ADMIN_ACCESS_USER) self.getResponse(Organization, params=dict(orgname='notvalid'), expected_code=404) def test_cannotaccess(self): self.login(NO_ACCESS_USER) self.getResponse(Organization, params=dict(orgname=ORGANIZATION), expected_code=200) def test_getorganization(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(False, json['is_admin']) def test_getorganization_asadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) self.assertEquals(ORGANIZATION, json['name']) self.assertEquals(True, json['is_admin']) class TestChangeOrganizationDetails(ApiTestCase): def test_changeinvoiceemail(self): self.login(ADMIN_ACCESS_USER) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(invoice_email=True)) self.assertEquals(True, json['invoice_email']) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(invoice_email=False)) self.assertEquals(False, json['invoice_email']) def test_changemail(self): self.login(ADMIN_ACCESS_USER) json = self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(email='newemail@example.com')) self.assertEquals('newemail@example.com', json['email']) class TestGetOrganizationPrototypes(ApiTestCase): def test_getprototypes(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) assert len(json['prototypes']) > 0 class TestCreateOrganizationPrototypes(ApiTestCase): def test_invaliduser(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(activating_user={'name': 'unknownuser'}, role='read', delegate={'kind': 'team', 'name': 'owners'}), expected_code=400) self.assertEquals('Unknown activating user', json['detail']) def test_missingdelegate(self): self.login(ADMIN_ACCESS_USER) self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read'), expected_code=400) def test_createprototype(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read', delegate={'kind': 'team', 'name': 'readers'})) self.assertEquals('read', json['role']) pid = json['id'] # Verify the prototype exists. json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = set([p['id'] for p in json['prototypes']]) assert pid in ids class TestDeleteOrganizationPrototypes(ApiTestCase): def test_deleteprototype(self): self.login(ADMIN_ACCESS_USER) # Get the existing prototypes json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = [p['id'] for p in json['prototypes']] pid = ids[0] # Delete a prototype. self.deleteEmptyResponse(PermissionPrototype, params=dict(orgname=ORGANIZATION, prototypeid=pid)) # Verify the prototype no longer exists. json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) newids = [p['id'] for p in json['prototypes']] assert not pid in newids class TestUpdateOrganizationPrototypes(ApiTestCase): def test_updateprototype(self): self.login(ADMIN_ACCESS_USER) # Get the existing prototypes json = self.getJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION)) ids = [p['id'] for p in json['prototypes']] pid = ids[0] # Update a prototype. json = self.putJsonResponse(PermissionPrototype, params=dict(orgname=ORGANIZATION, prototypeid=pid), data=dict(role='admin')) self.assertEquals('admin', json['role']) class TestGetOrganizationMembers(ApiTestCase): def test_getmembers(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationMemberList, params=dict(orgname=ORGANIZATION)) membernames = [member['name'] for member in json['members']] assert ADMIN_ACCESS_USER in membernames assert READ_ACCESS_USER in membernames assert not NO_ACCESS_USER in membernames for member in json['members']: membername = member['name'] response = self.getJsonResponse(OrganizationMember, params=dict(orgname=ORGANIZATION, membername=membername)) self.assertEquals(member, response) class TestRemoveOrganizationMember(ApiTestCase): def test_try_remove_only_admin(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(OrganizationMember, params=dict(orgname=ORGANIZATION, membername=ADMIN_ACCESS_USER), expected_code=400) def test_remove_member(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationMemberList, params=dict(orgname=ORGANIZATION)) membernames = [member['name'] for member in json['members']] assert ADMIN_ACCESS_USER in membernames assert READ_ACCESS_USER in membernames self.deleteEmptyResponse(OrganizationMember, params=dict(orgname=ORGANIZATION, membername=READ_ACCESS_USER)) json = self.getJsonResponse(OrganizationMemberList, params=dict(orgname=ORGANIZATION)) membernames = [member['name'] for member in json['members']] assert ADMIN_ACCESS_USER in membernames assert not READ_ACCESS_USER in membernames def test_remove_member_repo_permission(self): self.login(ADMIN_ACCESS_USER) # Add read user as a direct permission on the admin user's repo. model.permission.set_user_repo_permission(READ_ACCESS_USER, ADMIN_ACCESS_USER, 'simple', 'read') # Verify the user has a permission on the admin user's repo. admin_perms = [p.user.username for p in model.user.get_all_repo_users(ADMIN_ACCESS_USER, 'simple')] assert READ_ACCESS_USER in admin_perms # Add read user as a direct permission on the org repo. model.permission.set_user_repo_permission(READ_ACCESS_USER, ORGANIZATION, ORG_REPO, 'read') # Verify the user has a permission on the org repo. org_perms = [p.user.username for p in model.user.get_all_repo_users(ORGANIZATION, ORG_REPO)] assert READ_ACCESS_USER in org_perms # Remove the user from the org. self.deleteEmptyResponse(OrganizationMember, params=dict(orgname=ORGANIZATION, membername=READ_ACCESS_USER)) # Verify that the user's permission on the org repo is gone, but it is still # present on the other repo. org_perms = [p.user.username for p in model.user.get_all_repo_users(ORGANIZATION, ORG_REPO)] assert not READ_ACCESS_USER in org_perms admin_perms = [p.user.username for p in model.user.get_all_repo_users(ADMIN_ACCESS_USER, 'simple')] assert READ_ACCESS_USER in admin_perms class TestGetOrganizationPrivateAllowed(ApiTestCase): def test_existingorg(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgPrivateRepositories, params=dict(orgname=ORGANIZATION)) self.assertEquals(True, json['privateAllowed']) assert not 'reposAllowed' in json def test_neworg(self): self.login(ADMIN_ACCESS_USER) data = self.postResponse(OrganizationList, data=dict(name='neworg', email='test@example.com'), expected_code=201) json = self.getJsonResponse(OrgPrivateRepositories, params=dict(orgname='neworg')) self.assertEquals(False, json['privateAllowed']) class TestUpdateOrganizationTeam(ApiTestCase): def test_updateexisting(self): self.login(ADMIN_ACCESS_USER) data = self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='readers'), data=dict(description='My cool team', role='creator')) self.assertEquals('My cool team', data['description']) self.assertEquals('creator', data['role']) def test_attemptchangeroleonowners(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='owners'), data=dict(role='creator'), expected_code=400) def test_createnewteam(self): self.login(ADMIN_ACCESS_USER) data = self.putJsonResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='newteam'), data=dict(description='My cool team', role='member')) self.assertEquals('My cool team', data['description']) self.assertEquals('member', data['role']) # Verify the team was created. json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) assert 'newteam' in json['teams'] class TestDeleteOrganizationTeam(ApiTestCase): def test_deleteteam(self): self.login(ADMIN_ACCESS_USER) self.deleteEmptyResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='readers')) # Make sure the team was deleted json = self.getJsonResponse(Organization, params=dict(orgname=ORGANIZATION)) assert not 'readers' in json['teams'] def test_attemptdeleteowners(self): self.login(ADMIN_ACCESS_USER) resp = self.deleteResponse(OrganizationTeam, params=dict(orgname=ORGANIZATION, teamname='owners'), expected_code=400) data = py_json.loads(resp) msg = ("Deleting team 'owners' would remove admin ability for user " + "'devtable' in organization 'buynlarge'") self.assertEquals(msg, data['message']) class TestTeamPermissions(ApiTestCase): def test_team_permissions(self): self.login(ADMIN_ACCESS_USER) resp = self.getJsonResponse(TeamPermissions, params=dict(orgname=ORGANIZATION, teamname='readers')) self.assertEquals(1, len(resp['permissions'])) class TestGetOrganizationTeamMembers(ApiTestCase): def test_invalidteam(self): self.login(ADMIN_ACCESS_USER) self.getResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='notvalid'), expected_code=404) def test_getmembers(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) self.assertEquals(READ_ACCESS_USER, json['members'][1]['name']) class TestUpdateOrganizationTeamMember(ApiTestCase): def test_addmember_alreadyteammember(self): self.login(ADMIN_ACCESS_USER) membername = READ_ACCESS_USER self.putResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername), expected_code=400) def test_addmember_orgmember(self): self.login(ADMIN_ACCESS_USER) membername = READ_ACCESS_USER self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) # Verify the user was added to the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertInTeam(json, membername) def test_addmember_robot(self): self.login(ADMIN_ACCESS_USER) membername = ORGANIZATION + '+coolrobot' self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) # Verify the user was added to the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) self.assertInTeam(json, membername) def test_addmember_invalidrobot(self): self.login(ADMIN_ACCESS_USER) membername = 'freshuser+anotherrobot' self.putResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername), expected_code=400) def test_addmember_nonorgmember(self): self.login(ADMIN_ACCESS_USER) membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) self.assertEquals(True, response['invited']) # Make sure the user is not (yet) part of the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) for member in json['members']: self.assertNotEqual(membername, member['name']) class TestAcceptTeamMemberInvite(ApiTestCase): def test_accept(self): self.login(ADMIN_ACCESS_USER) # Create the invite. membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) self.assertEquals(True, response['invited']) # Login as the user. self.login(membername) # Accept the invite. user = model.user.get_user(membername) invites = list(model.team.lookup_team_invites(user)) self.assertEquals(1, len(invites)) self.putJsonResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token)) # Verify the user is now on the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertInTeam(json, membername) # Verify the accept now fails. self.putResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) def test_accept_via_email(self): self.login(ADMIN_ACCESS_USER) # Create the invite. member = model.user.get_user(NO_ACCESS_USER) response = self.putJsonResponse(InviteTeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', email=member.email)) self.assertEquals(True, response['invited']) # Login as the user. self.login(member.username) # Accept the invite. invites = list(model.team.lookup_team_invites_by_email(member.email)) self.assertEquals(1, len(invites)) self.putJsonResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token)) # Verify the user is now on the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertInTeam(json, member.username) # Verify the accept now fails. self.putResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) def test_accept_invite_different_user(self): self.login(ADMIN_ACCESS_USER) # Create the invite. response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=NO_ACCESS_USER)) self.assertEquals(True, response['invited']) # Login as a different user. self.login(PUBLIC_USER) # Try to accept the invite. user = model.user.get_user(NO_ACCESS_USER) invites = list(model.team.lookup_team_invites(user)) self.assertEquals(1, len(invites)) self.putResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) # Ensure the invite is still valid. user = model.user.get_user(NO_ACCESS_USER) invites = list(model.team.lookup_team_invites(user)) self.assertEquals(1, len(invites)) # Ensure the user is *not* a member of the team. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertNotInTeam(json, PUBLIC_USER) def test_accept_invite_different_email(self): self.login(ADMIN_ACCESS_USER) # Create the invite. response = self.putJsonResponse(InviteTeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', email='someemail@example.com')) self.assertEquals(True, response['invited']) # Login as a different user. self.login(PUBLIC_USER) # Try to accept the invite. invites = list(model.team.lookup_team_invites_by_email('someemail@example.com')) self.assertEquals(1, len(invites)) self.putResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) # Ensure the invite is still valid. invites = list(model.team.lookup_team_invites_by_email('someemail@example.com')) self.assertEquals(1, len(invites)) # Ensure the user is *not* a member of the team. self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='owners')) self.assertNotInTeam(json, PUBLIC_USER) class TestDeclineTeamMemberInvite(ApiTestCase): def test_decline_wronguser(self): self.login(ADMIN_ACCESS_USER) # Create the invite. membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) self.assertEquals(True, response['invited']) # Try to decline the invite. user = model.user.get_user(membername) invites = list(model.team.lookup_team_invites(user)) self.assertEquals(1, len(invites)) self.deleteResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) def test_decline(self): self.login(ADMIN_ACCESS_USER) # Create the invite. membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='owners', membername=membername)) self.assertEquals(True, response['invited']) # Login as the user. self.login(membername) # Decline the invite. user = model.user.get_user(membername) invites = list(model.team.lookup_team_invites(user)) self.assertEquals(1, len(invites)) self.deleteEmptyResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token)) # Make sure the invite was deleted. self.deleteResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token), expected_code=400) class TestDeleteOrganizationTeamMember(ApiTestCase): def test_deletememberinvite(self): self.login(ADMIN_ACCESS_USER) # Verify the initial member count json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers', includePending=True)) self.assertEquals(len(json['members']), 3) membername = NO_ACCESS_USER response = self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) self.assertEquals(True, response['invited']) # Verify the invite was added. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers', includePending=True)) self.assertEquals(len(json['members']), 4) # Delete the invite. self.deleteEmptyResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) # Verify the user was removed from the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers', includePending=True)) self.assertEquals(len(json['members']), 3) def test_deletemember(self): self.login(ADMIN_ACCESS_USER) self.deleteEmptyResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=READ_ACCESS_USER)) # Verify the user was removed from the team. json = self.getJsonResponse(TeamMemberList, params=dict(orgname=ORGANIZATION, teamname='readers')) self.assertEquals(len(json['members']), 1) class TestCreateRepo(ApiTestCase): def test_invalidreponame(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(repository='some/repo', visibility='public', description=''), expected_code=400) self.assertEquals('Invalid repository name', json['detail']) def test_duplicaterepo(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(repository='simple', visibility='public', description=''), expected_code=400) self.assertEquals('Repository already exists', json['detail']) def test_createrepo(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(repository='newrepo', visibility='public', description=''), expected_code=201) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('newrepo', json['name']) def test_createrepo_underorg(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(RepositoryList, data=dict(namespace=ORGANIZATION, repository='newrepo', visibility='private', description=''), expected_code=201) self.assertEquals(ORGANIZATION, json['namespace']) self.assertEquals('newrepo', json['name']) class TestListRepos(ApiTestCase): def test_listrepos_asguest(self): # Queries: Base + the list query with assert_query_count(BASE_QUERY_COUNT + 1): json = self.getJsonResponse(RepositoryList, params=dict(public=True)) self.assertEquals(len(json['repositories']), 1) def assertPublicRepos(self, has_extras=False): public_user = model.user.get_user('public') # Delete all existing repos under the namespace. for repo in list(RepositoryTable.select().where(RepositoryTable.namespace_user == public_user)): model.repository.purge_repository(public_user.username, repo.name) # Add public repos until we have enough for a few pages. required = set() for i in range(0, REPOS_PER_PAGE * 3): name = 'publicrepo%s' % i model.repository.create_repository('public', name, public_user, visibility='public') required.add(name) # Request results until we no longer have any. next_page = None while True: json = self.getJsonResponse(RepositoryList, params=dict(public=True, next_page=next_page)) for repo in json['repositories']: name = repo['name'] if name in required: required.remove(name) else: self.assertTrue(has_extras, "Could not find name %s in repos created" % name) if 'next_page' in json: self.assertEquals(len(json['repositories']), REPOS_PER_PAGE) else: break next_page = json['next_page'] def test_listrepos_asguest_withpages(self): self.assertPublicRepos() def test_listrepos_asorgmember_withpages(self): self.login(READ_ACCESS_USER) self.assertPublicRepos(has_extras=True) def test_listrepos_filter(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(RepositoryList, params=dict(namespace=ORGANIZATION, public=False)) self.assertGreater(len(json['repositories']), 0) for repo in json['repositories']: self.assertEquals(ORGANIZATION, repo['namespace']) def test_listrepos_allparams(self): # Add a repository action count entry for one of the org repos. repo = model.repository.get_repository(ORGANIZATION, ORG_REPO) RepositoryActionCount.create(repository=repo, count=10, date=datetime.datetime.utcnow()) self.login(ADMIN_ACCESS_USER) # Queries: Base + the list query + the popularity and last modified queries + full perms load with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 4): json = self.getJsonResponse(RepositoryList, params=dict(namespace=ORGANIZATION, public=False, last_modified=True, popularity=True)) self.assertGreater(len(json['repositories']), 0) for repository in json['repositories']: self.assertEquals(ORGANIZATION, repository['namespace']) if repository['name'] == ORG_REPO: self.assertGreater(repository['popularity'], 0) def test_listrepos_starred_nouser(self): self.getResponse(RepositoryList, params=dict(last_modified=True, popularity=True, starred=True), expected_code=400) def test_listrepos_starred(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryList, params=dict(last_modified=True, popularity=True, starred=True)) self.assertTrue(len(json['repositories']) > 0) for repo in json['repositories']: self.assertTrue(repo['is_starred']) def test_listrepos_asguest_allparams(self): json = self.getJsonResponse(RepositoryList, params=dict(namespace=ORGANIZATION, public=False, last_modified=True)) for repo in json['repositories']: self.assertEquals(ORGANIZATION, repo['namespace']) def assertRepositoryVisible(self, namespace, name): json = self.getJsonResponse(RepositoryList, params=dict(namespace=namespace, public=False)) self.assertEquals(1, len(json['repositories'])) self.assertEquals(name, json['repositories'][0]['name']) def assertRepositoryNotVisible(self, namespace, name): json = self.getJsonResponse(RepositoryList, params=dict(namespace=namespace, public=False)) for repo in json['repositories']: self.assertNotEquals(name, repo['name']) json = self.getJsonResponse(RepositoryList, params=dict(starred=True)) for repo in json['repositories']: self.assertNotEquals(name, repo['name']) def test_listrepos_starred_filtered(self): admin_user = model.user.get_user(ADMIN_ACCESS_USER) reader_user = model.user.get_user(READ_ACCESS_USER) # Create a new organization. new_org = model.organization.create_organization('neworg', 'neworg@devtable.com', admin_user) admin_team = model.team.create_team('admin', new_org, 'admin') # Add a repository to the organization. repo = model.repository.create_repository('neworg', 'somerepo', admin_user) with self.add_to_team_temporarily(reader_user, admin_team): # Star the repository for the user. model.repository.star_repository(reader_user, repo) # Verify that the user cannot see the repo, since they are no longer allowed to do so. self.login(READ_ACCESS_USER) self.assertRepositoryNotVisible('neworg', 'somerepo') @contextmanager def add_to_team_temporarily(self, user, team): model.team.add_user_to_team(user, team) yield model.team.remove_user_from_team(team.organization.username, team.name, user.username, ADMIN_ACCESS_USER) def test_listrepos_org_filtered(self): admin_user = model.user.get_user(ADMIN_ACCESS_USER) reader_user = model.user.get_user(READ_ACCESS_USER) # Create a new organization. new_org = model.organization.create_organization('neworg', 'neworg@devtable.com', admin_user) admin_team = model.team.create_team('admin', new_org, 'admin') creator_team = model.team.create_team('creators', new_org, 'creator') member_team = model.team.create_team('members', new_org, 'member') # Add a repository to the organization. model.repository.create_repository('neworg', 'somerepo', admin_user) # Verify that the admin user can view it. self.login(ADMIN_ACCESS_USER) self.assertRepositoryVisible('neworg', 'somerepo') # Add reader to a creator team under the org and verify they *cannot* see the repository. with self.add_to_team_temporarily(reader_user, creator_team): self.login(READ_ACCESS_USER) self.assertRepositoryNotVisible('neworg', 'somerepo') # Add reader to a member team under the org and verify they *cannot* see the repository. with self.add_to_team_temporarily(reader_user, member_team): self.login(READ_ACCESS_USER) self.assertRepositoryNotVisible('neworg', 'somerepo') # Add reader to an admin team under the org and verify they *can* see the repository. with self.add_to_team_temporarily(reader_user, admin_team): self.login(READ_ACCESS_USER) self.assertRepositoryVisible('neworg', 'somerepo') # Verify that the public user cannot see the repository. self.login(PUBLIC_USER) self.assertRepositoryNotVisible('neworg', 'somerepo') class TestViewPublicRepository(ApiTestCase): def test_normalview(self): resp = self.getJsonResponse(Repository, params=dict(repository='public/publicrepo')) self.assertFalse('stats' in resp) def test_normalview_withstats(self): resp = self.getJsonResponse(Repository, params=dict(repository='public/publicrepo', includeStats=True)) self.assertTrue('stats' in resp) def test_anon_access_disabled(self): import features features.ANONYMOUS_ACCESS = False try: self.getResponse(Repository, params=dict(repository='public/publicrepo'), expected_code=401) finally: features.ANONYMOUS_ACCESS = True class TestUpdateRepo(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' def test_updatedescription(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO), data=dict(description='Some cool repo')) # Verify the repo description was updated. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals('Some cool repo', json['description']) class TestChangeRepoVisibility(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' def test_trychangevisibility(self): self.login(ADMIN_ACCESS_USER) # Make public. self.postJsonResponse(RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility='public')) # Verify the visibility. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals(True, json['is_public']) # Change the subscription of the namespace. self.putJsonResponse(UserPlan, data=dict(plan='personal-30')) # Try to make private. self.postJsonResponse(RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility='private'), expected_code=402) # Verify the visibility. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals(True, json['is_public']) def test_changevisibility(self): self.login(ADMIN_ACCESS_USER) # Make public. self.postJsonResponse(RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility='public')) # Verify the visibility. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals(True, json['is_public']) # Make private. self.postJsonResponse(RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility='private')) # Verify the visibility. json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) self.assertEquals(False, json['is_public']) class log_queries(object): def __init__(self, query_filter=None): self.filter = query_filter def get_queries(self): queries = [q.msg[0] for q in self._handler.queries] if self.filter: queries = [q for q in queries if re.match(self.filter, q)] return queries def __enter__(self): logger = logging.getLogger('peewee') self._handler = _QueryLogHandler() logger.setLevel(logging.DEBUG) logger.addHandler(self._handler) return self def __exit__(self, exc_type, exc_val, exc_tb): logger = logging.getLogger('peewee') logger.removeHandler(self._handler) class check_transitive_deletes(log_queries): def __init__(self): super(check_transitive_deletes, self).__init__(query_filter=r'^DELETE.+IN \(SELECT.+$') def __exit__(self, exc_type, exc_val, exc_tb): super(check_transitive_deletes, self).__exit__(exc_type, exc_val, exc_tb) queries = self.get_queries() if queries: raise Exception('Detected transitive deletion in queries: %s' % queries) class TestDeleteRepository(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple' COMPLEX_REPO = ADMIN_ACCESS_USER + '/complex' def test_deleterepo(self): self.login(ADMIN_ACCESS_USER) # Verify the repo exists. self.getResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) # Add a build queue item for the repo. dockerfile_build_queue.put([ADMIN_ACCESS_USER, 'simple'], '{}') # Delete the repository. self.deleteEmptyResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) # Ensure the queue item is gone. self.assertIsNone(dockerfile_build_queue.get()) # Verify the repo was deleted. self.getResponse(Repository, params=dict(repository=self.SIMPLE_REPO), expected_code=404) def test_verify_queue_removal(self): self.login(ADMIN_ACCESS_USER) # Verify the repo exists. self.getResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) # Add a build queue item for the repo and another repo. dockerfile_build_queue.put([ADMIN_ACCESS_USER, 'simple'], '{}', available_after=-1) dockerfile_build_queue.put([ADMIN_ACCESS_USER, 'anotherrepo'], '{}', available_after=-1) # Delete the repository. self.deleteEmptyResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) # Ensure the other queue item is still present. self.assertIsNotNone(dockerfile_build_queue.get()) def test_deleterepo2(self): self.login(ADMIN_ACCESS_USER) # Verify the repo exists. self.getResponse(Repository, params=dict(repository=self.COMPLEX_REPO)) self.deleteEmptyResponse(Repository, params=dict(repository=self.COMPLEX_REPO)) # Verify the repo was deleted. self.getResponse(Repository, params=dict(repository=self.COMPLEX_REPO), expected_code=404) def test_populate_and_delete_repo(self): self.login(ADMIN_ACCESS_USER) # Verify the repo exists. self.getResponse(Repository, params=dict(repository=self.COMPLEX_REPO)) # Make sure the repository has some images and tags. self.assertTrue(len(list(model.image.get_repository_images(ADMIN_ACCESS_USER, 'complex'))) > 0) self.assertTrue(len(list(model.tag.list_repository_tags(ADMIN_ACCESS_USER, 'complex'))) > 0) # Add some data for the repository, in addition to is already existing images and tags. repository = model.repository.get_repository(ADMIN_ACCESS_USER, 'complex') # Create some access tokens. access_token = model.token.create_access_token(repository, 'read') model.token.create_access_token(repository, 'write') delegate_token = model.token.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken', 'read') model.token.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken', 'write') # Create some repository builds. model.build.create_repository_build(repository, access_token, {}, 'someid', 'foobar') model.build.create_repository_build(repository, delegate_token, {}, 'someid2', 'foobar2') # Create some notifications. model.notification.create_repo_notification(repository, 'repo_push', 'hipchat', {}, {}) model.notification.create_repo_notification(repository, 'build_queued', 'slack', {}, {}) # Create some logs. model.log.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository) model.log.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository) # Create some build triggers. user = model.user.get_user(ADMIN_ACCESS_USER) model.build.create_build_trigger(repository, 'github', 'sometoken', user) model.build.create_build_trigger(repository, 'github', 'anothertoken', user) # Create some email authorizations. model.repository.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'a@b.com') model.repository.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'b@c.com') # Create some repository action count entries. RepositoryActionCount.create(repository=repository, date=datetime.datetime.now(), count=1) RepositoryActionCount.create(repository=repository, date=datetime.datetime.now() - datetime.timedelta(days=2), count=2) RepositoryActionCount.create(repository=repository, date=datetime.datetime.now() - datetime.timedelta(days=5), count=6) # Create some labels. pre_delete_label_count = database.Label.select().count() tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod') model.label.create_manifest_label(tag_manifest, 'foo', 'bar', 'manifest') model.label.create_manifest_label(tag_manifest, 'foo', 'baz', 'manifest') model.label.create_manifest_label(tag_manifest, 'something', '{}', 'api', media_type_name='application/json') model.label.create_manifest_label(tag_manifest, 'something', '{"some": "json"}', 'manifest') # Delete the repository. with check_transitive_deletes(): self.deleteEmptyResponse(Repository, params=dict(repository=self.COMPLEX_REPO)) # Verify the repo was deleted. self.getResponse(Repository, params=dict(repository=self.COMPLEX_REPO), expected_code=404) # Verify the labels are gone. post_delete_label_count = database.Label.select().count() self.assertEquals(post_delete_label_count, pre_delete_label_count) class TestGetRepository(ApiTestCase): PUBLIC_REPO = PUBLIC_USER + '/publicrepo' def test_getrepo_badnames(self): self.login(ADMIN_ACCESS_USER) bad_names = ['logs', 'build', 'tokens', 'foo.bar', 'foo-bar', 'foo_bar'] # For each bad name, create the repo. for bad_name in bad_names: json = self.postJsonResponse(RepositoryList, expected_code=201, data=dict(repository=bad_name, visibility='public', description='')) # Make sure we can retrieve its information. json = self.getJsonResponse(Repository, params=dict(repository=ADMIN_ACCESS_USER + '/' + bad_name)) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals(bad_name, json['name']) self.assertEquals(True, json['is_public']) def test_getrepo_public_asguest(self): json = self.getJsonResponse(Repository, params=dict(repository=self.PUBLIC_REPO)) self.assertEquals(PUBLIC_USER, json['namespace']) self.assertEquals('publicrepo', json['name']) self.assertEquals(True, json['is_public']) self.assertEquals(False, json['is_organization']) self.assertEquals(False, json['can_write']) self.assertEquals(False, json['can_admin']) assert 'latest' in json['tags'] def test_getrepo_public_asowner(self): self.login(PUBLIC_USER) json = self.getJsonResponse(Repository, params=dict(repository=self.PUBLIC_REPO)) self.assertEquals(False, json['is_organization']) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) def test_getrepo_building(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) self.assertEquals(False, json['is_organization']) def test_getrepo_org_asnonmember(self): self.getResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO), expected_code=401) def test_getrepo_org_asreader(self): self.login(READ_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) self.assertEquals(ORGANIZATION, json['namespace']) self.assertEquals(ORG_REPO, json['name']) self.assertEquals(False, json['can_write']) self.assertEquals(False, json['can_admin']) self.assertEquals(True, json['is_organization']) def test_getrepo_org_asadmin(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(Repository, params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) self.assertEquals(True, json['can_write']) self.assertEquals(True, json['can_admin']) self.assertEquals(True, json['is_organization']) class TestRepositoryBuildResource(ApiTestCase): def test_repo_build_invalid_url(self): self.login(ADMIN_ACCESS_USER) self.postJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(archive_url='hppt://quay.io'), expected_code=400) def test_cancel_invalidbuild(self): self.login(ADMIN_ACCESS_USER) self.deleteResponse(RepositoryBuildResource, params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid='invalid'), expected_code=404) def test_cancel_waitingbuild(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. json = self.postJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) uuid = json['id'] # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['builds'])) self.assertEquals(uuid, json['builds'][0]['id']) # Find the build's queue item. build_ref = database.RepositoryBuild.get(uuid=uuid) queue_item = database.QueueItem.get(id=build_ref.queue_id) self.assertTrue(queue_item.available) self.assertTrue(queue_item.retries_remaining > 0) # Cancel the build. self.deleteResponse(RepositoryBuildResource, params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['builds'])) self.assertEquals('cancelled', json['builds'][0]['phase']) # Check for the build's queue item. try: database.QueueItem.get(id=build_ref.queue_id) self.fail('QueueItem still exists for build') except database.QueueItem.DoesNotExist: pass def test_attemptcancel_scheduledbuild(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. json = self.postJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) uuid = json['id'] # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['builds'])) self.assertEquals(uuid, json['builds'][0]['id']) # Set queue item to be picked up. build_ref = database.RepositoryBuild.get(uuid=uuid) qi = database.QueueItem.get(id=build_ref.queue_id) qi.available = False qi.save() # Try to cancel the build. self.deleteResponse(RepositoryBuildResource, params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid), expected_code=201) def test_attemptcancel_workingbuild(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. json = self.postJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) uuid = json['id'] # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['builds'])) self.assertEquals(uuid, json['builds'][0]['id']) # Set the build to a different phase. rb = database.RepositoryBuild.get(uuid=uuid) rb.phase = database.BUILD_PHASE.BUILDING rb.save() # Try to cancel the build. self.deleteResponse(RepositoryBuildResource, params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid), expected_code=400) class TestRepoBuilds(ApiTestCase): def test_getrepo_nobuilds(self): self.login(ADMIN_ACCESS_USER) # Queries: Permission + the list query with assert_query_count(2): json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 def test_getrepobuilds(self): self.login(ADMIN_ACCESS_USER) # Queries: Permission + the list query with assert_query_count(2): json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 build = json['builds'][-1] assert 'id' in build assert 'status' in build # Check the status endpoint. status_json = self.getJsonResponse(RepositoryBuildStatus, params=dict(repository=ADMIN_ACCESS_USER + '/building', build_uuid=build['id'])) self.assertEquals(status_json['id'], build['id']) self.assertEquals(status_json['resource_key'], build['resource_key']) self.assertEquals(status_json['trigger'], build['trigger']) class TestRequestRepoBuild(ApiTestCase): def test_requestbuild_noidurl(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build without a file ID or URL. self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(), expected_code=400) def test_requestbuild_invalidurls(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build with and invalid URL. self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(archive_url='foobarbaz'), expected_code=400) self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(archive_url='file://foobarbaz'), expected_code=400) def test_requestrepobuild_withurl(self): self.login(ADMIN_ACCESS_USER) # Ensure we are not yet building. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 # Request a (fake) build. self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(archive_url='http://quay.io/robots.txt'), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) > 0 self.assertEquals('http://quay.io/robots.txt', json['builds'][0]['archive_url']) def test_requestrepobuild_withfile(self): self.login(ADMIN_ACCESS_USER) # Ensure we are not yet building. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 # Request a (fake) build. self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz'), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) > 0 def test_requestrepobuild_with_robot(self): self.login(ADMIN_ACCESS_USER) # Ensure we are not yet building. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['builds']) == 0 # Request a (fake) build. pull_robot = ADMIN_ACCESS_USER + '+dtrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=201) # Check for the build. json = self.getJsonResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) assert len(json['builds']) > 0 def test_requestrepobuild_with_invalid_robot(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. pull_robot = ADMIN_ACCESS_USER + '+invalidrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=404) def test_requestrepobuild_with_unauthorized_robot(self): self.login(ADMIN_ACCESS_USER) # Request a (fake) build. pull_robot = 'freshuser+anotherrobot' self.postResponse(RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(file_id='foobarbaz', pull_robot=pull_robot), expected_code=403) class TestRepositoryEmail(ApiTestCase): def test_emailnotauthorized(self): self.login(ADMIN_ACCESS_USER) # Verify the e-mail address is not authorized. self.getResponse(RepositoryAuthorizedEmail, params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='test@example.com'), expected_code=404) def test_emailnotauthorized_butsent(self): self.login(ADMIN_ACCESS_USER) # Verify the e-mail address is not authorized. json = self.getJsonResponse(RepositoryAuthorizedEmail, params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr+other@devtable.com')) self.assertEquals(False, json['confirmed']) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('simple', json['repository']) def test_emailauthorized(self): self.login(ADMIN_ACCESS_USER) # Verify the e-mail address is authorized. json = self.getJsonResponse(RepositoryAuthorizedEmail, params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr@devtable.com')) self.assertEquals(True, json['confirmed']) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('simple', json['repository']) def test_send_email_authorization(self): self.login(ADMIN_ACCESS_USER) # Send the email. json = self.postJsonResponse(RepositoryAuthorizedEmail, params=dict(repository=ADMIN_ACCESS_USER + '/simple', email='jschorr+foo@devtable.com')) self.assertEquals(False, json['confirmed']) self.assertEquals(ADMIN_ACCESS_USER, json['namespace']) self.assertEquals('simple', json['repository']) class TestRepositoryNotifications(ApiTestCase): def test_testnotification(self): self.login(ADMIN_ACCESS_USER) # Add a notification. json = self.postJsonResponse(RepositoryNotificationList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(config={'url': 'http://example.com'}, event='repo_push', method='webhook', eventConfig={}), expected_code=201) uuid = json['uuid'] self.assertIsNone(notification_queue.get()) # Issue a test notification. self.postJsonResponse(TestRepositoryNotification, params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=uuid)) # Ensure the item is in the queue. time.sleep(1) # Makes sure the queue get works on MySQL with its second-level precision. found = notification_queue.get() self.assertIsNotNone(found) self.assertTrue('notification_uuid' in found['body']) def test_webhooks(self): self.login(ADMIN_ACCESS_USER) # Add a notification. json = self.postJsonResponse(RepositoryNotificationList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(config={'url': 'http://example.com'}, event='repo_push', method='webhook', eventConfig={}), expected_code=201) self.assertEquals('repo_push', json['event']) self.assertEquals('webhook', json['method']) self.assertEquals('http://example.com', json['config']['url']) self.assertIsNone(json['title']) wid = json['uuid'] # Get the notification. json = self.getJsonResponse(RepositoryNotification, params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid)) self.assertEquals(wid, json['uuid']) self.assertEquals('repo_push', json['event']) self.assertEquals('webhook', json['method']) self.assertIsNone(json['title']) # Verify the notification is listed. json = self.getJsonResponse(RepositoryNotificationList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) ids = [w['uuid'] for w in json['notifications']] assert wid in ids # Delete the notification. self.deleteEmptyResponse(RepositoryNotification, params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid), expected_code=204) # Verify the notification is gone. self.getResponse(RepositoryNotification, params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid), expected_code=404) # Add another notification. json = self.postJsonResponse(RepositoryNotificationList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(config={'url': 'http://example.com'}, event='repo_push', method='webhook', title='Some Notification', eventConfig={}), expected_code=201) self.assertEquals('repo_push', json['event']) self.assertEquals('webhook', json['method']) self.assertEquals('http://example.com', json['config']['url']) self.assertEquals('Some Notification', json['title']) wid = json['uuid'] # Get the notification. json = self.getJsonResponse(RepositoryNotification, params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid)) self.assertEquals(wid, json['uuid']) self.assertEquals('repo_push', json['event']) self.assertEquals('webhook', json['method']) self.assertEquals('Some Notification', json['title']) class TestListAndGetImage(ApiTestCase): def test_listandgetimages(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryImageList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) assert len(json['images']) > 0 for image in json['images']: assert 'id' in image assert 'tags' in image assert 'created' in image assert 'comment' in image assert 'command' in image assert 'ancestors' in image assert 'size' in image ijson = self.getJsonResponse(RepositoryImage, params=dict(repository=ADMIN_ACCESS_USER + '/simple', image_id=image['id'])) self.assertEquals(image['id'], ijson['id']) class TestGetImageChanges(ApiTestCase): def test_getimagechanges(self): self.login(ADMIN_ACCESS_USER) # Find an image to check. json = self.getJsonResponse(RepositoryImageList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) image_id = json['images'][0]['id'] # Lookup the image's changes. # TODO: Fix me once we can get fake changes into the test data #self.getJsonResponse(RepositoryImageChanges, # params=dict(repository=ADMIN_ACCESS_USER + '/simple', # image_id=image_id)) class TestRevertTag(ApiTestCase): def test_reverttag_invalidtag(self): self.login(ADMIN_ACCESS_USER) self.postResponse(RevertTag, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='invalidtag'), data=dict(image='invalid_image'), expected_code=404) def test_reverttag_invalidimage(self): self.login(ADMIN_ACCESS_USER) self.postResponse(RevertTag, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest'), data=dict(image='invalid_image'), expected_code=400) def test_reverttag(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(ListRepositoryTags, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest')) self.assertEquals(2, len(json['tags'])) self.assertFalse('end_ts' in json['tags'][0]) previous_image_id = json['tags'][1]['docker_image_id'] self.postJsonResponse(RevertTag, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest'), data=dict(image=previous_image_id)) json = self.getJsonResponse(ListRepositoryTags, params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest')) self.assertEquals(3, len(json['tags'])) self.assertFalse('end_ts' in json['tags'][0]) self.assertEquals(previous_image_id, json['tags'][0]['docker_image_id']) class TestListAndDeleteTag(ApiTestCase): def test_invalid_tags(self): self.login(ADMIN_ACCESS_USER) # List the images for staging. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) staging_images = json['images'] # Try to add some invalid tags. self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='-fail'), data=dict(image=staging_images[0]['id']), expected_code=400) self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='北京'), data=dict(image=staging_images[0]['id']), expected_code=400) def test_listdeletecreateandmovetag(self): self.login(ADMIN_ACCESS_USER) # List the images for prod. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) prod_images = json['images'] assert len(prod_images) > 0 # List the images for staging. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) staging_images = json['images'] assert len(prod_images) == len(staging_images) + 1 # Delete prod. self.deleteEmptyResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), expected_code=204) # Make sure the tag is gone. self.getResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'), expected_code=404) # Make the sure the staging images are still there. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging')) self.assertEquals(staging_images, json['images']) # Require a valid tag name. self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='-fail'), data=dict(image=staging_images[0]['id']), expected_code=400) # Add a new tag to the staging image. self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'), data=dict(image=staging_images[0]['id']), expected_code=201) # Make sure the tag is present. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag')) sometag_images = json['images'] self.assertEquals(sometag_images, staging_images) # Move the tag. self.putResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'), data=dict(image=staging_images[-1]['id']), expected_code=201) # Make sure the tag has moved. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag')) sometag_new_images = json['images'] self.assertEquals(1, len(sometag_new_images)) self.assertEquals(staging_images[-1], sometag_new_images[0]) def test_deletesubtag(self): self.login(ADMIN_ACCESS_USER) # List the images for prod. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) prod_images = json['images'] assert len(prod_images) > 0 # Delete staging. self.deleteEmptyResponse(RepositoryTag, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'), expected_code=204) # Make sure the prod images are still around. json = self.getJsonResponse(RepositoryTagImages, params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod')) self.assertEquals(prod_images, json['images']) def test_listtagpagination(self): self.login(ADMIN_ACCESS_USER) latest_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, "complex", "prod") # Create 10 tags in an empty repo. user = model.user.get_user_or_org(ADMIN_ACCESS_USER) repo = model.repository.create_repository(ADMIN_ACCESS_USER, "empty", user) image = model.image.find_create_or_link_image(latest_image.docker_image_id, repo, ADMIN_ACCESS_USER, {}, ['local_us']) remaining_tags = set() for i in xrange(1, 11): tag_name = "tag" + str(i) remaining_tags.add(tag_name) model.tag.create_or_update_tag(ADMIN_ACCESS_USER, "empty", tag_name, image.docker_image_id) # Make sure we can iterate over all of them. json = self.getJsonResponse(ListRepositoryTags, params=dict(repository=ADMIN_ACCESS_USER + '/empty', page=1, limit=5)) self.assertEquals(1, json['page']) self.assertEquals(5, len(json['tags'])) self.assertTrue(json['has_additional']) names = {tag['name'] for tag in json['tags']} remaining_tags = remaining_tags - names self.assertEquals(5, len(remaining_tags)) json = self.getJsonResponse(ListRepositoryTags, params=dict(repository=ADMIN_ACCESS_USER + '/empty', page=2, limit=5)) self.assertEquals(2, json['page']) self.assertEquals(5, len(json['tags'])) self.assertFalse(json['has_additional']) names = {tag['name'] for tag in json['tags']} remaining_tags = remaining_tags - names self.assertEquals(0, len(remaining_tags)) json = self.getJsonResponse(ListRepositoryTags, params=dict(repository=ADMIN_ACCESS_USER + '/empty', page=3, limit=5)) self.assertEquals(3, json['page']) self.assertEquals(0, len(json['tags'])) self.assertFalse(json['has_additional']) class TestRepoPermissions(ApiTestCase): def listUserPermissions(self, namespace=ADMIN_ACCESS_USER, repo='simple'): return self.getJsonResponse(RepositoryUserPermissionList, params=dict(repository=namespace + '/' + repo))['permissions'] def listTeamPermissions(self): response = self.getJsonResponse(RepositoryTeamPermissionList, params=dict(repository=ORGANIZATION + '/' + ORG_REPO)) return response['permissions'] def test_userpermissions_underorg(self): self.login(ADMIN_ACCESS_USER) permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO) self.assertEquals(1, len(permissions)) assert 'outsideorg' in permissions self.assertEquals('read', permissions['outsideorg']['role']) self.assertEquals(False, permissions['outsideorg']['is_org_member']) # Add another user. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, username=ADMIN_ACCESS_USER), data=dict(role='admin')) # Verify the user is present. permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO) self.assertEquals(2, len(permissions)) assert ADMIN_ACCESS_USER in permissions self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role']) self.assertEquals(True, permissions[ADMIN_ACCESS_USER]['is_org_member']) def test_userpermissions(self): self.login(ADMIN_ACCESS_USER) # The repo should start with just the admin as a user perm. permissions = self.listUserPermissions() self.assertEquals(1, len(permissions)) assert ADMIN_ACCESS_USER in permissions self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role']) self.assertFalse('is_org_member' in permissions[ADMIN_ACCESS_USER]) # Add another user. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), data=dict(role='read')) # Verify the user is present. permissions = self.listUserPermissions() self.assertEquals(2, len(permissions)) assert NO_ACCESS_USER in permissions self.assertEquals('read', permissions[NO_ACCESS_USER]['role']) self.assertFalse('is_org_member' in permissions[NO_ACCESS_USER]) json = self.getJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) self.assertEquals('read', json['role']) # Change the user's permissions. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER), data=dict(role='admin')) # Verify. permissions = self.listUserPermissions() self.assertEquals(2, len(permissions)) assert NO_ACCESS_USER in permissions self.assertEquals('admin', permissions[NO_ACCESS_USER]['role']) # Delete the user's permission. self.deleteEmptyResponse(RepositoryUserPermission, params=dict(repository=ADMIN_ACCESS_USER + '/simple', username=NO_ACCESS_USER)) # Verify. permissions = self.listUserPermissions() self.assertEquals(1, len(permissions)) assert not NO_ACCESS_USER in permissions def test_teampermissions(self): self.login(ADMIN_ACCESS_USER) # The repo should start with just the readers as a team perm. permissions = self.listTeamPermissions() self.assertEquals(1, len(permissions)) assert 'readers' in permissions self.assertEquals('read', permissions['readers']['role']) # Add another team. self.putJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), data=dict(role='write')) # Verify the team is present. permissions = self.listTeamPermissions() self.assertEquals(2, len(permissions)) assert 'owners' in permissions self.assertEquals('write', permissions['owners']['role']) json = self.getJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) self.assertEquals('write', json['role']) # Change the team's permissions. self.putJsonResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'), data=dict(role='admin')) # Verify. permissions = self.listTeamPermissions() self.assertEquals(2, len(permissions)) assert 'owners' in permissions self.assertEquals('admin', permissions['owners']['role']) # Delete the team's permission. self.deleteEmptyResponse(RepositoryTeamPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners')) # Verify. permissions = self.listTeamPermissions() self.assertEquals(1, len(permissions)) assert not 'owners' in permissions class TestApiTokens(ApiTestCase): def listTokens(self): return self.getJsonResponse(RepositoryTokenList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens'] def test_tokens(self): self.login(ADMIN_ACCESS_USER) # Create a new token. json = self.postJsonResponse(RepositoryTokenList, params=dict(repository=ADMIN_ACCESS_USER + '/simple'), data=dict(role='read', friendlyName='mytoken'), expected_code=201) self.assertEquals('mytoken', json['friendlyName']) self.assertEquals('read', json['role']) token_code = json['code'] # Verify. tokens = self.listTokens() assert token_code in tokens self.assertEquals('mytoken', tokens[token_code]['friendlyName']) json = self.getJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) self.assertEquals(tokens[token_code], json) # Change the token's permission. self.putJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), data=dict(role='write')) # Verify. json = self.getJsonResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) self.assertEquals('write', json['role']) # Delete the token. self.deleteEmptyResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code)) # Verify. self.getResponse(RepositoryToken, params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code), expected_code=404) class TestUserCard(ApiTestCase): def test_getusercard(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserCard) self.assertEquals('4242', json['card']['last4']) self.assertEquals('Visa', json['card']['type']) def test_setusercard_error(self): self.login(ADMIN_ACCESS_USER) json = self.postJsonResponse(UserCard, data=dict(token='sometoken'), expected_code=402) assert 'carderror' in json class TestOrgCard(ApiTestCase): def test_getorgcard(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationCard, params=dict(orgname=ORGANIZATION)) self.assertEquals('4242', json['card']['last4']) self.assertEquals('Visa', json['card']['type']) class TestUserSubscription(ApiTestCase): def getSubscription(self): return self.getJsonResponse(UserPlan) def test_updateplan(self): self.login(ADMIN_ACCESS_USER) # Change the plan. self.putJsonResponse(UserPlan, data=dict(plan='free')) # Verify sub = self.getSubscription() self.assertEquals('free', sub['plan']) # Change the plan. self.putJsonResponse(UserPlan, data=dict(plan='bus-large-30')) # Verify sub = self.getSubscription() self.assertEquals('bus-large-30', sub['plan']) class TestOrgSubscription(ApiTestCase): def getSubscription(self): return self.getJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION)) def test_updateplan(self): self.login(ADMIN_ACCESS_USER) # Change the plan. self.putJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION), data=dict(plan='free')) # Verify sub = self.getSubscription() self.assertEquals('free', sub['plan']) # Change the plan. self.putJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION), data=dict(plan='bus-large-30')) # Verify sub = self.getSubscription() self.assertEquals('bus-large-30', sub['plan']) class TestUserRobots(ApiTestCase): def getRobotNames(self): return [r['name'] for r in self.getJsonResponse(UserRobotList)['robots']] def test_robot_list(self): self.login(NO_ACCESS_USER) # Create some robots. self.putJsonResponse(UserRobot, params=dict(robot_shortname='bender'), expected_code=201) self.putJsonResponse(UserRobot, params=dict(robot_shortname='goldy'), expected_code=201) self.putJsonResponse(UserRobot, params=dict(robot_shortname='coolbot'), expected_code=201) # Queries: Base + the lookup query with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1): self.getJsonResponse(UserRobotList) # Queries: Base + the lookup query with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1): self.getJsonResponse(UserRobotList, params=dict(permissions=True)) def test_robots(self): self.login(NO_ACCESS_USER) # Create a robot. json = self.putJsonResponse(UserRobot, params=dict(robot_shortname='bender'), expected_code=201) self.assertEquals(NO_ACCESS_USER + '+bender', json['name']) # Verify. robots = self.getRobotNames() assert NO_ACCESS_USER + '+bender' in robots # Delete the robot. self.deleteEmptyResponse(UserRobot, params=dict(robot_shortname='bender')) # Verify. robots = self.getRobotNames() assert not NO_ACCESS_USER + '+bender' in robots def test_regenerate(self): self.login(NO_ACCESS_USER) # Create a robot. json = self.putJsonResponse(UserRobot, params=dict(robot_shortname='bender'), expected_code=201) token = json['token'] # Regenerate the robot. json = self.postJsonResponse(RegenerateUserRobot, params=dict(robot_shortname='bender'), expected_code=200) # Verify the token changed. self.assertNotEquals(token, json['token']) json2 = self.getJsonResponse(UserRobot, params=dict(robot_shortname='bender'), expected_code=200) self.assertEquals(json['token'], json2['token']) class TestOrgRobots(ApiTestCase): def getRobotNames(self): return [r['name'] for r in self.getJsonResponse(OrgRobotList, params=dict(orgname=ORGANIZATION))['robots']] def test_create_robot_with_underscores(self): self.login(ADMIN_ACCESS_USER) # Create the robot. self.putJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='mr_bender'), expected_code=201) # Add the robot to a team. membername = ORGANIZATION + '+mr_bender' self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) # Retrieve the robot's details. self.getJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='mr_bender'), expected_code=200) def test_delete_robot_after_use(self): self.login(ADMIN_ACCESS_USER) # Create the robot. self.putJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=201) # Add the robot to a team. membername = ORGANIZATION + '+bender' self.putJsonResponse(TeamMember, params=dict(orgname=ORGANIZATION, teamname='readers', membername=membername)) # Add a repository permission. self.putJsonResponse(RepositoryUserPermission, params=dict(repository=ORGANIZATION + '/' + ORG_REPO, username=membername), data=dict(role='read')) # Add a permission prototype with the robot as the activating user. self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read', activating_user={'name': membername}, delegate={'kind': 'user', 'name': membername})) # Add a permission prototype with the robot as the delegating user. self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION), data=dict(role='read', delegate={'kind': 'user', 'name': membername})) # Add a build trigger with the robot as the pull robot. database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.repository.get_repository(ORGANIZATION, ORG_REPO) user = model.user.get_user(ADMIN_ACCESS_USER) pull_robot = model.user.get_user(membername) trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user, pull_robot=pull_robot) # Add a fake build of the fake build trigger. token = model.token.create_access_token(repo, 'write', kind='build-worker', friendly_name='Repository Build Token') build = model.build.create_repository_build(repo, token, {}, 'fake-dockerfile', 'fake-name', trigger, pull_robot_name=membername) # Add some log entries for the robot. model.log.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo) # Delete the robot and verify it works. self.deleteEmptyResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender')) # Verify the build is still present. self.assertIsNotNone(model.build.get_repository_build(build.uuid)) # All the above records should now be deleted, along with the robot. We verify a few of the # critical ones below. # Check the team. team = model.team.get_organization_team(ORGANIZATION, 'readers') members = [member.username for member in model.organization.get_organization_team_members(team.id)] self.assertFalse(membername in members) # Check the robot itself. self.assertIsNone(model.user.get_user(membername)) def test_robots(self): self.login(ADMIN_ACCESS_USER) # Create a robot. json = self.putJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=201) self.assertEquals(ORGANIZATION + '+bender', json['name']) # Verify. robots = self.getRobotNames() assert ORGANIZATION + '+bender' in robots # Delete the robot. self.deleteEmptyResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender')) # Verify. robots = self.getRobotNames() assert not ORGANIZATION + '+bender' in robots def test_regenerate(self): self.login(ADMIN_ACCESS_USER) # Create a robot. json = self.putJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=201) token = json['token'] # Regenerate the robot. json = self.postJsonResponse(RegenerateOrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=200) # Verify the token changed. self.assertNotEquals(token, json['token']) json2 = self.getJsonResponse(OrgRobot, params=dict(orgname=ORGANIZATION, robot_shortname='bender'), expected_code=200) self.assertEquals(json['token'], json2['token']) class TestLogs(ApiTestCase): def test_repo_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryLogs, params=dict(repository='devtable/simple')) assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_repo_logs_crossyear(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryLogs, params=dict(repository='devtable/simple', starttime='12/01/2016', endtime='1/09/2017')) self.assertEquals('Thu, 01 Dec 2016 00:00:00 -0000', json['start_time']) self.assertEquals('Tue, 10 Jan 2017 00:00:00 -0000', json['end_time']) def test_repo_aggregate_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(RepositoryAggregateLogs, params=dict(repository='devtable/simple')) assert 'aggregated' in json assert len(json['aggregated']) > 0 def test_user_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserLogs) assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_org_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION)) assert 'logs' in json assert 'start_time' in json assert 'end_time' in json def test_user_aggregate_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserAggregateLogs) assert 'aggregated' in json def test_org_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgAggregateLogs, params=dict(orgname=ORGANIZATION)) assert 'aggregated' in json def test_performer(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION)) all_logs = json['logs'] json = self.getJsonResponse(OrgLogs, params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION)) assert len(json['logs']) < len(all_logs) for log in json['logs']: self.assertEquals(READ_ACCESS_USER, log['performer']['name']) class TestApplicationInformation(ApiTestCase): def test_get_info(self): json = self.getJsonResponse(ApplicationInformation, params=dict(client_id=FAKE_APPLICATION_CLIENT_ID)) assert 'name' in json assert 'uri' in json assert 'organization' in json def test_get_invalid_info(self): self.getJsonResponse(ApplicationInformation, params=dict(client_id='invalid-code'), expected_code=404) class TestOrganizationApplications(ApiTestCase): def test_list_create_applications(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION)) self.assertEquals(2, len(json['applications'])) found = False for application in json['applications']: if application['client_id'] == FAKE_APPLICATION_CLIENT_ID: found = True break self.assertTrue(found) # Add a new application. json = self.postJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION), data=dict(name="Some cool app", description="foo")) self.assertEquals("Some cool app", json['name']) self.assertEquals("foo", json['description']) # Retrieve the apps list again list_json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION)) self.assertEquals(3, len(list_json['applications'])) class TestOrganizationApplicationResource(ApiTestCase): def test_get_edit_delete_application(self): self.login(ADMIN_ACCESS_USER) # Retrieve the application. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id']) # Edit the application. edit_json = self.putJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID), data=dict(name="Some App", description="foo", application_uri="bar", redirect_uri="baz", avatar_email="meh")) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, edit_json['client_id']) self.assertEquals("Some App", edit_json['name']) self.assertEquals("foo", edit_json['description']) self.assertEquals("bar", edit_json['application_uri']) self.assertEquals("baz", edit_json['redirect_uri']) self.assertEquals("meh", edit_json['avatar_email']) # Retrieve the application again. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(json, edit_json) # Delete the application. self.deleteEmptyResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) # Make sure the application is gone. self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID), expected_code=404) class TestOrganization(ApiTestCase): def test_change_send_billing_invoice(self): self.login(ADMIN_ACCESS_USER) self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION), data=dict(invoice_email=False, invoice_email_address=None)) class TestOrganizationApplicationResetClientSecret(ApiTestCase): def test_reset_client_secret(self): self.login(ADMIN_ACCESS_USER) # Retrieve the application. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id']) # Reset the client secret. reset_json = self.postJsonResponse(OrganizationApplicationResetClientSecret, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(FAKE_APPLICATION_CLIENT_ID, reset_json['client_id']) self.assertNotEquals(reset_json['client_secret'], json['client_secret']) # Verify it was changed in the DB. json = self.getJsonResponse(OrganizationApplicationResource, params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID)) self.assertEquals(reset_json['client_secret'], json['client_secret']) class FakeBuildTrigger(BuildTriggerHandler): @classmethod def service_name(cls): return 'fakeservice' def list_build_sources(self): return [{'first': 'source'}, {'second': self.auth_token}] def list_build_subdirs(self): return [self.auth_token, 'foo', 'bar', self.config['somevalue']] def handle_trigger_request(self, request): prepared = PreparedBuild(self.trigger) prepared.build_name = 'build-name' prepared.tags = ['bar'] prepared.dockerfile_id = 'foo' prepared.subdirectory = 'subdir' prepared.metadata = {'foo': 'bar'} prepared.is_manual = False return prepared def is_active(self): return 'active' in self.config and self.config['active'] def activate(self, standard_webhook_url): self.config['active'] = True return self.config, {} def deactivate(self): self.config['active'] = False return self.config def manual_start(self, run_parameters=None): prepared = PreparedBuild(self.trigger) prepared.build_name = 'build-name' prepared.tags = ['bar'] prepared.dockerfile_id = 'foo' prepared.subdirectory = 'subdir' prepared.metadata = {'foo': 'bar'} prepared.is_manual = True return prepared def get_repository_url(self): return 'http://foo/' + self.config['build_source'] def load_dockerfile_contents(self): if not 'dockerfile' in self.config: return None return self.config['dockerfile'] def list_field_values(self, field_name, limit=None): if field_name == 'test_field': return [1, 2, 3] return None class TestBuildTriggers(ApiTestCase): def test_list_build_triggers(self): self.login(ADMIN_ACCESS_USER) # Check a repo with no known triggers. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(0, len(json['triggers'])) # Check a repo with one known trigger. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(1, len(json['triggers'])) trigger = json['triggers'][0] assert 'id' in trigger assert 'is_active' in trigger assert 'config' in trigger assert 'service' in trigger # Verify the get trigger method. trigger_json = self.getJsonResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) self.assertEquals(trigger, trigger_json) # Check the recent builds for the trigger. builds_json = self.getJsonResponse(TriggerBuildList, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) assert 'builds' in builds_json def test_delete_build_trigger(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(1, len(json['triggers'])) trigger = json['triggers'][0] # Delete the trigger. self.deleteEmptyResponse(BuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/building', trigger_uuid=trigger['id'])) # Verify it was deleted. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/building')) self.assertEquals(0, len(json['triggers'])) def test_analyze_fake_trigger(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.user.get_user(ADMIN_ACCESS_USER) trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Analyze the trigger's dockerfile: First, no dockerfile. trigger_config = {} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) self.assertEquals('Could not read the Dockerfile for the trigger', analyze_json['message']) # Analyze the trigger's dockerfile: Second, missing FROM in dockerfile. trigger_config = {'dockerfile': 'MAINTAINER me'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('warning', analyze_json['status']) self.assertEquals('No FROM line found in the Dockerfile', analyze_json['message']) # Analyze the trigger's dockerfile: Third, dockerfile with public repo. trigger_config = {'dockerfile': 'FROM somerepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('publicbase', analyze_json['status']) # Analyze the trigger's dockerfile: Fourth, dockerfile with private repo with an invalid path. trigger_config = {'dockerfile': 'FROM localhost:5000/somepath'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('warning', analyze_json['status']) self.assertEquals('"localhost:5000/somepath" is not a valid Quay repository path', analyze_json['message']) # Analyze the trigger's dockerfile: Fifth, dockerfile with private repo that does not exist. trigger_config = {'dockerfile': 'FROM localhost:5000/nothere/randomrepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) nofound = 'Repository "localhost:5000/%s/randomrepo" referenced by the Dockerfile was not found' self.assertEquals(nofound % 'nothere', analyze_json['message']) # Analyze the trigger's dockerfile: Sixth, dockerfile with private repo that the user cannot see trigger_config = {'dockerfile': 'FROM localhost:5000/randomuser/randomrepo'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('error', analyze_json['status']) self.assertEquals(nofound % 'randomuser', analyze_json['message']) # Analyze the trigger's dockerfile: Seventh, dockerfile with private repo that the user see. trigger_config = {'dockerfile': 'FROM localhost:5000/devtable/complex'} analyze_json = self.postJsonResponse(BuildTriggerAnalyze, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals('analyzed', analyze_json['status']) self.assertEquals('devtable', analyze_json['namespace']) self.assertEquals('complex', analyze_json['name']) self.assertEquals(False, analyze_json['is_public']) self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', analyze_json['robots'][0]['name']) def test_fake_trigger(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.user.get_user(ADMIN_ACCESS_USER) trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Verify the trigger. json = self.getJsonResponse(BuildTriggerList, params=dict(repository=ADMIN_ACCESS_USER + '/simple')) self.assertEquals(1, len(json['triggers'])) self.assertEquals(trigger.uuid, json['triggers'][0]['id']) self.assertEquals(trigger.service.name, json['triggers'][0]['service']) self.assertEquals(False, json['triggers'][0]['is_active']) # List the trigger's sources. source_json = self.getJsonResponse(BuildTriggerSources, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid)) self.assertEquals([{'first': 'source'}, {'second': 'sometoken'}], source_json['sources']) # List the trigger's subdirs. subdir_json = self.postJsonResponse(BuildTriggerSubdirs, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'somevalue': 'meh'}) self.assertEquals({'status': 'success', 'subdir': ['sometoken', 'foo', 'bar', 'meh']}, subdir_json) # Activate the trigger. trigger_config = { 'build_source': 'somesource' } activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}) self.assertEquals(True, activate_json['is_active']) # Make sure the trigger has a write token. trigger = model.build.get_build_trigger(trigger.uuid) self.assertNotEquals(None, trigger.write_token) self.assertEquals(True, py_json.loads(trigger.config)['active']) # Make sure we cannot activate again. self.postResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config}, expected_code=400) # Retrieve values for a field. result = self.postJsonResponse(BuildTriggerFieldValues, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid, field_name="test_field")) self.assertEquals(result['values'], [1, 2, 3]) self.postResponse(BuildTriggerFieldValues, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid, field_name="another_field"), expected_code=404) # Start a manual build. start_json = self.postJsonResponse(ActivateBuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data=dict(), expected_code=201) assert 'id' in start_json self.assertEquals("build-name", start_json['display_name']) self.assertEquals(['bar'], start_json['tags']) self.assertEquals('subdir', start_json['subdirectory']) self.assertEquals('somesource', start_json['trigger']['build_source']) # Verify the metadata was added. build_obj = database.RepositoryBuild.get(database.RepositoryBuild.uuid == start_json['id']) self.assertEquals('bar', py_json.loads(build_obj.job_config)['trigger_metadata']['foo']) # Start another manual build, with a ref. start_json = self.postJsonResponse(ActivateBuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data=dict(refs={'kind': 'branch', 'name': 'foobar'}), expected_code=201) def test_invalid_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.user.get_user(ADMIN_ACCESS_USER) trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with an invalid robot account. trigger_config = {} self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config': trigger_config, 'pull_robot': 'someinvalidrobot'}, expected_code=404) def test_unauthorized_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.user.get_user(ADMIN_ACCESS_USER) trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with a robot account in the wrong namespace. trigger_config = {} self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config':trigger_config, 'pull_robot':'freshuser+anotherrobot'}, expected_code=403) def test_robot_account(self): self.login(ADMIN_ACCESS_USER) database.BuildTriggerService.create(name='fakeservice') # Add a new fake trigger. repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple') user = model.user.get_user(ADMIN_ACCESS_USER) trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user) # Try to activate it with a robot account. trigger_config = {} activate_json = self.postJsonResponse(BuildTriggerActivate, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data={'config':trigger_config, 'pull_robot':ADMIN_ACCESS_USER + '+dtrobot'}) # Verify that the robot was saved. self.assertEquals(True, activate_json['is_active']) self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', activate_json['pull_robot']['name']) # Start a manual build. start_json = self.postJsonResponse(ActivateBuildTrigger, params=dict(repository=ADMIN_ACCESS_USER + '/simple', trigger_uuid=trigger.uuid), data=dict(refs=dict(kind='branch', name='foobar')), expected_code=201) assert 'id' in start_json self.assertEquals("build-name", start_json['display_name']) self.assertEquals(['bar'], start_json['tags']) class TestUserAuthorizations(ApiTestCase): def test_list_get_delete_user_authorizations(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(UserAuthorizationList) self.assertEquals(1, len(json['authorizations'])) authorization = json['authorizations'][0] assert 'uuid' in authorization assert 'scopes' in authorization assert 'application' in authorization # Retrieve the authorization. get_json = self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid=authorization['uuid'])) self.assertEquals(authorization, get_json) # Delete the authorization. self.deleteEmptyResponse(UserAuthorization, params=dict(access_token_uuid=authorization['uuid'])) # Verify it has been deleted. self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid=authorization['uuid']), expected_code=404) class TestSuperUserLogs(ApiTestCase): def test_get_logs(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserLogs) assert 'logs' in json assert len(json['logs']) > 0 class TestSuperUserList(ApiTestCase): def test_get_users(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserList) assert 'users' in json assert len(json['users']) > 0 class TestSuperUserCreateInitialSuperUser(ApiTestCase): def test_create_superuser(self): data = { 'username': 'newsuper', 'password': 'password', 'email': 'jschorr+fake@devtable.com', } # Try to write before some config. Should 403. self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) # Add some fake config. fake_config = { 'AUTHENTICATION_TYPE': 'Database', 'SECRET_KEY': 'fakekey', } self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost')) # Try to write with config. Should 403 since there are users in the DB. self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403) # Delete all users in the DB. for user in list(database.User.select()): model.user.delete_user(user, all_queues, force=True) # Create the superuser. self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data) # Ensure the user exists in the DB. self.assertIsNotNone(model.user.get_user('newsuper')) # Ensure that the current user is newsuper. json = self.getJsonResponse(User) self.assertEquals('newsuper', json['username']) # Ensure that the current user is a superuser in the config. json = self.getJsonResponse(SuperUserConfig) self.assertEquals(['newsuper'], json['config']['SUPER_USERS']) # Ensure that the current user is a superuser in memory by trying to call an API # that will fail otherwise. self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) class TestSuperUserConfig(ApiTestCase): def test_get_status_update_config(self): # With no config the status should be 'upload-license'. json = self.getJsonResponse(SuperUserRegistryStatus) self.assertEquals('upload-license', json['status']) # And the config should 401. self.getResponse(SuperUserConfig, expected_code=401) # Add a fake license file. config_provider.save_license('something') # With no config but a license the status should be 'config-db'. json = self.getJsonResponse(SuperUserRegistryStatus) self.assertEquals('config-db', json['status']) # Add some fake config. fake_config = { 'AUTHENTICATION_TYPE': 'Database', 'SECRET_KEY': 'fakekey', } json = self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost')) self.assertEquals('fakekey', json['config']['SECRET_KEY']) self.assertEquals('fakehost', json['config']['SERVER_HOSTNAME']) self.assertEquals('Database', json['config']['AUTHENTICATION_TYPE']) # With config the status should be 'setup-db'. json = self.getJsonResponse(SuperUserRegistryStatus) self.assertEquals('setup-db', json['status']) def test_config_file(self): # Try without an account. Should 403. self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403) # Login to a superuser. self.login(ADMIN_ACCESS_USER) # Try for an invalid file. Should 404. self.getResponse(SuperUserConfigFile, params=dict(filename='foobar'), expected_code=404) # Try for a valid filename. Should not exist. json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) self.assertFalse(json['exists']) # Add the file. self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), file=(StringIO('my file contents'), 'ssl.cert')) # Should now exist. json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert')) self.assertTrue(json['exists']) def test_update_with_external_auth(self): self.login(ADMIN_ACCESS_USER) # Run a mock LDAP. mockldap = MockLdap({ 'dc=quay,dc=io': {'dc': ['quay', 'io']}, 'ou=employees,dc=quay,dc=io': { 'dc': ['quay', 'io'], 'ou': 'employees' }, 'uid=' + ADMIN_ACCESS_USER + ',ou=employees,dc=quay,dc=io': { 'dc': ['quay', 'io'], 'ou': 'employees', 'uid': [ADMIN_ACCESS_USER], 'userPassword': ['password'], 'mail': [ADMIN_ACCESS_EMAIL], }, }) config = { 'AUTHENTICATION_TYPE': 'LDAP', 'LDAP_BASE_DN': ['dc=quay', 'dc=io'], 'LDAP_ADMIN_DN': 'uid=devtable,ou=employees,dc=quay,dc=io', 'LDAP_ADMIN_PASSWD': 'password', 'LDAP_USER_RDN': ['ou=employees'], 'LDAP_UID_ATTR': 'uid', 'LDAP_EMAIL_ATTR': 'mail', } mockldap.start() try: # Try writing some config with an invalid password. self.putResponse(SuperUserConfig, data={'config': config, 'hostname': 'foo'}, expected_code=400) self.putResponse(SuperUserConfig, data={'config': config, 'password': 'invalid', 'hostname': 'foo'}, expected_code=400) # Write the config with the valid password. self.putResponse(SuperUserConfig, data={'config': config, 'password': 'password', 'hostname': 'foo'}, expected_code=200) # Ensure that the user row has been linked. self.assertEquals(ADMIN_ACCESS_USER, model.user.verify_federated_login('ldap', ADMIN_ACCESS_USER).username) finally: mockldap.stop() class TestRepositoryImageSecurity(ApiTestCase): def test_get_vulnerabilities(self): self.login(ADMIN_ACCESS_USER) tag = model.tag.get_active_tag(ADMIN_ACCESS_USER, 'simple', 'latest') layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest') tag_manifest = database.TagManifest.get(tag=tag) # Grab the security info for the tag. It should be queued. manifest_response = self.getJsonResponse(RepositoryManifestSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple', manifestref=tag_manifest.digest, vulnerabilities='true')) image_response = self.getJsonResponse(RepositoryImageSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id, vulnerabilities='true')) self.assertEquals(manifest_response, image_response) self.assertEquals('queued', image_response['status']) # Mark the layer as indexed. layer.security_indexed = True layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET'] layer.save() # Grab the security info again. with fake_security_scanner() as security_scanner: security_scanner.add_layer(security_scanner.layer_id(layer)) manifest_response = self.getJsonResponse(RepositoryManifestSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple', manifestref=tag_manifest.digest, vulnerabilities='true')) image_response = self.getJsonResponse(RepositoryImageSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id, vulnerabilities='true')) self.assertEquals(manifest_response, image_response) self.assertEquals('scanned', image_response['status']) self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion']) def test_get_vulnerabilities_read_failover(self): with ConfigForTesting(): self.login(ADMIN_ACCESS_USER) # Get a layer and mark it as indexed. layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest') layer.security_indexed = True layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET'] layer.save() with fake_security_scanner(hostname='failoverscanner') as security_scanner: # Query the wrong security scanner URL without failover. self.getResponse(RepositoryImageSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id, vulnerabilities='true'), expected_code=520) # Set the failover URL. app.config['SECURITY_SCANNER_READONLY_FAILOVER_ENDPOINTS'] = ['http://failoverscanner'] # Configure the API to return 200 for this layer. layer_id = security_scanner.layer_id(layer) security_scanner.set_ok_layer_id(layer_id) # Call the API and succeed on failover. self.getResponse(RepositoryImageSecurity, params=dict(repository=ADMIN_ACCESS_USER + '/simple', imageid=layer.docker_image_id, vulnerabilities='true'), expected_code=200) class TestSuperUserCustomCertificates(ApiTestCase): def test_custom_certificates(self): self.login(ADMIN_ACCESS_USER) # Upload a certificate. cert_contents, _ = generate_test_cert(hostname='somecoolhost', san_list=['DNS:bar', 'DNS:baz']) self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'), file=(StringIO(cert_contents), 'testcert'), expected_code=204) # Make sure it is present. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(1, len(json['certs'])) cert_info = json['certs'][0] self.assertEquals('testcert', cert_info['path']) self.assertEquals(set(['somecoolhost', 'bar', 'baz']), set(cert_info['names'])) self.assertFalse(cert_info['expired']) # Remove the certificate. self.deleteResponse(SuperUserCustomCertificate, params=dict(certpath='testcert')) # Make sure it is gone. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(0, len(json['certs'])) def test_expired_custom_certificate(self): self.login(ADMIN_ACCESS_USER) # Upload a certificate. cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10) self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'), file=(StringIO(cert_contents), 'testcert'), expected_code=204) # Make sure it is present. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(1, len(json['certs'])) cert_info = json['certs'][0] self.assertEquals('testcert', cert_info['path']) self.assertEquals(set(['somecoolhost']), set(cert_info['names'])) self.assertTrue(cert_info['expired']) def test_invalid_custom_certificate(self): self.login(ADMIN_ACCESS_USER) # Upload an invalid certificate. self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'), file=(StringIO('some contents'), 'testcert'), expected_code=204) # Make sure it is present but invalid. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(1, len(json['certs'])) cert_info = json['certs'][0] self.assertEquals('testcert', cert_info['path']) self.assertEquals('no start line', cert_info['error']) def test_path_sanitization(self): self.login(ADMIN_ACCESS_USER) # Upload a certificate. cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10) self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert/../foobar'), file=(StringIO(cert_contents), 'testcert/../foobar'), expected_code=204) # Make sure it is present. json = self.getJsonResponse(SuperUserCustomCertificates) self.assertEquals(1, len(json['certs'])) cert_info = json['certs'][0] self.assertEquals('foobar', cert_info['path']) class TestSuperUserTakeOwnership(ApiTestCase): def test_take_ownership_superuser(self): self.login(ADMIN_ACCESS_USER) # Should fail to take ownership of a superuser. self.postResponse(SuperUserTakeOwnership, params=dict(namespace=ADMIN_ACCESS_USER), expected_code=400) def test_take_ownership_invalid_namespace(self): self.login(ADMIN_ACCESS_USER) self.postResponse(SuperUserTakeOwnership, params=dict(namespace='invalid'), expected_code=404) def test_take_ownership_non_superuser(self): self.login(READ_ACCESS_USER) self.postResponse(SuperUserTakeOwnership, params=dict(namespace='freshuser'), expected_code=403) def test_take_ownership_user(self): self.login(ADMIN_ACCESS_USER) with assert_action_logged('take_ownership'): # Take ownership of the read user. self.postResponse(SuperUserTakeOwnership, params=dict(namespace=READ_ACCESS_USER)) # Ensure that the read access user is now an org, with the superuser as the owner. reader = model.user.get_user_or_org(READ_ACCESS_USER) self.assertTrue(reader.organization) usernames = [admin.username for admin in model.organization.get_admin_users(reader)] self.assertIn(ADMIN_ACCESS_USER, usernames) def test_take_ownership_org(self): # Create a new org with another user as owner. public_user = model.user.get_user(PUBLIC_USER) org = model.organization.create_organization('someorg', 'some@example.com', public_user) # Ensure that the admin is not yet owner of the org. usernames = [admin.username for admin in model.organization.get_admin_users(org)] self.assertNotIn(ADMIN_ACCESS_USER, usernames) with assert_action_logged('take_ownership'): # Take ownership. self.login(ADMIN_ACCESS_USER) self.postResponse(SuperUserTakeOwnership, params=dict(namespace='someorg')) # Ensure now in the admin users. usernames = [admin.username for admin in model.organization.get_admin_users(org)] self.assertIn(ADMIN_ACCESS_USER, usernames) class TestSuperUserKeyManagement(ApiTestCase): def test_get_update_keys(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserServiceKeyManagement) key_count = len(json['keys']) key = json['keys'][0] self.assertTrue('name' in key) self.assertTrue('service' in key) self.assertTrue('kid' in key) self.assertTrue('created_date' in key) self.assertTrue('expiration_date' in key) self.assertTrue('jwk' in key) self.assertTrue('approval' in key) self.assertTrue('metadata' in key) with assert_action_logged('service_key_modify'): # Update the key's name. self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']), data=dict(name='somenewname')) # Ensure the key's name has been changed. json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid'])) self.assertEquals('somenewname', json['name']) with assert_action_logged('service_key_modify'): # Update the key's metadata. self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']), data=dict(metadata=dict(foo='bar'))) # Ensure the key's metadata has been changed. json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid'])) self.assertEquals('bar', json['metadata']['foo']) with assert_action_logged('service_key_extend'): # Change the key's expiration. self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']), data=dict(expiration=None)) # Ensure the key's expiration has been changed. json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid'])) self.assertIsNone(json['expiration_date']) with assert_action_logged('service_key_delete'): # Delete the key. self.deleteEmptyResponse(SuperUserServiceKey, params=dict(kid=key['kid'])) # Ensure the key no longer exists. self.getResponse(SuperUserServiceKey, params=dict(kid=key['kid']), expected_code=404) json = self.getJsonResponse(SuperUserServiceKeyManagement) self.assertEquals(key_count - 1, len(json['keys'])) def test_approve_key(self): self.login(ADMIN_ACCESS_USER) # Ensure the key is not yet approved. json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid='kid3')) self.assertEquals('unapprovedkey', json['name']) self.assertIsNone(json['approval']) # Approve the key. with assert_action_logged('service_key_approve'): self.postResponse(SuperUserServiceKeyApproval, params=dict(kid='kid3'), data=dict(notes='testapprove'), expected_code=201) # Ensure the key is approved. json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid='kid3')) self.assertEquals('unapprovedkey', json['name']) self.assertIsNotNone(json['approval']) self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type']) self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username']) self.assertEquals('testapprove', json['approval']['notes']) def test_approve_preapproved(self): self.login(ADMIN_ACCESS_USER) new_key = { 'service': 'coolservice', 'name': 'mynewkey', 'metadata': dict(foo='baz'), 'notes': 'whazzup!?', 'expiration': timegm((datetime.datetime.now() + datetime.timedelta(days=1)).utctimetuple()), } # Create the key (preapproved automatically) json = self.postJsonResponse(SuperUserServiceKeyManagement, data=new_key) # Try to approve again. self.postResponse(SuperUserServiceKeyApproval, params=dict(kid=json['kid']), expected_code=201) def test_create_key(self): self.login(ADMIN_ACCESS_USER) new_key = { 'service': 'coolservice', 'name': 'mynewkey', 'metadata': dict(foo='baz'), 'notes': 'whazzup!?', 'expiration': timegm((datetime.datetime.now() + datetime.timedelta(days=1)).utctimetuple()), } with assert_action_logged('service_key_create'): # Create the key. json = self.postJsonResponse(SuperUserServiceKeyManagement, data=new_key) self.assertEquals('mynewkey', json['name']) self.assertTrue('kid' in json) self.assertTrue('public_key' in json) self.assertTrue('private_key' in json) # Verify the private key is a valid PEM. serialization.load_pem_private_key(json['private_key'].encode('utf-8'), None, default_backend()) # Verify the key. kid = json['kid'] json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=kid)) self.assertEquals('mynewkey', json['name']) self.assertEquals('coolservice', json['service']) self.assertEquals('baz', json['metadata']['foo']) self.assertEquals(kid, json['kid']) self.assertIsNotNone(json['approval']) self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type']) self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username']) self.assertEquals('whazzup!?', json['approval']['notes']) class TestRepositoryManifestLabels(ApiTestCase): def test_basic_labels(self): self.login(ADMIN_ACCESS_USER) # Find the manifest digest for the prod tag in the complex repo. tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod') repository = ADMIN_ACCESS_USER + '/complex' # Check the existing labels on the complex repo, which should be empty json = self.getJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest)) self.assertEquals(0, len(json['labels'])) # Add some labels to the manifest. with assert_action_logged('manifest_label_add'): label1 = self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='hello', value='world', media_type='text/plain'), expected_code=201) with assert_action_logged('manifest_label_add'): label2 = self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='hi', value='there', media_type='text/plain'), expected_code=201) with assert_action_logged('manifest_label_add'): label3 = self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='hello', value='someone', media_type='application/json'), expected_code=201) # Ensure we have *3* labels json = self.getJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest)) self.assertEquals(3, len(json['labels'])) self.assertNotEquals(label2['label']['id'], label1['label']['id']) self.assertNotEquals(label3['label']['id'], label1['label']['id']) self.assertNotEquals(label2['label']['id'], label3['label']['id']) self.assertEquals('text/plain', label1['label']['media_type']) self.assertEquals('text/plain', label2['label']['media_type']) self.assertEquals('application/json', label3['label']['media_type']) # Ensure we can retrieve each of the labels. for label in json['labels']: label_json = self.getJsonResponse(ManageRepositoryManifestLabel, params=dict(repository=repository, manifestref=tag_manifest.digest, labelid=label['id'])) self.assertEquals(label['id'], label_json['id']) # Delete a label. with assert_action_logged('manifest_label_delete'): self.deleteEmptyResponse(ManageRepositoryManifestLabel, params=dict(repository=repository, manifestref=tag_manifest.digest, labelid=label1['label']['id'])) # Ensure the label is gone. json = self.getJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest)) self.assertEquals(2, len(json['labels'])) # Check filtering. json = self.getJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest, filter='hello')) self.assertEquals(1, len(json['labels'])) def test_prefixed_labels(self): self.login(ADMIN_ACCESS_USER) # Find the manifest digest for the prod tag in the complex repo. tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod') repository = ADMIN_ACCESS_USER + '/complex' self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='com.dockers.whatever', value='pants', media_type='text/plain'), expected_code=201) self.postJsonResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='my.cool.prefix.for.my.label', value='value', media_type='text/plain'), expected_code=201) def test_add_invalid_media_type(self): self.login(ADMIN_ACCESS_USER) tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod') repository = ADMIN_ACCESS_USER + '/complex' self.postResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='hello', value='world', media_type='some/invalid'), expected_code=400) def test_add_invalid_key(self): self.login(ADMIN_ACCESS_USER) tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod') repository = ADMIN_ACCESS_USER + '/complex' # Try to add an empty label key. self.postResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='', value='world'), expected_code=400) # Try to add an invalid label key. self.postResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='invalid___key', value='world'), expected_code=400) # Try to add a label key in a reserved namespace. self.postResponse(RepositoryManifestLabels, params=dict(repository=repository, manifestref=tag_manifest.digest), data=dict(key='io.docker.whatever', value='world'), expected_code=400) class TestSuperUserManagement(ApiTestCase): def test_get_user(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('jschorr+test@devtable.com', json['email']) self.assertEquals(False, json['super_user']) def test_delete_user(self): self.login(ADMIN_ACCESS_USER) # Verify the user exists. json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser')) self.assertEquals('freshuser', json['username']) # Delete the user. self.deleteEmptyResponse(SuperUserManagement, params=dict(username='freshuser'), expected_code=204) # Verify the user no longer exists. self.getResponse(SuperUserManagement, params=dict(username='freshuser'), expected_code=404) def test_change_user_password(self): self.login(ADMIN_ACCESS_USER) # Verify the user exists. json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('jschorr+test@devtable.com', json['email']) # Update the user. json = self.putJsonResponse(SuperUserManagement, params=dict(username='freshuser'), data=dict(password='somepassword')) self.assertTrue('encrypted_password' in json) def test_update_user(self): self.login(ADMIN_ACCESS_USER) # Verify the user exists. json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('jschorr+test@devtable.com', json['email']) # Update the user. json = self.putJsonResponse(SuperUserManagement, params=dict(username='freshuser'), data=dict(email='foo@bar.com')) self.assertFalse('encrypted_password' in json) # Verify the user was updated. json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser')) self.assertEquals('freshuser', json['username']) self.assertEquals('foo@bar.com', json['email']) def test_set_message(self): self.login(ADMIN_ACCESS_USER) # Create a message message = {"content": "new message", "severity": "info", "media_type": "text/plain"} self.postResponse(GlobalUserMessages, data=dict(message=message), expected_code=201) json = self.getJsonResponse(GlobalUserMessages) self.assertEquals(len(json['messages']), 3) has_matching_message = False for message in json["messages"]: new_message_match = message["content"] == "new message" severity_match = message["severity"] == "info" media_type_match = message["media_type"] == "text/plain" if new_message_match and severity_match and media_type_match: has_matching_message = True break self.assertTrue(has_matching_message, "Could not find matching message in: " + str(json["messages"])) self.assertNotEqual(json['messages'][0]["content"], json['messages'][2]["content"]) self.assertTrue(json['messages'][2]["uuid"]) def test_delete_message(self): self.login(ADMIN_ACCESS_USER) json = self.getJsonResponse(GlobalUserMessages) self.deleteEmptyResponse(GlobalUserMessage, {"uuid": json['messages'][0]['uuid']}, 204) json = self.getJsonResponse(GlobalUserMessages) self.assertEquals(len(json['messages']), 1) if __name__ == '__main__': unittest.main()