This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_api_usage.py

4945 lines
187 KiB
Python

# coding=utf-8
import unittest
import datetime
import logging
import time
import re
import json as py_json
from StringIO import StringIO
from calendar import timegm
from contextlib import contextmanager
from httmock import urlmatch, HTTMock, all_requests
from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
from playhouse.test_utils import assert_query_count, _QueryLogHandler
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from mockldap import MockLdap
from endpoints.api import api_bp, api
from endpoints.building import PreparedBuild
from endpoints.webhooks import webhooks
from app import app, config_provider, all_queues, dockerfile_build_queue, notification_queue
from buildtrigger.basehandler import BuildTriggerHandler
from initdb import setup_database_for_testing, finished_database_for_testing
from data import database, model
from data.database import RepositoryActionCount, Repository as RepositoryTable
from test.helpers import assert_action_logged
from util.secscan.fake import fake_security_scanner
from endpoints.api.team import (TeamMember, TeamMemberList, TeamMemberInvite, OrganizationTeam,
TeamPermissions, InviteTeamMember)
from endpoints.api.tag import RepositoryTagImages, RepositoryTag, RestoreTag, ListRepositoryTags
from endpoints.api.search import EntitySearch, ConductSearch
from endpoints.api.image import RepositoryImage, RepositoryImageList
from endpoints.api.build import RepositoryBuildStatus, RepositoryBuildList, RepositoryBuildResource
from endpoints.api.robot import (UserRobotList, OrgRobot, OrgRobotList, UserRobot,
RegenerateUserRobot, RegenerateOrgRobot)
from endpoints.api.trigger import (BuildTriggerActivate, BuildTriggerSources, BuildTriggerSubdirs,
TriggerBuildList, ActivateBuildTrigger, BuildTrigger,
BuildTriggerList, BuildTriggerAnalyze, BuildTriggerFieldValues,
BuildTriggerSourceNamespaces)
from endpoints.api.repoemail import RepositoryAuthorizedEmail
from endpoints.api.repositorynotification import (RepositoryNotification,
RepositoryNotificationList,
TestRepositoryNotification)
from endpoints.api.user import (PrivateRepositories, ConvertToOrganization, Signout, Signin, User,
UserAuthorizationList, UserAuthorization, UserNotification,
UserNotificationList, StarredRepositoryList, StarredRepository)
from endpoints.api.repotoken import RepositoryToken, RepositoryTokenList
from endpoints.api.prototype import PermissionPrototype, PermissionPrototypeList
from endpoints.api.logs import (UserLogs, OrgLogs, OrgAggregateLogs, UserAggregateLogs,
RepositoryLogs, RepositoryAggregateLogs)
from endpoints.api.billing import (UserCard, UserPlan, ListPlans, OrganizationCard,
OrganizationPlan)
from endpoints.api.discovery import DiscoveryResource
from endpoints.api.error import Error
from endpoints.api.organization import (OrganizationList, OrganizationMember,
OrgPrivateRepositories, OrganizationMemberList,
Organization, ApplicationInformation,
OrganizationApplications, OrganizationApplicationResource,
OrganizationApplicationResetClientSecret, Organization)
from endpoints.api.repository import (RepositoryList, RepositoryVisibility, Repository,
REPOS_PER_PAGE)
from endpoints.api.permission import (RepositoryUserPermission, RepositoryTeamPermission,
RepositoryTeamPermissionList, RepositoryUserPermissionList)
from endpoints.api.superuser import (SuperUserLogs, SuperUserList, SuperUserManagement,
SuperUserServiceKeyManagement, SuperUserServiceKey,
SuperUserServiceKeyApproval, SuperUserTakeOwnership,
SuperUserCustomCertificates, SuperUserCustomCertificate)
from endpoints.api.globalmessages import (GlobalUserMessage, GlobalUserMessages,)
from endpoints.api.secscan import RepositoryImageSecurity, RepositoryManifestSecurity
from endpoints.api.suconfig import (SuperUserRegistryStatus, SuperUserConfig, SuperUserConfigFile,
SuperUserCreateInitialSuperUser)
from endpoints.api.manifest import RepositoryManifestLabels, ManageRepositoryManifestLabel
from test.test_ssl_util import generate_test_cert
try:
app.register_blueprint(api_bp, url_prefix='/api')
except ValueError:
# This blueprint was already registered
pass
app.register_blueprint(webhooks, url_prefix='/webhooks')
# The number of queries we run for guests on API calls.
BASE_QUERY_COUNT = 0
# The number of queries we run for logged in users on API calls.
BASE_LOGGEDIN_QUERY_COUNT = BASE_QUERY_COUNT + 1
# The number of queries we run for logged in users on API calls that check
# access permissions.
BASE_PERM_ACCESS_QUERY_COUNT = BASE_LOGGEDIN_QUERY_COUNT + 2
NO_ACCESS_USER = 'freshuser'
READ_ACCESS_USER = 'reader'
ADMIN_ACCESS_USER = 'devtable'
PUBLIC_USER = 'public'
ADMIN_ACCESS_EMAIL = 'jschorr@devtable.com'
ORG_REPO = 'orgrepo'
ORGANIZATION = 'buynlarge'
NEW_USER_DETAILS = {
'username': 'bobby',
'password': 'password',
'email': 'bobby@tables.com',
}
FAKE_APPLICATION_CLIENT_ID = 'deadbeef'
CSRF_TOKEN_KEY = '_csrf_token'
CSRF_TOKEN = '123csrfforme'
class AppConfigChange(object):
""" AppConfigChange takes a dictionary that overrides the global app config
for a given block of code. The values are restored on exit. """
def __init__(self, changes=None):
self._changes = changes or {}
self._originals = {}
self._to_rm = []
def __enter__(self):
for key in self._changes.keys():
try:
self._originals[key] = app.config[key]
except KeyError:
self._to_rm.append(key)
app.config[key] = self._changes[key]
def __exit__(self, type, value, traceback):
for key in self._originals.keys():
app.config[key] = self._originals[key]
for key in self._to_rm:
del app.config[key]
class ApiTestCase(unittest.TestCase):
maxDiff = None
@staticmethod
def _add_csrf(without_csrf):
parts = urlparse(without_csrf)
query = parse_qs(parts[4])
query[CSRF_TOKEN_KEY] = CSRF_TOKEN
return urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:]))
def url_for(self, resource_name, params=None, skip_csrf=False):
params = params or {}
url = api.url_for(resource_name, **params)
if not skip_csrf:
url = ApiTestCase._add_csrf(url)
return url
def setUp(self):
setup_database_for_testing(self)
self.app = app.test_client()
self.ctx = app.test_request_context()
self.ctx.__enter__()
self.setCsrfToken(CSRF_TOKEN)
def tearDown(self):
finished_database_for_testing(self)
config_provider.clear()
self.ctx.__exit__(True, None, None)
def setCsrfToken(self, token):
with self.app.session_transaction() as sess:
sess[CSRF_TOKEN_KEY] = token
@contextmanager
def toggleFeature(self, name, enabled):
import features
previous_value = getattr(features, name)
setattr(features, name, enabled)
yield
setattr(features, name, previous_value)
def getJsonResponse(self, resource_name, params={}, expected_code=200):
rv = self.app.get(api.url_for(resource_name, **params))
self.assertEquals(expected_code, rv.status_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def postResponse(self, resource_name, params={}, data={}, file=None, headers=None,
expected_code=200):
data = py_json.dumps(data)
headers = headers or {}
headers.update({"Content-Type": "application/json"})
if file is not None:
data = {'file': file}
headers = None
rv = self.app.post(self.url_for(resource_name, params), data=data, headers=headers)
self.assertEquals(rv.status_code, expected_code)
return rv.data
def getResponse(self, resource_name, params={}, expected_code=200):
rv = self.app.get(api.url_for(resource_name, **params))
self.assertEquals(rv.status_code, expected_code)
return rv.data
def putResponse(self, resource_name, params={}, data={}, expected_code=200):
rv = self.app.put(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
self.assertEquals(rv.status_code, expected_code)
return rv.data
def deleteResponse(self, resource_name, params={}, expected_code=204):
rv = self.app.delete(self.url_for(resource_name, params))
if rv.status_code != expected_code:
print 'Mismatch data for resource DELETE %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
return rv.data
def deleteEmptyResponse(self, resource_name, params={}, expected_code=204):
rv = self.app.delete(self.url_for(resource_name, params))
self.assertEquals(rv.status_code, expected_code)
self.assertEquals(rv.data, '') # ensure response body empty
return
def postJsonResponse(self, resource_name, params={}, data={},
expected_code=200):
rv = self.app.post(self.url_for(resource_name, params),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
if rv.status_code != expected_code:
print 'Mismatch data for resource POST %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def putJsonResponse(self, resource_name, params={}, data={},
expected_code=200, skip_csrf=False):
rv = self.app.put(self.url_for(resource_name, params, skip_csrf),
data=py_json.dumps(data),
headers={"Content-Type": "application/json"})
if rv.status_code != expected_code:
print 'Mismatch data for resource PUT %s: %s' % (resource_name, rv.data)
self.assertEquals(rv.status_code, expected_code)
data = rv.data
parsed = py_json.loads(data)
return parsed
def assertNotInTeam(self, data, membername):
for memberData in data['members']:
if memberData['name'] == membername:
self.fail(membername + ' found in team: ' + data['name'])
def assertInTeam(self, data, membername):
for member_data in data['members']:
if member_data['name'] == membername:
return
self.fail(membername + ' not found in team: ' + data['name'])
def login(self, username, password='password'):
return self.postJsonResponse(Signin, data=dict(username=username, password=password))
class TestCSRFFailure(ApiTestCase):
def test_csrf_failure(self):
self.login(READ_ACCESS_USER)
# Make sure a simple post call succeeds.
self.putJsonResponse(User, data=dict(password='newpasswordiscool'))
# Change the session's CSRF token.
self.setCsrfToken('someinvalidtoken')
# Verify that the call now fails.
self.putJsonResponse(User, data=dict(password='newpasswordiscool'), expected_code=403)
def test_csrf_failure_empty_token(self):
self.login(READ_ACCESS_USER)
# Change the session's CSRF token to be empty.
self.setCsrfToken('')
# Verify that the call now fails.
self.putJsonResponse(User, data=dict(password='newpasswordiscool'), expected_code=403)
def test_csrf_failure_missing_token(self):
self.login(READ_ACCESS_USER)
# Make sure a simple post call without a token at all fails.
self.putJsonResponse(User, data=dict(password='newpasswordiscool'), skip_csrf=True,
expected_code=403)
# Change the session's CSRF token to be empty.
self.setCsrfToken('')
# Verify that the call still fails.
self.putJsonResponse(User, data=dict(password='newpasswordiscool'), skip_csrf=True,
expected_code=403)
class TestDiscovery(ApiTestCase):
def test_discovery(self):
json = self.getJsonResponse(DiscoveryResource)
assert 'paths' in json
class TestErrorDescription(ApiTestCase):
def test_get_error(self):
json = self.getJsonResponse(Error, params=dict(error_type='not_found'))
assert json['title'] == 'not_found'
assert 'type' in json
assert 'description' in json
class TestPlans(ApiTestCase):
def test_plans(self):
json = self.getJsonResponse(ListPlans)
found = set([])
for method_info in json['plans']:
found.add(method_info['stripeId'])
assert 'free' in found
class TestLoggedInUser(ApiTestCase):
def test_guest(self):
self.getJsonResponse(User, expected_code=401)
def test_user(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(User)
assert json['anonymous'] == False
assert json['username'] == READ_ACCESS_USER
class TestUserStarredRepositoryList(ApiTestCase):
def test_get_stars_guest(self):
self.getJsonResponse(StarredRepositoryList, expected_code=401)
def test_get_stars_user(self):
self.login(READ_ACCESS_USER)
# Queries: Base + the list query
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1):
self.getJsonResponse(StarredRepositoryList, expected_code=200)
def test_star_repo_guest(self):
self.postJsonResponse(StarredRepositoryList,
data={
'namespace': 'public',
'repository': 'publicrepo',
},
expected_code=401)
def test_star_and_unstar_repo_user(self):
self.login(READ_ACCESS_USER)
# Queries: Base + the list query
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1):
json = self.getJsonResponse(StarredRepositoryList)
assert json['repositories'] == []
json = self.postJsonResponse(StarredRepositoryList,
data={
'namespace': 'public',
'repository': 'publicrepo',
},
expected_code=201)
assert json['namespace'] == 'public'
assert json['repository'] == 'publicrepo'
self.deleteEmptyResponse(StarredRepository, params=dict(repository='public/publicrepo'),
expected_code=204)
json = self.getJsonResponse(StarredRepositoryList)
assert json['repositories'] == []
class TestUserNotification(ApiTestCase):
def test_get(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserNotificationList)
# Make sure each notification can be retrieved.
for notification in json['notifications']:
njson = self.getJsonResponse(UserNotification, params=dict(uuid=notification['id']))
self.assertEquals(notification['id'], njson['id'])
# Update a notification.
assert json['notifications']
assert not json['notifications'][0]['dismissed']
notification = json['notifications'][0]
pjson = self.putJsonResponse(UserNotification, params=dict(uuid=notification['id']),
data=dict(dismissed=True))
self.assertEquals(True, pjson['dismissed'])
def test_org_notifications(self):
# Create a notification on the organization.
org = model.user.get_user_or_org(ORGANIZATION)
model.notification.create_notification('test_notification', org, {'org': 'notification'})
# Ensure it is visible to the org admin.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserNotificationList)
notification = json['notifications'][0]
self.assertEquals(notification['kind'], 'test_notification')
self.assertEquals(notification['metadata'], {'org': 'notification'})
# Ensure it is not visible to an org member.
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(UserNotificationList)
self.assertEquals(0, len(json['notifications']))
class TestGetUserPrivateAllowed(ApiTestCase):
def test_nonallowed(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(PrivateRepositories)
assert json['privateCount'] == 0
assert not json['privateAllowed']
def test_allowed(self):
self.login(ADMIN_ACCESS_USER)
# Change the subscription of the namespace.
self.putJsonResponse(UserPlan, data=dict(plan='personal-30'))
json = self.getJsonResponse(PrivateRepositories)
assert json['privateCount'] >= 6
assert not json['privateAllowed']
# Change the subscription of the namespace.
self.putJsonResponse(UserPlan, data=dict(plan='bus-large-30'))
json = self.getJsonResponse(PrivateRepositories)
assert json['privateAllowed']
class TestConvertToOrganization(ApiTestCase):
def test_sameadminuser(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': READ_ACCESS_USER,
'adminPassword': 'password',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user is not valid', json['detail'])
def test_sameadminuser_by_email(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': 'no1@thanks.com',
'adminPassword': 'password',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user is not valid', json['detail'])
def test_invalidadminuser(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': 'unknownuser',
'adminPassword': 'password',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user credentials are not valid',
json['detail'])
def test_invalidadminpassword(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': ADMIN_ACCESS_USER,
'adminPassword': 'invalidpass',
'plan': 'free'},
expected_code=400)
self.assertEqual('The admin user credentials are not valid',
json['detail'])
def test_convert(self):
self.login(READ_ACCESS_USER)
# Add at least one permission for the read-user.
read_user = model.user.get_user(READ_ACCESS_USER)
simple_repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple')
read_role = database.Role.get(name='read')
database.RepositoryPermission.create(user=read_user, repository=simple_repo, role=read_role)
# Convert the read user into an organization.
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': ADMIN_ACCESS_USER,
'adminPassword': 'password',
'plan': 'free'})
self.assertEqual(True, json['success'])
# Verify the organization exists.
organization = model.organization.get_organization(READ_ACCESS_USER)
assert organization is not None
# Verify the admin user is the org's admin.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=READ_ACCESS_USER))
self.assertEquals(READ_ACCESS_USER, json['name'])
self.assertEquals(True, json['is_admin'])
# Verify the now-org has no permissions.
count = (database
.RepositoryPermission
.select()
.where(database.RepositoryPermission.user == organization)
.count())
self.assertEquals(0, count)
def test_convert_via_email(self):
self.login(READ_ACCESS_USER)
json = self.postJsonResponse(ConvertToOrganization,
data={'adminUser': ADMIN_ACCESS_EMAIL,
'adminPassword': 'password',
'plan': 'free'})
self.assertEqual(True, json['success'])
# Verify the organization exists.
organization = model.organization.get_organization(READ_ACCESS_USER)
assert organization is not None
# Verify the admin user is the org's admin.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=READ_ACCESS_USER))
self.assertEquals(READ_ACCESS_USER, json['name'])
self.assertEquals(True, json['is_admin'])
class TestChangeUserDetails(ApiTestCase):
def test_changepassword(self):
self.login(READ_ACCESS_USER)
self.putJsonResponse(User,
data=dict(password='newpasswordiscool'))
self.login(READ_ACCESS_USER, password='newpasswordiscool')
def test_changepassword_unicode(self):
self.login(READ_ACCESS_USER)
self.putJsonResponse(User,
data=dict(password=u'someunicode北京市pass'))
self.login(READ_ACCESS_USER, password=u'someunicode北京市pass')
def test_changeeemail(self):
self.login(READ_ACCESS_USER)
self.putJsonResponse(User,
data=dict(email='test+foo@devtable.com'))
def test_changeinvoiceemail(self):
self.login(READ_ACCESS_USER)
json = self.putJsonResponse(User,
data=dict(invoice_email=True))
self.assertEquals(True, json['invoice_email'])
json = self.putJsonResponse(User,
data=dict(invoice_email=False))
self.assertEquals(False, json['invoice_email'])
def test_changeusername_temp(self):
self.login(READ_ACCESS_USER)
user = model.user.get_user(READ_ACCESS_USER)
model.user.create_user_prompt(user, 'confirm_username')
self.assertTrue(model.user.has_user_prompt(user, 'confirm_username'))
# Add a robot under the user's namespace.
model.user.create_robot('somebot', user)
# Rename the user.
json = self.putJsonResponse(User, data=dict(username='someotherusername'))
# Ensure the username was changed.
self.assertEquals('someotherusername', json['username'])
self.assertFalse(model.user.has_user_prompt(user, 'confirm_username'))
# Ensure the robot was changed.
self.assertIsNone(model.user.get_user(READ_ACCESS_USER + '+somebot'))
self.assertIsNotNone(model.user.get_user('someotherusername+somebot'))
def test_changeusername_temp_samename(self):
self.login(READ_ACCESS_USER)
user = model.user.get_user(READ_ACCESS_USER)
model.user.create_user_prompt(user, 'confirm_username')
self.assertTrue(model.user.has_user_prompt(user, 'confirm_username'))
json = self.putJsonResponse(User, data=dict(username=READ_ACCESS_USER))
# Ensure the username was not changed but they are no longer temporarily named.
self.assertEquals(READ_ACCESS_USER, json['username'])
self.assertFalse(model.user.has_user_prompt(user, 'confirm_username'))
def test_changeusername_notallowed(self):
with self.toggleFeature('USER_RENAME', False):
self.login(ADMIN_ACCESS_USER)
user = model.user.get_user(ADMIN_ACCESS_USER)
self.assertFalse(model.user.has_user_prompt(user, 'confirm_username'))
json = self.putJsonResponse(User, data=dict(username='someotherusername'))
self.assertEquals(ADMIN_ACCESS_USER, json['username'])
self.assertTrue('prompts' in json)
self.assertIsNone(model.user.get_user('someotherusername'))
self.assertIsNotNone(model.user.get_user(ADMIN_ACCESS_USER))
def test_changeusername_allowed(self):
with self.toggleFeature('USER_RENAME', True):
self.login(ADMIN_ACCESS_USER)
user = model.user.get_user(ADMIN_ACCESS_USER)
self.assertFalse(model.user.has_user_prompt(user, 'confirm_username'))
json = self.putJsonResponse(User, data=dict(username='someotherusername'))
self.assertEquals('someotherusername', json['username'])
self.assertTrue('prompts' in json)
self.assertIsNotNone(model.user.get_user('someotherusername'))
self.assertIsNone(model.user.get_user(ADMIN_ACCESS_USER))
def test_changeusername_already_used(self):
self.login(READ_ACCESS_USER)
user = model.user.get_user(READ_ACCESS_USER)
model.user.create_user_prompt(user, 'confirm_username')
self.assertTrue(model.user.has_user_prompt(user, 'confirm_username'))
# Try to change to a used username.
self.putJsonResponse(User, data=dict(username=ADMIN_ACCESS_USER), expected_code=400)
# Change to a new username.
self.putJsonResponse(User, data=dict(username='unusedusername'))
class TestCreateNewUser(ApiTestCase):
def test_existingusername(self):
json = self.postJsonResponse(User,
data=dict(username=READ_ACCESS_USER,
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('The username already exists', json['detail'])
def test_trycreatetooshort(self):
json = self.postJsonResponse(User,
data=dict(username='a',
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('Invalid namespace a: Namespace must be between 2 and 255 characters in length',
json['detail'])
def test_trycreateregexmismatch(self):
json = self.postJsonResponse(User,
data=dict(username='auserName',
password='password',
email='test@example.com'),
expected_code=400)
self.assertEquals('Invalid namespace auserName: Namespace must match expression ^([a-z0-9]+(?:[._-][a-z0-9]+)*)$',
json['detail'])
def test_createuser(self):
data = self.postJsonResponse(User, data=NEW_USER_DETAILS, expected_code=200)
self.assertEquals(True, data['awaiting_verification'])
def test_createuser_captcha(self):
@urlmatch(netloc=r'(.*\.)?google.com', path='/recaptcha/api/siteverify')
def captcha_endpoint(url, request):
if url.query.find('response=somecode') > 0:
return {'status_code': 200, 'content': py_json.dumps({'success': True})}
else:
return {'status_code': 400, 'content': py_json.dumps({'success': False})}
with HTTMock(captcha_endpoint):
with self.toggleFeature('RECAPTCHA', True):
# Try with a missing captcha.
self.postResponse(User, data=NEW_USER_DETAILS, expected_code=400)
# Try with an invalid captcha.
details = dict(NEW_USER_DETAILS)
details['recaptcha_response'] = 'someinvalidcode'
self.postResponse(User, data=details, expected_code=400)
# Try with a valid captcha.
details = dict(NEW_USER_DETAILS)
details['recaptcha_response'] = 'somecode'
self.postResponse(User, data=details, expected_code=200)
def test_createuser_withteaminvite(self):
inviter = model.user.get_user(ADMIN_ACCESS_USER)
team = model.team.get_organization_team(ORGANIZATION, 'owners')
invite = model.team.add_or_invite_to_team(inviter, team, None, NEW_USER_DETAILS['email'])
details = {
'invite_code': invite.invite_token
}
details.update(NEW_USER_DETAILS)
data = self.postJsonResponse(User, data=details, expected_code=200)
# Make sure the user is verified since the email address of the user matches
# that of the team invite.
self.assertFalse('awaiting_verification' in data)
# Make sure the user was not (yet) added to the team.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertNotInTeam(json, NEW_USER_DETAILS['username'])
def test_createuser_withteaminvite_differentemails(self):
inviter = model.user.get_user(ADMIN_ACCESS_USER)
team = model.team.get_organization_team(ORGANIZATION, 'owners')
invite = model.team.add_or_invite_to_team(inviter, team, None, 'differentemail@example.com')
details = {
'invite_code': invite.invite_token
}
details.update(NEW_USER_DETAILS)
data = self.postJsonResponse(User, data=details, expected_code=200)
# Make sure the user is *not* verified since the email address of the user
# does not match that of the team invite.
self.assertTrue(data['awaiting_verification'])
# Make sure the user was not (yet) added to the team.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertNotInTeam(json, NEW_USER_DETAILS['username'])
def test_createuser_withmultipleteaminvites(self):
inviter = model.user.get_user(ADMIN_ACCESS_USER)
owners_team = model.team.get_organization_team(ORGANIZATION, 'owners')
readers_team = model.team.get_organization_team(ORGANIZATION, 'readers')
other_owners_team = model.team.get_organization_team('library', 'owners')
owners_invite = model.team.add_or_invite_to_team(inviter, owners_team, None,
NEW_USER_DETAILS['email'])
readers_invite = model.team.add_or_invite_to_team(inviter, readers_team, None,
NEW_USER_DETAILS['email'])
other_owners_invite = model.team.add_or_invite_to_team(inviter, other_owners_team, None,
NEW_USER_DETAILS['email'])
# Create the user and ensure they have a verified email address.
details = {
'invite_code': owners_invite.invite_token
}
details.update(NEW_USER_DETAILS)
data = self.postJsonResponse(User, data=details, expected_code=200)
# Make sure the user is verified since the email address of the user matches
# that of the team invite.
self.assertFalse('awaiting_verification' in data)
# Make sure the user was not (yet) added to the teams.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertNotInTeam(json, NEW_USER_DETAILS['username'])
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
self.assertNotInTeam(json, NEW_USER_DETAILS['username'])
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname='library',
teamname='owners'))
self.assertNotInTeam(json, NEW_USER_DETAILS['username'])
# Accept the first invitation.
self.login(NEW_USER_DETAILS['username'])
self.putJsonResponse(TeamMemberInvite, params=dict(code=owners_invite.invite_token))
# Make sure both codes are now invalid.
self.putResponse(TeamMemberInvite, params=dict(code=owners_invite.invite_token),
expected_code=400)
self.putResponse(TeamMemberInvite, params=dict(code=readers_invite.invite_token),
expected_code=400)
# Make sure the user is now in the two invited teams under the organization, but not
# in the other org's team.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertInTeam(json, NEW_USER_DETAILS['username'])
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
self.assertInTeam(json, NEW_USER_DETAILS['username'])
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname='library',
teamname='owners'))
self.assertNotInTeam(json, NEW_USER_DETAILS['username'])
# Accept the second invitation.
self.login(NEW_USER_DETAILS['username'])
self.putJsonResponse(TeamMemberInvite, params=dict(code=other_owners_invite.invite_token))
# Make sure the user was added to the other organization.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname='library',
teamname='owners'))
self.assertInTeam(json, NEW_USER_DETAILS['username'])
# Make sure the invitation codes are now invalid.
self.putResponse(TeamMemberInvite, params=dict(code=other_owners_invite.invite_token),
expected_code=400)
class TestDeleteNamespace(ApiTestCase):
def test_deletenamespaces(self):
self.login(ADMIN_ACCESS_USER)
# Try to first delete the user. Since they are the sole admin of two orgs, it should fail.
with check_transitive_deletes():
self.deleteResponse(User, expected_code=400)
# Delete the two orgs, checking in between.
with check_transitive_deletes():
self.deleteEmptyResponse(Organization, params=dict(orgname=ORGANIZATION), expected_code=204)
self.deleteResponse(User, expected_code=400) # Should still fail.
self.deleteEmptyResponse(Organization, params=dict(orgname='library'), expected_code=204)
# Add some queue items for the user.
notification_queue.put([ADMIN_ACCESS_USER, 'somerepo', 'somename'], '{}')
dockerfile_build_queue.put([ADMIN_ACCESS_USER, 'anotherrepo'], '{}')
# Now delete the user.
with check_transitive_deletes():
self.deleteEmptyResponse(User, expected_code=204)
# Ensure the queue items are gone.
self.assertIsNone(notification_queue.get())
self.assertIsNone(dockerfile_build_queue.get())
def test_delete_federateduser(self):
self.login(PUBLIC_USER)
# Add some federated logins.
user = model.user.get_user(PUBLIC_USER)
model.user.attach_federated_login(user, 'github', 'something', {})
with check_transitive_deletes():
self.deleteEmptyResponse(User, expected_code=204)
def test_delete_prompted_user(self):
self.login('randomuser')
with check_transitive_deletes():
self.deleteEmptyResponse(User, expected_code=204)
class TestSignin(ApiTestCase):
def test_signin_unicode(self):
self.postResponse(Signin, data=dict(username=u'\xe5\x8c\x97\xe4\xba\xac\xe5\xb8\x82',
password='password'), expected_code=403)
def test_signin_invitecode(self):
# Create a new user (unverified)
data = self.postJsonResponse(User, data=NEW_USER_DETAILS, expected_code=200)
self.assertTrue(data['awaiting_verification'])
# Try to sign in without an invite code.
data = self.postJsonResponse(Signin, data=NEW_USER_DETAILS, expected_code=403)
self.assertTrue(data['needsEmailVerification'])
# Try to sign in with an invalid invite code.
details = {
'invite_code': 'someinvalidcode'
}
details.update(NEW_USER_DETAILS)
data = self.postJsonResponse(Signin, data=details, expected_code=403)
self.assertTrue(data['needsEmailVerification'])
# Sign in with an invite code and ensure the user becomes verified.
inviter = model.user.get_user(ADMIN_ACCESS_USER)
team = model.team.get_organization_team(ORGANIZATION, 'owners')
invite = model.team.add_or_invite_to_team(inviter, team, None, NEW_USER_DETAILS['email'])
details = {
'invite_code': invite.invite_token
}
details.update(NEW_USER_DETAILS)
data = self.postJsonResponse(Signin, data=details, expected_code=200)
self.assertFalse('needsEmailVerification' in data)
class TestSignout(ApiTestCase):
def test_signout(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(User)
assert json['username'] == READ_ACCESS_USER
self.postResponse(Signout)
# Make sure we're now signed out.
self.getJsonResponse(User, expected_code=401)
class TestConductSearch(ApiTestCase):
def test_noaccess(self):
self.login(NO_ACCESS_USER)
json = self.getJsonResponse(ConductSearch,
params=dict(query='read'))
self.assertEquals(0, len(json['results']))
json = self.getJsonResponse(ConductSearch,
params=dict(query='owners'))
self.assertEquals(0, len(json['results']))
def test_nouser(self):
json = self.getJsonResponse(ConductSearch,
params=dict(query='read'))
self.assertEquals(0, len(json['results']))
json = self.getJsonResponse(ConductSearch,
params=dict(query='public'))
self.assertEquals(2, len(json['results']))
self.assertEquals(json['results'][0]['kind'], 'repository')
self.assertEquals(json['results'][0]['name'], 'publicrepo')
self.assertEquals(json['results'][1]['kind'], 'user')
self.assertEquals(json['results'][1]['name'], 'public')
json = self.getJsonResponse(ConductSearch,
params=dict(query='owners'))
self.assertEquals(0, len(json['results']))
def test_orgmember(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(ConductSearch,
params=dict(query='owners'))
self.assertEquals(0, len(json['results']))
json = self.getJsonResponse(ConductSearch,
params=dict(query='readers'))
self.assertEquals(1, len(json['results']))
self.assertEquals(json['results'][0]['kind'], 'team')
self.assertEquals(json['results'][0]['name'], 'readers')
def test_orgadmin(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(ConductSearch,
params=dict(query='owners'))
self.assertEquals(2, len(json['results']))
self.assertEquals(json['results'][0]['kind'], 'team')
self.assertEquals(json['results'][0]['name'], 'owners')
json = self.getJsonResponse(ConductSearch,
params=dict(query='readers'))
self.assertEquals(1, len(json['results']))
self.assertEquals(json['results'][0]['kind'], 'team')
self.assertEquals(json['results'][0]['name'], 'readers')
def test_explicit_permission(self):
self.login('reader')
json = self.getJsonResponse(ConductSearch,
params=dict(query='shared'))
self.assertEquals(1, len(json['results']))
self.assertEquals(json['results'][0]['kind'], 'repository')
self.assertEquals(json['results'][0]['name'], 'shared')
def test_full_text(self):
self.login(ADMIN_ACCESS_USER)
# Make sure the repository is found via `full` and `text search`.
json = self.getJsonResponse(ConductSearch,
params=dict(query='full'))
self.assertEquals(1, len(json['results']))
self.assertEquals(json['results'][0]['kind'], 'repository')
self.assertEquals(json['results'][0]['name'], 'text-full-repo')
json = self.getJsonResponse(ConductSearch,
params=dict(query='text search'))
self.assertEquals(1, len(json['results']))
self.assertEquals(json['results'][0]['kind'], 'repository')
self.assertEquals(json['results'][0]['name'], 'text-full-repo')
class TestGetMatchingEntities(ApiTestCase):
def test_simple_lookup(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix=ADMIN_ACCESS_USER, namespace=ORGANIZATION,
includeTeams='true'))
self.assertEquals(1, len(json['results']))
def test_simple_lookup_noorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix=ADMIN_ACCESS_USER))
self.assertEquals(1, len(json['results']))
def test_unicode_search(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix='北京市', namespace=ORGANIZATION,
includeTeams='true'))
self.assertEquals(0, len(json['results']))
def test_notinorg(self):
self.login(NO_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix='o', namespace=ORGANIZATION,
includeTeams='true'))
names = set([r['name'] for r in json['results']])
assert 'outsideorg' in names
assert not 'owners' in names
def test_inorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix='o', namespace=ORGANIZATION,
includeTeams='true'))
names = set([r['name'] for r in json['results']])
assert 'outsideorg' in names
assert 'owners' in names
def test_inorg_withorgs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(EntitySearch,
params=dict(prefix=ORGANIZATION[0], namespace=ORGANIZATION,
includeOrgs='true'))
names = set([r['name'] for r in json['results']])
assert ORGANIZATION in names
class TestCreateOrganization(ApiTestCase):
def test_existinguser(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(OrganizationList,
data=dict(name=ADMIN_ACCESS_USER, email='testorg@example.com'),
expected_code=400)
self.assertEquals('A user or organization with this name already exists',
json['detail'])
def test_existingorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(OrganizationList,
data=dict(name=ORGANIZATION, email='testorg@example.com'),
expected_code=400)
self.assertEquals('A user or organization with this name already exists',
json['detail'])
def test_createorg(self):
self.login(ADMIN_ACCESS_USER)
data = self.postResponse(OrganizationList,
data=dict(name='neworg',
email='testorg@example.com'),
expected_code=201)
self.assertEquals('"Created"', data.strip())
# Ensure the org was created.
organization = model.organization.get_organization('neworg')
assert organization is not None
# Verify the admin user is the org's admin.
json = self.getJsonResponse(Organization,
params=dict(orgname='neworg'))
self.assertEquals('neworg', json['name'])
self.assertEquals(True, json['is_admin'])
def test_createorg_viaoauth(self):
# Attempt with no auth.
self.postResponse(OrganizationList,
data=dict(name='neworg',
email='testorg@example.com'),
expected_code=401)
# Attempt with auth with invalid scope.
dt_user = model.user.get_user(ADMIN_ACCESS_USER)
token = model.oauth.create_access_token_for_testing(dt_user, 'deadbeef', 'repo:read',
access_token='foo')
self.postResponse(OrganizationList,
data=dict(name='neworg',
email='testorg@example.com'),
headers=dict(Authorization='Bearer ' + token.access_token),
expected_code=403)
# Create OAuth token with user:admin scope.
token = model.oauth.create_access_token_for_testing(dt_user, 'deadbeef', 'user:admin',
access_token='bar')
data = self.postResponse(OrganizationList,
data=dict(name='neworg',
email='testorg@example.com'),
headers=dict(Authorization='Bearer ' + token.access_token),
expected_code=201)
self.assertEquals('"Created"', data.strip())
class TestGetOrganization(ApiTestCase):
def test_unknownorg(self):
self.login(ADMIN_ACCESS_USER)
self.getResponse(Organization, params=dict(orgname='notvalid'), expected_code=404)
def test_cannotaccess(self):
self.login(NO_ACCESS_USER)
self.getResponse(Organization, params=dict(orgname=ORGANIZATION), expected_code=200)
def test_getorganization(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
self.assertEquals(ORGANIZATION, json['name'])
self.assertEquals(False, json['is_admin'])
def test_getorganization_asadmin(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
self.assertEquals(ORGANIZATION, json['name'])
self.assertEquals(True, json['is_admin'])
class TestChangeOrganizationDetails(ApiTestCase):
def test_changeinvoiceemail(self):
self.login(ADMIN_ACCESS_USER)
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(invoice_email=True))
self.assertEquals(True, json['invoice_email'])
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(invoice_email=False))
self.assertEquals(False, json['invoice_email'])
def test_changemail(self):
self.login(ADMIN_ACCESS_USER)
json = self.putJsonResponse(Organization,
params=dict(orgname=ORGANIZATION),
data=dict(email='newemail@example.com'))
self.assertEquals('newemail@example.com', json['email'])
class TestGetOrganizationPrototypes(ApiTestCase):
def test_getprototypes(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
assert len(json['prototypes']) > 0
class TestCreateOrganizationPrototypes(ApiTestCase):
def test_invaliduser(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(activating_user={'name': 'unknownuser'},
role='read',
delegate={'kind': 'team', 'name': 'owners'}),
expected_code=400)
self.assertEquals('Unknown activating user', json['detail'])
def test_missingdelegate(self):
self.login(ADMIN_ACCESS_USER)
self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(role='read'),
expected_code=400)
def test_createprototype(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION),
data=dict(role='read',
delegate={'kind': 'team',
'name': 'readers'}))
self.assertEquals('read', json['role'])
pid = json['id']
# Verify the prototype exists.
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = set([p['id'] for p in json['prototypes']])
assert pid in ids
class TestDeleteOrganizationPrototypes(ApiTestCase):
def test_deleteprototype(self):
self.login(ADMIN_ACCESS_USER)
# Get the existing prototypes
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = [p['id'] for p in json['prototypes']]
pid = ids[0]
# Delete a prototype.
self.deleteEmptyResponse(PermissionPrototype,
params=dict(orgname=ORGANIZATION, prototypeid=pid))
# Verify the prototype no longer exists.
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
newids = [p['id'] for p in json['prototypes']]
assert not pid in newids
class TestUpdateOrganizationPrototypes(ApiTestCase):
def test_updateprototype(self):
self.login(ADMIN_ACCESS_USER)
# Get the existing prototypes
json = self.getJsonResponse(PermissionPrototypeList,
params=dict(orgname=ORGANIZATION))
ids = [p['id'] for p in json['prototypes']]
pid = ids[0]
# Update a prototype.
json = self.putJsonResponse(PermissionPrototype,
params=dict(orgname=ORGANIZATION, prototypeid=pid),
data=dict(role='admin'))
self.assertEquals('admin', json['role'])
class TestGetOrganizationMembers(ApiTestCase):
def test_getmembers(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationMemberList,
params=dict(orgname=ORGANIZATION))
membernames = [member['name'] for member in json['members']]
assert ADMIN_ACCESS_USER in membernames
assert READ_ACCESS_USER in membernames
assert not NO_ACCESS_USER in membernames
for member in json['members']:
membername = member['name']
response = self.getJsonResponse(OrganizationMember,
params=dict(orgname=ORGANIZATION, membername=membername))
self.assertEquals(member, response)
class TestRemoveOrganizationMember(ApiTestCase):
def test_try_remove_only_admin(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(OrganizationMember,
params=dict(orgname=ORGANIZATION, membername=ADMIN_ACCESS_USER),
expected_code=400)
def test_remove_member(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationMemberList,
params=dict(orgname=ORGANIZATION))
membernames = [member['name'] for member in json['members']]
assert ADMIN_ACCESS_USER in membernames
assert READ_ACCESS_USER in membernames
self.deleteEmptyResponse(OrganizationMember,
params=dict(orgname=ORGANIZATION, membername=READ_ACCESS_USER))
json = self.getJsonResponse(OrganizationMemberList,
params=dict(orgname=ORGANIZATION))
membernames = [member['name'] for member in json['members']]
assert ADMIN_ACCESS_USER in membernames
assert not READ_ACCESS_USER in membernames
def test_remove_member_repo_permission(self):
self.login(ADMIN_ACCESS_USER)
# Add read user as a direct permission on the admin user's repo.
model.permission.set_user_repo_permission(READ_ACCESS_USER, ADMIN_ACCESS_USER, 'simple', 'read')
# Verify the user has a permission on the admin user's repo.
admin_perms = [p.user.username
for p in model.user.get_all_repo_users(ADMIN_ACCESS_USER, 'simple')]
assert READ_ACCESS_USER in admin_perms
# Add read user as a direct permission on the org repo.
model.permission.set_user_repo_permission(READ_ACCESS_USER, ORGANIZATION, ORG_REPO, 'read')
# Verify the user has a permission on the org repo.
org_perms = [p.user.username for p in model.user.get_all_repo_users(ORGANIZATION, ORG_REPO)]
assert READ_ACCESS_USER in org_perms
# Remove the user from the org.
self.deleteEmptyResponse(OrganizationMember,
params=dict(orgname=ORGANIZATION, membername=READ_ACCESS_USER))
# Verify that the user's permission on the org repo is gone, but it is still
# present on the other repo.
org_perms = [p.user.username for p in model.user.get_all_repo_users(ORGANIZATION, ORG_REPO)]
assert not READ_ACCESS_USER in org_perms
admin_perms = [p.user.username
for p in model.user.get_all_repo_users(ADMIN_ACCESS_USER, 'simple')]
assert READ_ACCESS_USER in admin_perms
class TestGetOrganizationPrivateAllowed(ApiTestCase):
def test_existingorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgPrivateRepositories,
params=dict(orgname=ORGANIZATION))
self.assertEquals(True, json['privateAllowed'])
assert not 'reposAllowed' in json
def test_neworg(self):
self.login(ADMIN_ACCESS_USER)
data = self.postResponse(OrganizationList,
data=dict(name='neworg',
email='test@example.com'),
expected_code=201)
json = self.getJsonResponse(OrgPrivateRepositories,
params=dict(orgname='neworg'))
self.assertEquals(False, json['privateAllowed'])
class TestUpdateOrganizationTeam(ApiTestCase):
def test_updateexisting(self):
self.login(ADMIN_ACCESS_USER)
data = self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION,
teamname='readers'),
data=dict(description='My cool team',
role='creator'))
self.assertEquals('My cool team', data['description'])
self.assertEquals('creator', data['role'])
def test_attemptchangeroleonowners(self):
self.login(ADMIN_ACCESS_USER)
self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='owners'),
data=dict(role='creator'),
expected_code=400)
def test_createnewteam(self):
self.login(ADMIN_ACCESS_USER)
data = self.putJsonResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION,
teamname='newteam'),
data=dict(description='My cool team',
role='member'))
self.assertEquals('My cool team', data['description'])
self.assertEquals('member', data['role'])
# Verify the team was created.
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
assert 'newteam' in json['teams']
class TestDeleteOrganizationTeam(ApiTestCase):
def test_deleteteam(self):
self.login(ADMIN_ACCESS_USER)
self.deleteEmptyResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='readers'))
# Make sure the team was deleted
json = self.getJsonResponse(Organization,
params=dict(orgname=ORGANIZATION))
assert not 'readers' in json['teams']
def test_attemptdeleteowners(self):
self.login(ADMIN_ACCESS_USER)
resp = self.deleteResponse(OrganizationTeam,
params=dict(orgname=ORGANIZATION, teamname='owners'),
expected_code=400)
data = py_json.loads(resp)
msg = ("Deleting team 'owners' would remove admin ability for user " +
"'devtable' in organization 'buynlarge'")
self.assertEquals(msg, data['message'])
class TestTeamPermissions(ApiTestCase):
def test_team_permissions(self):
self.login(ADMIN_ACCESS_USER)
resp = self.getJsonResponse(TeamPermissions,
params=dict(orgname=ORGANIZATION, teamname='readers'))
self.assertEquals(1, len(resp['permissions']))
class TestGetOrganizationTeamMembers(ApiTestCase):
def test_invalidteam(self):
self.login(ADMIN_ACCESS_USER)
self.getResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION, teamname='notvalid'),
expected_code=404)
def test_getmembers(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
self.assertEquals(READ_ACCESS_USER, json['members'][1]['name'])
class TestUpdateOrganizationTeamMember(ApiTestCase):
def test_addmember_alreadyteammember(self):
self.login(ADMIN_ACCESS_USER)
membername = READ_ACCESS_USER
self.putResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername),
expected_code=400)
def test_addmember_orgmember(self):
self.login(ADMIN_ACCESS_USER)
membername = READ_ACCESS_USER
self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
# Verify the user was added to the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertInTeam(json, membername)
def test_addmember_robot(self):
self.login(ADMIN_ACCESS_USER)
membername = ORGANIZATION + '+coolrobot'
self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
# Verify the user was added to the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
self.assertInTeam(json, membername)
def test_addmember_invalidrobot(self):
self.login(ADMIN_ACCESS_USER)
membername = 'freshuser+anotherrobot'
self.putResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername),
expected_code=400)
def test_addmember_nonorgmember(self):
self.login(ADMIN_ACCESS_USER)
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
self.assertEquals(True, response['invited'])
# Make sure the user is not (yet) part of the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
for member in json['members']:
self.assertNotEqual(membername, member['name'])
class TestAcceptTeamMemberInvite(ApiTestCase):
def test_accept(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
self.assertEquals(True, response['invited'])
# Login as the user.
self.login(membername)
# Accept the invite.
user = model.user.get_user(membername)
invites = list(model.team.lookup_team_invites(user))
self.assertEquals(1, len(invites))
self.putJsonResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token))
# Verify the user is now on the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertInTeam(json, membername)
# Verify the accept now fails.
self.putResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token),
expected_code=400)
def test_accept_via_email(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
member = model.user.get_user(NO_ACCESS_USER)
response = self.putJsonResponse(InviteTeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
email=member.email))
self.assertEquals(True, response['invited'])
# Login as the user.
self.login(member.username)
# Accept the invite.
invites = list(model.team.lookup_team_invites_by_email(member.email))
self.assertEquals(1, len(invites))
self.putJsonResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token))
# Verify the user is now on the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertInTeam(json, member.username)
# Verify the accept now fails.
self.putResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token),
expected_code=400)
def test_accept_invite_different_user(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=NO_ACCESS_USER))
self.assertEquals(True, response['invited'])
# Login as a different user.
self.login(PUBLIC_USER)
# Try to accept the invite.
user = model.user.get_user(NO_ACCESS_USER)
invites = list(model.team.lookup_team_invites(user))
self.assertEquals(1, len(invites))
self.putResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token),
expected_code=400)
# Ensure the invite is still valid.
user = model.user.get_user(NO_ACCESS_USER)
invites = list(model.team.lookup_team_invites(user))
self.assertEquals(1, len(invites))
# Ensure the user is *not* a member of the team.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertNotInTeam(json, PUBLIC_USER)
def test_accept_invite_different_email(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
response = self.putJsonResponse(InviteTeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
email='someemail@example.com'))
self.assertEquals(True, response['invited'])
# Login as a different user.
self.login(PUBLIC_USER)
# Try to accept the invite.
invites = list(model.team.lookup_team_invites_by_email('someemail@example.com'))
self.assertEquals(1, len(invites))
self.putResponse(TeamMemberInvite, params=dict(code=invites[0].invite_token),
expected_code=400)
# Ensure the invite is still valid.
invites = list(model.team.lookup_team_invites_by_email('someemail@example.com'))
self.assertEquals(1, len(invites))
# Ensure the user is *not* a member of the team.
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='owners'))
self.assertNotInTeam(json, PUBLIC_USER)
class TestDeclineTeamMemberInvite(ApiTestCase):
def test_decline_wronguser(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
self.assertEquals(True, response['invited'])
# Try to decline the invite.
user = model.user.get_user(membername)
invites = list(model.team.lookup_team_invites(user))
self.assertEquals(1, len(invites))
self.deleteResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token),
expected_code=400)
def test_decline(self):
self.login(ADMIN_ACCESS_USER)
# Create the invite.
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='owners',
membername=membername))
self.assertEquals(True, response['invited'])
# Login as the user.
self.login(membername)
# Decline the invite.
user = model.user.get_user(membername)
invites = list(model.team.lookup_team_invites(user))
self.assertEquals(1, len(invites))
self.deleteEmptyResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token))
# Make sure the invite was deleted.
self.deleteResponse(TeamMemberInvite,
params=dict(code=invites[0].invite_token),
expected_code=400)
class TestDeleteOrganizationTeamMember(ApiTestCase):
def test_deletememberinvite(self):
self.login(ADMIN_ACCESS_USER)
# Verify the initial member count
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers',
includePending=True))
self.assertEquals(len(json['members']), 3)
membername = NO_ACCESS_USER
response = self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
self.assertEquals(True, response['invited'])
# Verify the invite was added.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers',
includePending=True))
self.assertEquals(len(json['members']), 4)
# Delete the invite.
self.deleteEmptyResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
# Verify the user was removed from the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers',
includePending=True))
self.assertEquals(len(json['members']), 3)
def test_deletemember(self):
self.login(ADMIN_ACCESS_USER)
self.deleteEmptyResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=READ_ACCESS_USER))
# Verify the user was removed from the team.
json = self.getJsonResponse(TeamMemberList,
params=dict(orgname=ORGANIZATION,
teamname='readers'))
self.assertEquals(len(json['members']), 1)
class TestCreateRepo(ApiTestCase):
def test_invalidreponame(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(repository='some/repo',
visibility='public',
description=''),
expected_code=400)
self.assertEquals('Invalid repository name', json['detail'])
def test_duplicaterepo(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(repository='simple',
visibility='public',
description=''),
expected_code=400)
self.assertEquals('Repository already exists', json['detail'])
def test_createrepo(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(repository='newrepo',
visibility='public',
description=''),
expected_code=201)
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('newrepo', json['name'])
def test_createrepo_underorg(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(RepositoryList,
data=dict(namespace=ORGANIZATION,
repository='newrepo',
visibility='private',
description=''),
expected_code=201)
self.assertEquals(ORGANIZATION, json['namespace'])
self.assertEquals('newrepo', json['name'])
class TestListRepos(ApiTestCase):
def test_listrepos_asguest(self):
# Queries: Base + the list query
with assert_query_count(BASE_QUERY_COUNT + 1):
json = self.getJsonResponse(RepositoryList, params=dict(public=True))
self.assertEquals(len(json['repositories']), 1)
def assertPublicRepos(self, has_extras=False):
public_user = model.user.get_user('public')
# Delete all existing repos under the namespace.
for repo in list(RepositoryTable.select().where(RepositoryTable.namespace_user == public_user)):
model.repository.purge_repository(public_user.username, repo.name)
# Add public repos until we have enough for a few pages.
required = set()
for i in range(0, REPOS_PER_PAGE * 3):
name = 'publicrepo%s' % i
model.repository.create_repository('public', name, public_user,
visibility='public')
required.add(name)
# Request results until we no longer have any.
next_page = None
while True:
json = self.getJsonResponse(RepositoryList, params=dict(public=True, next_page=next_page))
for repo in json['repositories']:
name = repo['name']
if name in required:
required.remove(name)
else:
self.assertTrue(has_extras, "Could not find name %s in repos created" % name)
if 'next_page' in json:
self.assertEquals(len(json['repositories']), REPOS_PER_PAGE)
else:
break
next_page = json['next_page']
def test_listrepos_asguest_withpages(self):
self.assertPublicRepos()
def test_listrepos_asorgmember_withpages(self):
self.login(READ_ACCESS_USER)
self.assertPublicRepos(has_extras=True)
def test_listrepos_filter(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(RepositoryList,
params=dict(namespace=ORGANIZATION,
public=False))
self.assertGreater(len(json['repositories']), 0)
for repo in json['repositories']:
self.assertEquals(ORGANIZATION, repo['namespace'])
def test_listrepos_allparams(self):
# Add a repository action count entry for one of the org repos.
repo = model.repository.get_repository(ORGANIZATION, ORG_REPO)
RepositoryActionCount.create(repository=repo, count=10, date=datetime.datetime.utcnow())
self.login(ADMIN_ACCESS_USER)
# Queries: Base + the list query + the popularity and last modified queries + full perms load
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 4):
json = self.getJsonResponse(RepositoryList,
params=dict(namespace=ORGANIZATION,
public=False,
last_modified=True,
popularity=True))
self.assertGreater(len(json['repositories']), 0)
for repository in json['repositories']:
self.assertEquals(ORGANIZATION, repository['namespace'])
if repository['name'] == ORG_REPO:
self.assertGreater(repository['popularity'], 0)
def test_listrepos_starred_nouser(self):
self.getResponse(RepositoryList,
params=dict(last_modified=True,
popularity=True,
starred=True),
expected_code=400)
def test_listrepos_starred(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryList,
params=dict(last_modified=True,
popularity=True,
starred=True))
self.assertTrue(len(json['repositories']) > 0)
for repo in json['repositories']:
self.assertTrue(repo['is_starred'])
def test_listrepos_asguest_allparams(self):
json = self.getJsonResponse(RepositoryList,
params=dict(namespace=ORGANIZATION,
public=False,
last_modified=True))
for repo in json['repositories']:
self.assertEquals(ORGANIZATION, repo['namespace'])
def assertRepositoryVisible(self, namespace, name):
json = self.getJsonResponse(RepositoryList,
params=dict(namespace=namespace,
public=False))
self.assertEquals(1, len(json['repositories']))
self.assertEquals(name, json['repositories'][0]['name'])
def assertRepositoryNotVisible(self, namespace, name):
json = self.getJsonResponse(RepositoryList, params=dict(namespace=namespace, public=False))
for repo in json['repositories']:
self.assertNotEquals(name, repo['name'])
json = self.getJsonResponse(RepositoryList, params=dict(starred=True))
for repo in json['repositories']:
self.assertNotEquals(name, repo['name'])
def test_listrepos_starred_filtered(self):
admin_user = model.user.get_user(ADMIN_ACCESS_USER)
reader_user = model.user.get_user(READ_ACCESS_USER)
# Create a new organization.
new_org = model.organization.create_organization('neworg', 'neworg@devtable.com', admin_user)
admin_team = model.team.create_team('admin', new_org, 'admin')
# Add a repository to the organization.
repo = model.repository.create_repository('neworg', 'somerepo', admin_user)
with self.add_to_team_temporarily(reader_user, admin_team):
# Star the repository for the user.
model.repository.star_repository(reader_user, repo)
# Verify that the user cannot see the repo, since they are no longer allowed to do so.
self.login(READ_ACCESS_USER)
self.assertRepositoryNotVisible('neworg', 'somerepo')
@contextmanager
def add_to_team_temporarily(self, user, team):
model.team.add_user_to_team(user, team)
yield
model.team.remove_user_from_team(team.organization.username, team.name, user.username,
ADMIN_ACCESS_USER)
def test_listrepos_org_filtered(self):
admin_user = model.user.get_user(ADMIN_ACCESS_USER)
reader_user = model.user.get_user(READ_ACCESS_USER)
# Create a new organization.
new_org = model.organization.create_organization('neworg', 'neworg@devtable.com', admin_user)
admin_team = model.team.create_team('admin', new_org, 'admin')
creator_team = model.team.create_team('creators', new_org, 'creator')
member_team = model.team.create_team('members', new_org, 'member')
# Add a repository to the organization.
model.repository.create_repository('neworg', 'somerepo', admin_user)
# Verify that the admin user can view it.
self.login(ADMIN_ACCESS_USER)
self.assertRepositoryVisible('neworg', 'somerepo')
# Add reader to a creator team under the org and verify they *cannot* see the repository.
with self.add_to_team_temporarily(reader_user, creator_team):
self.login(READ_ACCESS_USER)
self.assertRepositoryNotVisible('neworg', 'somerepo')
# Add reader to a member team under the org and verify they *cannot* see the repository.
with self.add_to_team_temporarily(reader_user, member_team):
self.login(READ_ACCESS_USER)
self.assertRepositoryNotVisible('neworg', 'somerepo')
# Add reader to an admin team under the org and verify they *can* see the repository.
with self.add_to_team_temporarily(reader_user, admin_team):
self.login(READ_ACCESS_USER)
self.assertRepositoryVisible('neworg', 'somerepo')
# Verify that the public user cannot see the repository.
self.login(PUBLIC_USER)
self.assertRepositoryNotVisible('neworg', 'somerepo')
class TestViewPublicRepository(ApiTestCase):
def test_normalview(self):
resp = self.getJsonResponse(Repository, params=dict(repository='public/publicrepo'))
self.assertFalse('stats' in resp)
def test_normalview_withstats(self):
resp = self.getJsonResponse(Repository, params=dict(repository='public/publicrepo', includeStats=True))
self.assertTrue('stats' in resp)
def test_anon_access_disabled(self):
import features
features.ANONYMOUS_ACCESS = False
try:
self.getResponse(Repository, params=dict(repository='public/publicrepo'), expected_code=401)
finally:
features.ANONYMOUS_ACCESS = True
class TestUpdateRepo(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
def test_updatedescription(self):
self.login(ADMIN_ACCESS_USER)
self.putJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO),
data=dict(description='Some cool repo'))
# Verify the repo description was updated.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals('Some cool repo', json['description'])
class TestChangeRepoVisibility(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
def test_trychangevisibility(self):
self.login(ADMIN_ACCESS_USER)
# Make public.
self.postJsonResponse(RepositoryVisibility,
params=dict(repository=self.SIMPLE_REPO),
data=dict(visibility='public'))
# Verify the visibility.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals(True, json['is_public'])
# Change the subscription of the namespace.
self.putJsonResponse(UserPlan, data=dict(plan='personal-30'))
# Try to make private.
self.postJsonResponse(RepositoryVisibility,
params=dict(repository=self.SIMPLE_REPO),
data=dict(visibility='private'),
expected_code=402)
# Verify the visibility.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals(True, json['is_public'])
def test_changevisibility(self):
self.login(ADMIN_ACCESS_USER)
# Make public.
self.postJsonResponse(RepositoryVisibility,
params=dict(repository=self.SIMPLE_REPO),
data=dict(visibility='public'))
# Verify the visibility.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals(True, json['is_public'])
# Make private.
self.postJsonResponse(RepositoryVisibility,
params=dict(repository=self.SIMPLE_REPO),
data=dict(visibility='private'))
# Verify the visibility.
json = self.getJsonResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
self.assertEquals(False, json['is_public'])
class log_queries(object):
def __init__(self, query_filter=None):
self.filter = query_filter
def get_queries(self):
queries = [q.msg[0] for q in self._handler.queries]
if self.filter:
queries = [q for q in queries if re.match(self.filter, q)]
return queries
def __enter__(self):
logger = logging.getLogger('peewee')
self._handler = _QueryLogHandler()
logger.setLevel(logging.DEBUG)
logger.addHandler(self._handler)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
logger = logging.getLogger('peewee')
logger.removeHandler(self._handler)
class check_transitive_deletes(log_queries):
def __init__(self):
super(check_transitive_deletes, self).__init__(query_filter=r'^DELETE.+IN \(SELECT.+$')
def __exit__(self, exc_type, exc_val, exc_tb):
super(check_transitive_deletes, self).__exit__(exc_type, exc_val, exc_tb)
queries = self.get_queries()
if queries:
raise Exception('Detected transitive deletion in queries: %s' % queries)
class TestDeleteRepository(ApiTestCase):
SIMPLE_REPO = ADMIN_ACCESS_USER + '/simple'
COMPLEX_REPO = ADMIN_ACCESS_USER + '/complex'
def test_deleterepo(self):
self.login(ADMIN_ACCESS_USER)
# Verify the repo exists.
self.getResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
# Add a build queue item for the repo.
dockerfile_build_queue.put([ADMIN_ACCESS_USER, 'simple'], '{}')
# Delete the repository.
self.deleteEmptyResponse(Repository, params=dict(repository=self.SIMPLE_REPO))
# Ensure the queue item is gone.
self.assertIsNone(dockerfile_build_queue.get())
# Verify the repo was deleted.
self.getResponse(Repository,
params=dict(repository=self.SIMPLE_REPO),
expected_code=404)
def test_verify_queue_removal(self):
self.login(ADMIN_ACCESS_USER)
# Verify the repo exists.
self.getResponse(Repository,
params=dict(repository=self.SIMPLE_REPO))
# Add a build queue item for the repo and another repo.
dockerfile_build_queue.put([ADMIN_ACCESS_USER, 'simple'], '{}', available_after=-1)
dockerfile_build_queue.put([ADMIN_ACCESS_USER, 'anotherrepo'], '{}', available_after=-1)
# Delete the repository.
self.deleteEmptyResponse(Repository, params=dict(repository=self.SIMPLE_REPO))
# Ensure the other queue item is still present.
self.assertIsNotNone(dockerfile_build_queue.get())
def test_deleterepo2(self):
self.login(ADMIN_ACCESS_USER)
# Verify the repo exists.
self.getResponse(Repository,
params=dict(repository=self.COMPLEX_REPO))
self.deleteEmptyResponse(Repository, params=dict(repository=self.COMPLEX_REPO))
# Verify the repo was deleted.
self.getResponse(Repository,
params=dict(repository=self.COMPLEX_REPO),
expected_code=404)
def test_populate_and_delete_repo(self):
self.login(ADMIN_ACCESS_USER)
# Verify the repo exists.
self.getResponse(Repository,
params=dict(repository=self.COMPLEX_REPO))
# Make sure the repository has some images and tags.
self.assertTrue(len(list(model.image.get_repository_images(ADMIN_ACCESS_USER, 'complex'))) > 0)
self.assertTrue(len(list(model.tag.list_repository_tags(ADMIN_ACCESS_USER, 'complex'))) > 0)
# Add some data for the repository, in addition to is already existing images and tags.
repository = model.repository.get_repository(ADMIN_ACCESS_USER, 'complex')
# Create some access tokens.
access_token = model.token.create_access_token(repository, 'read')
model.token.create_access_token(repository, 'write')
delegate_token = model.token.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken',
'read')
model.token.create_delegate_token(ADMIN_ACCESS_USER, 'complex', 'sometoken', 'write')
# Create some repository builds.
model.build.create_repository_build(repository, access_token, {}, 'someid', 'foobar')
model.build.create_repository_build(repository, delegate_token, {}, 'someid2', 'foobar2')
# Create some notifications.
model.notification.create_repo_notification(repository, 'repo_push', 'hipchat', {}, {})
model.notification.create_repo_notification(repository, 'build_queued', 'slack', {}, {})
# Create some logs.
model.log.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
model.log.log_action('push_repo', ADMIN_ACCESS_USER, repository=repository)
# Create some build triggers.
user = model.user.get_user(ADMIN_ACCESS_USER)
model.build.create_build_trigger(repository, 'github', 'sometoken', user)
model.build.create_build_trigger(repository, 'github', 'anothertoken', user)
# Create some email authorizations.
model.repository.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'a@b.com')
model.repository.create_email_authorization_for_repo(ADMIN_ACCESS_USER, 'complex', 'b@c.com')
# Create some repository action count entries.
RepositoryActionCount.create(repository=repository, date=datetime.datetime.now(), count=1)
RepositoryActionCount.create(repository=repository,
date=datetime.datetime.now() - datetime.timedelta(days=2), count=2)
RepositoryActionCount.create(repository=repository,
date=datetime.datetime.now() - datetime.timedelta(days=5), count=6)
# Create some labels.
pre_delete_label_count = database.Label.select().count()
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
model.label.create_manifest_label(tag_manifest, 'foo', 'bar', 'manifest')
model.label.create_manifest_label(tag_manifest, 'foo', 'baz', 'manifest')
model.label.create_manifest_label(tag_manifest, 'something', '{}', 'api',
media_type_name='application/json')
model.label.create_manifest_label(tag_manifest, 'something', '{"some": "json"}', 'manifest')
# Delete the repository.
with check_transitive_deletes():
self.deleteEmptyResponse(Repository, params=dict(repository=self.COMPLEX_REPO))
# Verify the repo was deleted.
self.getResponse(Repository,
params=dict(repository=self.COMPLEX_REPO),
expected_code=404)
# Verify the labels are gone.
post_delete_label_count = database.Label.select().count()
self.assertEquals(post_delete_label_count, pre_delete_label_count)
class TestGetRepository(ApiTestCase):
PUBLIC_REPO = PUBLIC_USER + '/publicrepo'
def test_getrepo_badnames(self):
self.login(ADMIN_ACCESS_USER)
bad_names = ['logs', 'build', 'tokens', 'foo.bar', 'foo-bar', 'foo_bar']
# For each bad name, create the repo.
for bad_name in bad_names:
json = self.postJsonResponse(RepositoryList, expected_code=201,
data=dict(repository=bad_name, visibility='public',
description=''))
# Make sure we can retrieve its information.
json = self.getJsonResponse(Repository,
params=dict(repository=ADMIN_ACCESS_USER + '/' + bad_name))
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals(bad_name, json['name'])
self.assertEquals(True, json['is_public'])
def test_getrepo_public_asguest(self):
json = self.getJsonResponse(Repository,
params=dict(repository=self.PUBLIC_REPO))
self.assertEquals(PUBLIC_USER, json['namespace'])
self.assertEquals('publicrepo', json['name'])
self.assertEquals(True, json['is_public'])
self.assertEquals(False, json['is_organization'])
self.assertEquals(False, json['can_write'])
self.assertEquals(False, json['can_admin'])
assert 'latest' in json['tags']
def test_getrepo_public_asowner(self):
self.login(PUBLIC_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=self.PUBLIC_REPO))
self.assertEquals(False, json['is_organization'])
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
def test_getrepo_building(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
self.assertEquals(False, json['is_organization'])
def test_getrepo_org_asnonmember(self):
self.getResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO),
expected_code=401)
def test_getrepo_org_asreader(self):
self.login(READ_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
self.assertEquals(ORGANIZATION, json['namespace'])
self.assertEquals(ORG_REPO, json['name'])
self.assertEquals(False, json['can_write'])
self.assertEquals(False, json['can_admin'])
self.assertEquals(True, json['is_organization'])
def test_getrepo_org_asadmin(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(Repository,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
self.assertEquals(True, json['can_write'])
self.assertEquals(True, json['can_admin'])
self.assertEquals(True, json['is_organization'])
class TestRepositoryBuildResource(ApiTestCase):
def test_repo_build_invalid_url(self):
self.login(ADMIN_ACCESS_USER)
self.postJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(archive_url='hppt://quay.io'),
expected_code=400)
def test_cancel_invalidbuild(self):
self.login(ADMIN_ACCESS_USER)
self.deleteResponse(RepositoryBuildResource,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid='invalid'),
expected_code=404)
def test_cancel_waitingbuild(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
json = self.postJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
uuid = json['id']
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['builds']))
self.assertEquals(uuid, json['builds'][0]['id'])
# Find the build's queue item.
build_ref = database.RepositoryBuild.get(uuid=uuid)
queue_item = database.QueueItem.get(id=build_ref.queue_id)
self.assertTrue(queue_item.available)
self.assertTrue(queue_item.retries_remaining > 0)
# Cancel the build.
self.deleteResponse(RepositoryBuildResource,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['builds']))
self.assertEquals('cancelled', json['builds'][0]['phase'])
# Check for the build's queue item.
try:
database.QueueItem.get(id=build_ref.queue_id)
self.fail('QueueItem still exists for build')
except database.QueueItem.DoesNotExist:
pass
def test_attemptcancel_scheduledbuild(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
json = self.postJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
uuid = json['id']
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['builds']))
self.assertEquals(uuid, json['builds'][0]['id'])
# Set queue item to be picked up.
build_ref = database.RepositoryBuild.get(uuid=uuid)
qi = database.QueueItem.get(id=build_ref.queue_id)
qi.available = False
qi.save()
# Try to cancel the build.
self.deleteResponse(RepositoryBuildResource,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid),
expected_code=201)
def test_attemptcancel_workingbuild(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
json = self.postJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
uuid = json['id']
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['builds']))
self.assertEquals(uuid, json['builds'][0]['id'])
# Set the build to a different phase.
rb = database.RepositoryBuild.get(uuid=uuid)
rb.phase = database.BUILD_PHASE.BUILDING
rb.save()
# Try to cancel the build.
self.deleteResponse(RepositoryBuildResource,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', build_uuid=uuid),
expected_code=400)
class TestRepoBuilds(ApiTestCase):
def test_getrepo_nobuilds(self):
self.login(ADMIN_ACCESS_USER)
# Queries: Permission + the list query + app check
with assert_query_count(3):
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
def test_getrepobuilds(self):
self.login(ADMIN_ACCESS_USER)
# Queries: Permission + the list query + app check
with assert_query_count(3):
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
assert len(json['builds']) > 0
build = json['builds'][-1]
assert 'id' in build
assert 'status' in build
# Check the status endpoint.
status_json = self.getJsonResponse(RepositoryBuildStatus,
params=dict(repository=ADMIN_ACCESS_USER + '/building',
build_uuid=build['id']))
self.assertEquals(status_json['id'], build['id'])
self.assertEquals(status_json['resource_key'], build['resource_key'])
self.assertEquals(status_json['trigger'], build['trigger'])
class TestRequestRepoBuild(ApiTestCase):
def test_requestbuild_noidurl(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build without a file ID or URL.
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(),
expected_code=400)
def test_requestbuild_invalidurls(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build with and invalid URL.
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(archive_url='foobarbaz'),
expected_code=400)
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(archive_url='file://foobarbaz'),
expected_code=400)
def test_requestrepobuild_withurl(self):
self.login(ADMIN_ACCESS_USER)
# Ensure we are not yet building.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
# Request a (fake) build.
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(archive_url='http://quay.io/robots.txt'),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) > 0
self.assertEquals('http://quay.io/robots.txt', json['builds'][0]['archive_url'])
def test_requestrepobuild_withfile(self):
self.login(ADMIN_ACCESS_USER)
# Ensure we are not yet building.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
# Request a (fake) build.
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz'),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) > 0
def test_requestrepobuild_with_robot(self):
self.login(ADMIN_ACCESS_USER)
# Ensure we are not yet building.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['builds']) == 0
# Request a (fake) build.
pull_robot = ADMIN_ACCESS_USER + '+dtrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=201)
# Check for the build.
json = self.getJsonResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
assert len(json['builds']) > 0
def test_requestrepobuild_with_invalid_robot(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
pull_robot = ADMIN_ACCESS_USER + '+invalidrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=404)
def test_requestrepobuild_with_unauthorized_robot(self):
self.login(ADMIN_ACCESS_USER)
# Request a (fake) build.
pull_robot = 'freshuser+anotherrobot'
self.postResponse(RepositoryBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(file_id='foobarbaz', pull_robot=pull_robot),
expected_code=403)
class TestRepositoryEmail(ApiTestCase):
def test_emailnotauthorized(self):
self.login(ADMIN_ACCESS_USER)
# Verify the e-mail address is not authorized.
self.getResponse(RepositoryAuthorizedEmail,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
email='test@example.com'),
expected_code=404)
def test_emailnotauthorized_butsent(self):
self.login(ADMIN_ACCESS_USER)
# Verify the e-mail address is not authorized.
json = self.getJsonResponse(RepositoryAuthorizedEmail,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
email='jschorr+other@devtable.com'))
self.assertEquals(False, json['confirmed'])
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('simple', json['repository'])
def test_emailauthorized(self):
self.login(ADMIN_ACCESS_USER)
# Verify the e-mail address is authorized.
json = self.getJsonResponse(RepositoryAuthorizedEmail,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
email='jschorr@devtable.com'))
self.assertEquals(True, json['confirmed'])
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('simple', json['repository'])
def test_send_email_authorization(self):
self.login(ADMIN_ACCESS_USER)
# Send the email.
json = self.postJsonResponse(RepositoryAuthorizedEmail,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
email='jschorr+foo@devtable.com'))
self.assertEquals(False, json['confirmed'])
self.assertEquals(ADMIN_ACCESS_USER, json['namespace'])
self.assertEquals('simple', json['repository'])
class TestRepositoryNotifications(ApiTestCase):
def test_testnotification(self):
self.login(ADMIN_ACCESS_USER)
# Add a notification.
json = self.postJsonResponse(RepositoryNotificationList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(config={'url': 'http://example.com'}, event='repo_push',
method='webhook', eventConfig={}),
expected_code=201)
uuid = json['uuid']
self.assertIsNone(notification_queue.get())
# Issue a test notification.
self.postJsonResponse(TestRepositoryNotification,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=uuid))
# Ensure the item is in the queue.
time.sleep(1) # Makes sure the queue get works on MySQL with its second-level precision.
found = notification_queue.get()
self.assertIsNotNone(found)
self.assertTrue('notification_uuid' in found['body'])
def test_webhooks(self):
self.login(ADMIN_ACCESS_USER)
# Add a notification.
json = self.postJsonResponse(RepositoryNotificationList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(config={'url': 'http://example.com'}, event='repo_push',
method='webhook', eventConfig={}),
expected_code=201)
self.assertEquals('repo_push', json['event'])
self.assertEquals('webhook', json['method'])
self.assertEquals('http://example.com', json['config']['url'])
self.assertIsNone(json['title'])
wid = json['uuid']
# Get the notification.
json = self.getJsonResponse(RepositoryNotification,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid))
self.assertEquals(wid, json['uuid'])
self.assertEquals('repo_push', json['event'])
self.assertEquals('webhook', json['method'])
self.assertIsNone(json['title'])
# Verify the notification is listed.
json = self.getJsonResponse(RepositoryNotificationList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
ids = [w['uuid'] for w in json['notifications']]
assert wid in ids
# Delete the notification.
self.deleteEmptyResponse(RepositoryNotification,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid),
expected_code=204)
# Verify the notification is gone.
self.getResponse(RepositoryNotification,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid),
expected_code=404)
# Add another notification.
json = self.postJsonResponse(RepositoryNotificationList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(config={'url': 'http://example.com'}, event='repo_push',
method='webhook', title='Some Notification',
eventConfig={}),
expected_code=201)
self.assertEquals('repo_push', json['event'])
self.assertEquals('webhook', json['method'])
self.assertEquals('http://example.com', json['config']['url'])
self.assertEquals('Some Notification', json['title'])
wid = json['uuid']
# Get the notification.
json = self.getJsonResponse(RepositoryNotification,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', uuid=wid))
self.assertEquals(wid, json['uuid'])
self.assertEquals('repo_push', json['event'])
self.assertEquals('webhook', json['method'])
self.assertEquals('Some Notification', json['title'])
class TestListAndGetImage(ApiTestCase):
def test_listandgetimages(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryImageList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
assert len(json['images']) > 0
for image in json['images']:
assert 'id' in image
assert 'tags' in image
assert 'created' in image
assert 'comment' in image
assert 'command' in image
assert 'ancestors' in image
assert 'size' in image
ijson = self.getJsonResponse(RepositoryImage,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
image_id=image['id']))
self.assertEquals(image['id'], ijson['id'])
class TestGetImageChanges(ApiTestCase):
def test_getimagechanges(self):
self.login(ADMIN_ACCESS_USER)
# Find an image to check.
json = self.getJsonResponse(RepositoryImageList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
image_id = json['images'][0]['id']
# Lookup the image's changes.
# TODO: Fix me once we can get fake changes into the test data
#self.getJsonResponse(RepositoryImageChanges,
# params=dict(repository=ADMIN_ACCESS_USER + '/simple',
# image_id=image_id))
class TestRestoreTag(ApiTestCase):
def test_restoretag_invalidtag(self):
self.login(ADMIN_ACCESS_USER)
self.postResponse(RestoreTag,
params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='invalidtag'),
data=dict(image='invalid_image'),
expected_code=400)
def test_restoretag_invalidimage(self):
self.login(ADMIN_ACCESS_USER)
self.postResponse(RestoreTag,
params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest'),
data=dict(image='invalid_image'),
expected_code=400)
def test_restoretag_invalidmanifest(self):
self.login(ADMIN_ACCESS_USER)
self.postResponse(RestoreTag,
params=dict(repository=ADMIN_ACCESS_USER + '/history', tag='latest'),
data=dict(manifest_digest='invalid_digest'),
expected_code=400)
def test_restoretag(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(ListRepositoryTags,
params=dict(repository=ADMIN_ACCESS_USER + '/history',
tag='latest'))
self.assertEquals(2, len(json['tags']))
self.assertFalse('end_ts' in json['tags'][0])
previous_image_id = json['tags'][1]['docker_image_id']
self.postJsonResponse(RestoreTag, params=dict(repository=ADMIN_ACCESS_USER + '/history',
tag='latest'),
data=dict(image=previous_image_id))
json = self.getJsonResponse(ListRepositoryTags,
params=dict(repository=ADMIN_ACCESS_USER + '/history',
tag='latest'))
self.assertEquals(3, len(json['tags']))
self.assertFalse('end_ts' in json['tags'][0])
self.assertEquals(previous_image_id, json['tags'][0]['docker_image_id'])
def test_restoretag_to_digest(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(ListRepositoryTags,
params=dict(repository=ADMIN_ACCESS_USER + '/history',
tag='latest'))
self.assertEquals(2, len(json['tags']))
self.assertFalse('end_ts' in json['tags'][0])
previous_manifest = json['tags'][1]['manifest_digest']
self.postJsonResponse(RestoreTag, params=dict(repository=ADMIN_ACCESS_USER + '/history',
tag='latest'),
data=dict(image='foo', manifest_digest=previous_manifest))
json = self.getJsonResponse(ListRepositoryTags,
params=dict(repository=ADMIN_ACCESS_USER + '/history',
tag='latest'))
self.assertEquals(3, len(json['tags']))
self.assertFalse('end_ts' in json['tags'][0])
self.assertEquals(previous_manifest, json['tags'][0]['manifest_digest'])
class TestListAndDeleteTag(ApiTestCase):
def test_invalid_tags(self):
self.login(ADMIN_ACCESS_USER)
# List the images for staging.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex',
tag='staging'))
staging_images = json['images']
# Try to add some invalid tags.
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='-fail'),
data=dict(image=staging_images[0]['id']),
expected_code=400)
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='北京'),
data=dict(image=staging_images[0]['id']),
expected_code=400)
def test_listdeletecreateandmovetag(self):
self.login(ADMIN_ACCESS_USER)
# List the images for prod.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
prod_images = json['images']
assert len(prod_images) > 0
# List the images for staging.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex',
tag='staging'))
staging_images = json['images']
assert len(prod_images) == len(staging_images) + 1
# Delete prod.
self.deleteEmptyResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
expected_code=204)
# Make sure the tag is gone.
self.getResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'),
expected_code=404)
# Make the sure the staging images are still there.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex',
tag='staging'))
self.assertEquals(staging_images, json['images'])
# Require a valid tag name.
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='-fail'),
data=dict(image=staging_images[0]['id']),
expected_code=400)
# Add a new tag to the staging image.
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'),
data=dict(image=staging_images[0]['id']),
expected_code=201)
# Make sure the tag is present.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex',
tag='sometag'))
sometag_images = json['images']
self.assertEquals(sometag_images, staging_images)
# Move the tag.
self.putResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='sometag'),
data=dict(image=staging_images[-1]['id']),
expected_code=201)
# Make sure the tag has moved.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex',
tag='sometag'))
sometag_new_images = json['images']
self.assertEquals(1, len(sometag_new_images))
self.assertEquals(staging_images[-1], sometag_new_images[0])
def test_deletesubtag(self):
self.login(ADMIN_ACCESS_USER)
# List the images for prod.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
prod_images = json['images']
assert len(prod_images) > 0
# Delete staging.
self.deleteEmptyResponse(RepositoryTag,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='staging'),
expected_code=204)
# Make sure the prod images are still around.
json = self.getJsonResponse(RepositoryTagImages,
params=dict(repository=ADMIN_ACCESS_USER + '/complex', tag='prod'))
self.assertEquals(prod_images, json['images'])
def test_listtag_digest(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(ListRepositoryTags,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', page=1,
limit=1))
self.assertTrue('manifest_digest' in json['tags'][0])
def test_listtagpagination(self):
self.login(ADMIN_ACCESS_USER)
latest_image = model.tag.get_tag_image(ADMIN_ACCESS_USER, "complex", "prod")
# Create 10 tags in an empty repo.
user = model.user.get_user_or_org(ADMIN_ACCESS_USER)
repo = model.repository.create_repository(ADMIN_ACCESS_USER, "empty", user)
image = model.image.find_create_or_link_image(latest_image.docker_image_id, repo,
ADMIN_ACCESS_USER, {}, ['local_us'])
remaining_tags = set()
for i in xrange(1, 11):
tag_name = "tag" + str(i)
remaining_tags.add(tag_name)
model.tag.create_or_update_tag(ADMIN_ACCESS_USER, "empty", tag_name,
image.docker_image_id)
# Make sure we can iterate over all of them.
json = self.getJsonResponse(ListRepositoryTags,
params=dict(repository=ADMIN_ACCESS_USER + '/empty', page=1,
limit=5))
self.assertEquals(1, json['page'])
self.assertEquals(5, len(json['tags']))
self.assertTrue(json['has_additional'])
names = {tag['name'] for tag in json['tags']}
remaining_tags = remaining_tags - names
self.assertEquals(5, len(remaining_tags))
json = self.getJsonResponse(ListRepositoryTags,
params=dict(repository=ADMIN_ACCESS_USER + '/empty', page=2,
limit=5))
self.assertEquals(2, json['page'])
self.assertEquals(5, len(json['tags']))
self.assertFalse(json['has_additional'])
names = {tag['name'] for tag in json['tags']}
remaining_tags = remaining_tags - names
self.assertEquals(0, len(remaining_tags))
json = self.getJsonResponse(ListRepositoryTags,
params=dict(repository=ADMIN_ACCESS_USER + '/empty', page=3,
limit=5))
self.assertEquals(3, json['page'])
self.assertEquals(0, len(json['tags']))
self.assertFalse(json['has_additional'])
class TestRepoPermissions(ApiTestCase):
def listUserPermissions(self, namespace=ADMIN_ACCESS_USER, repo='simple'):
return self.getJsonResponse(RepositoryUserPermissionList,
params=dict(repository=namespace + '/' + repo))['permissions']
def listTeamPermissions(self):
response = self.getJsonResponse(RepositoryTeamPermissionList,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO))
return response['permissions']
def test_userpermissions_underorg(self):
self.login(ADMIN_ACCESS_USER)
permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO)
self.assertEquals(1, len(permissions))
assert 'outsideorg' in permissions
self.assertEquals('read', permissions['outsideorg']['role'])
self.assertEquals(False, permissions['outsideorg']['is_org_member'])
# Add another user.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO,
username=ADMIN_ACCESS_USER),
data=dict(role='admin'))
# Verify the user is present.
permissions = self.listUserPermissions(namespace=ORGANIZATION, repo=ORG_REPO)
self.assertEquals(2, len(permissions))
assert ADMIN_ACCESS_USER in permissions
self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role'])
self.assertEquals(True, permissions[ADMIN_ACCESS_USER]['is_org_member'])
def test_userpermissions(self):
self.login(ADMIN_ACCESS_USER)
# The repo should start with just the admin as a user perm.
permissions = self.listUserPermissions()
self.assertEquals(1, len(permissions))
assert ADMIN_ACCESS_USER in permissions
self.assertEquals('admin', permissions[ADMIN_ACCESS_USER]['role'])
self.assertFalse('is_org_member' in permissions[ADMIN_ACCESS_USER])
# Add another user.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
username=NO_ACCESS_USER),
data=dict(role='read'))
# Verify the user is present.
permissions = self.listUserPermissions()
self.assertEquals(2, len(permissions))
assert NO_ACCESS_USER in permissions
self.assertEquals('read', permissions[NO_ACCESS_USER]['role'])
self.assertFalse('is_org_member' in permissions[NO_ACCESS_USER])
json = self.getJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
username=NO_ACCESS_USER))
self.assertEquals('read', json['role'])
# Change the user's permissions.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
username=NO_ACCESS_USER),
data=dict(role='admin'))
# Verify.
permissions = self.listUserPermissions()
self.assertEquals(2, len(permissions))
assert NO_ACCESS_USER in permissions
self.assertEquals('admin', permissions[NO_ACCESS_USER]['role'])
# Delete the user's permission.
self.deleteEmptyResponse(RepositoryUserPermission,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
username=NO_ACCESS_USER))
# Verify.
permissions = self.listUserPermissions()
self.assertEquals(1, len(permissions))
assert not NO_ACCESS_USER in permissions
def test_teampermissions(self):
self.login(ADMIN_ACCESS_USER)
# The repo should start with just the readers as a team perm.
permissions = self.listTeamPermissions()
self.assertEquals(1, len(permissions))
assert 'readers' in permissions
self.assertEquals('read', permissions['readers']['role'])
# Add another team.
self.putJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
data=dict(role='write'))
# Verify the team is present.
permissions = self.listTeamPermissions()
self.assertEquals(2, len(permissions))
assert 'owners' in permissions
self.assertEquals('write', permissions['owners']['role'])
json = self.getJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO,
teamname='owners'))
self.assertEquals('write', json['role'])
# Change the team's permissions.
self.putJsonResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'),
data=dict(role='admin'))
# Verify.
permissions = self.listTeamPermissions()
self.assertEquals(2, len(permissions))
assert 'owners' in permissions
self.assertEquals('admin', permissions['owners']['role'])
# Delete the team's permission.
self.deleteEmptyResponse(RepositoryTeamPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, teamname='owners'))
# Verify.
permissions = self.listTeamPermissions()
self.assertEquals(1, len(permissions))
assert not 'owners' in permissions
class TestApiTokens(ApiTestCase):
def listTokens(self):
return self.getJsonResponse(RepositoryTokenList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))['tokens']
def test_tokens(self):
self.login(ADMIN_ACCESS_USER)
# Create a new token.
json = self.postJsonResponse(RepositoryTokenList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'),
data=dict(role='read', friendlyName='mytoken'),
expected_code=201)
self.assertEquals('mytoken', json['friendlyName'])
self.assertEquals('read', json['role'])
token_code = json['code']
# Verify.
tokens = self.listTokens()
assert token_code in tokens
self.assertEquals('mytoken', tokens[token_code]['friendlyName'])
json = self.getJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
code=token_code))
self.assertEquals(tokens[token_code], json)
# Change the token's permission.
self.putJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
data=dict(role='write'))
# Verify.
json = self.getJsonResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
code=token_code))
self.assertEquals('write', json['role'])
# Delete the token.
self.deleteEmptyResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
code=token_code))
# Verify.
self.getResponse(RepositoryToken,
params=dict(repository=ADMIN_ACCESS_USER + '/simple', code=token_code),
expected_code=404)
class TestUserCard(ApiTestCase):
def test_getusercard(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserCard)
self.assertEquals('4242', json['card']['last4'])
self.assertEquals('Visa', json['card']['type'])
def test_setusercard_error(self):
self.login(ADMIN_ACCESS_USER)
json = self.postJsonResponse(UserCard,
data=dict(token='sometoken'),
expected_code=402)
assert 'carderror' in json
class TestOrgCard(ApiTestCase):
def test_getorgcard(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationCard,
params=dict(orgname=ORGANIZATION))
self.assertEquals('4242', json['card']['last4'])
self.assertEquals('Visa', json['card']['type'])
class TestUserSubscription(ApiTestCase):
def getSubscription(self):
return self.getJsonResponse(UserPlan)
def test_updateplan(self):
self.login(ADMIN_ACCESS_USER)
# Change the plan.
self.putJsonResponse(UserPlan,
data=dict(plan='free'))
# Verify
sub = self.getSubscription()
self.assertEquals('free', sub['plan'])
# Change the plan.
self.putJsonResponse(UserPlan,
data=dict(plan='bus-large-30'))
# Verify
sub = self.getSubscription()
self.assertEquals('bus-large-30', sub['plan'])
class TestOrgSubscription(ApiTestCase):
def getSubscription(self):
return self.getJsonResponse(OrganizationPlan, params=dict(orgname=ORGANIZATION))
def test_updateplan(self):
self.login(ADMIN_ACCESS_USER)
# Change the plan.
self.putJsonResponse(OrganizationPlan,
params=dict(orgname=ORGANIZATION),
data=dict(plan='free'))
# Verify
sub = self.getSubscription()
self.assertEquals('free', sub['plan'])
# Change the plan.
self.putJsonResponse(OrganizationPlan,
params=dict(orgname=ORGANIZATION),
data=dict(plan='bus-large-30'))
# Verify
sub = self.getSubscription()
self.assertEquals('bus-large-30', sub['plan'])
class TestUserRobots(ApiTestCase):
def getRobotNames(self):
return [r['name'] for r in self.getJsonResponse(UserRobotList)['robots']]
def test_robot_list(self):
self.login(NO_ACCESS_USER)
# Create some robots.
self.putJsonResponse(UserRobot,
params=dict(robot_shortname='bender'),
expected_code=201)
self.putJsonResponse(UserRobot,
params=dict(robot_shortname='goldy'),
expected_code=201)
self.putJsonResponse(UserRobot,
params=dict(robot_shortname='coolbot'),
expected_code=201)
# Queries: Base + the lookup query
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1):
self.getJsonResponse(UserRobotList)
# Queries: Base + the lookup query
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1):
self.getJsonResponse(UserRobotList, params=dict(permissions=True))
def test_robots(self):
self.login(NO_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(UserRobot,
params=dict(robot_shortname='bender'),
expected_code=201)
self.assertEquals(NO_ACCESS_USER + '+bender', json['name'])
# Verify.
robots = self.getRobotNames()
assert NO_ACCESS_USER + '+bender' in robots
# Delete the robot.
self.deleteEmptyResponse(UserRobot,
params=dict(robot_shortname='bender'))
# Verify.
robots = self.getRobotNames()
assert not NO_ACCESS_USER + '+bender' in robots
def test_regenerate(self):
self.login(NO_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(UserRobot,
params=dict(robot_shortname='bender'),
expected_code=201)
token = json['token']
# Regenerate the robot.
json = self.postJsonResponse(RegenerateUserRobot, params=dict(robot_shortname='bender'),
expected_code=200)
# Verify the token changed.
self.assertNotEquals(token, json['token'])
json2 = self.getJsonResponse(UserRobot,
params=dict(robot_shortname='bender'),
expected_code=200)
self.assertEquals(json['token'], json2['token'])
class TestOrgRobots(ApiTestCase):
def getRobotNames(self):
return [r['name'] for r in self.getJsonResponse(OrgRobotList,
params=dict(orgname=ORGANIZATION))['robots']]
def test_create_robot_with_underscores(self):
self.login(ADMIN_ACCESS_USER)
# Create the robot.
self.putJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='mr_bender'),
expected_code=201)
# Add the robot to a team.
membername = ORGANIZATION + '+mr_bender'
self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
# Retrieve the robot's details.
self.getJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='mr_bender'),
expected_code=200)
def test_delete_robot_after_use(self):
self.login(ADMIN_ACCESS_USER)
# Create the robot.
self.putJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=201)
# Add the robot to a team.
membername = ORGANIZATION + '+bender'
self.putJsonResponse(TeamMember,
params=dict(orgname=ORGANIZATION, teamname='readers',
membername=membername))
# Add a repository permission.
self.putJsonResponse(RepositoryUserPermission,
params=dict(repository=ORGANIZATION + '/' + ORG_REPO, username=membername),
data=dict(role='read'))
# Add a permission prototype with the robot as the activating user.
self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION),
data=dict(role='read',
activating_user={'name': membername},
delegate={'kind': 'user',
'name': membername}))
# Add a permission prototype with the robot as the delegating user.
self.postJsonResponse(PermissionPrototypeList, params=dict(orgname=ORGANIZATION),
data=dict(role='read',
delegate={'kind': 'user',
'name': membername}))
# Add a build trigger with the robot as the pull robot.
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.repository.get_repository(ORGANIZATION, ORG_REPO)
user = model.user.get_user(ADMIN_ACCESS_USER)
pull_robot = model.user.get_user(membername)
trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user,
pull_robot=pull_robot)
# Add a fake build of the fake build trigger.
token = model.token.create_access_token(repo, 'write', kind='build-worker',
friendly_name='Repository Build Token')
build = model.build.create_repository_build(repo, token, {}, 'fake-dockerfile', 'fake-name',
trigger, pull_robot_name=membername)
# Add some log entries for the robot.
model.log.log_action('pull_repo', ORGANIZATION, performer=pull_robot, repository=repo)
# Delete the robot and verify it works.
self.deleteEmptyResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'))
# Verify the build is still present.
self.assertIsNotNone(model.build.get_repository_build(build.uuid))
# All the above records should now be deleted, along with the robot. We verify a few of the
# critical ones below.
# Check the team.
team = model.team.get_organization_team(ORGANIZATION, 'readers')
members = [member.username
for member in model.organization.get_organization_team_members(team.id)]
self.assertFalse(membername in members)
# Check the robot itself.
self.assertIsNone(model.user.get_user(membername))
def test_robots(self):
self.login(ADMIN_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=201)
self.assertEquals(ORGANIZATION + '+bender', json['name'])
# Verify.
robots = self.getRobotNames()
assert ORGANIZATION + '+bender' in robots
# Delete the robot.
self.deleteEmptyResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'))
# Verify.
robots = self.getRobotNames()
assert not ORGANIZATION + '+bender' in robots
def test_regenerate(self):
self.login(ADMIN_ACCESS_USER)
# Create a robot.
json = self.putJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=201)
token = json['token']
# Regenerate the robot.
json = self.postJsonResponse(RegenerateOrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=200)
# Verify the token changed.
self.assertNotEquals(token, json['token'])
json2 = self.getJsonResponse(OrgRobot,
params=dict(orgname=ORGANIZATION, robot_shortname='bender'),
expected_code=200)
self.assertEquals(json['token'], json2['token'])
class TestLogs(ApiTestCase):
def test_repo_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryLogs, params=dict(repository='devtable/simple'))
assert 'logs' in json
assert 'start_time' in json
assert 'end_time' in json
def test_repo_logs_crossyear(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryLogs, params=dict(repository='devtable/simple',
starttime='12/01/2016',
endtime='1/09/2017'))
self.assertEquals('Thu, 01 Dec 2016 00:00:00 -0000', json['start_time'])
self.assertEquals('Tue, 10 Jan 2017 00:00:00 -0000', json['end_time'])
def test_repo_aggregate_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(RepositoryAggregateLogs, params=dict(repository='devtable/simple'))
assert 'aggregated' in json
assert len(json['aggregated']) > 0
def test_user_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserLogs)
assert 'logs' in json
assert 'start_time' in json
assert 'end_time' in json
def test_org_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION))
assert 'logs' in json
assert 'start_time' in json
assert 'end_time' in json
def test_user_aggregate_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserAggregateLogs)
assert 'aggregated' in json
def test_org_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgAggregateLogs, params=dict(orgname=ORGANIZATION))
assert 'aggregated' in json
def test_performer(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrgLogs, params=dict(orgname=ORGANIZATION))
all_logs = json['logs']
json = self.getJsonResponse(OrgLogs,
params=dict(performer=READ_ACCESS_USER, orgname=ORGANIZATION))
assert len(json['logs']) < len(all_logs)
for log in json['logs']:
self.assertEquals(READ_ACCESS_USER, log['performer']['name'])
class TestApplicationInformation(ApiTestCase):
def test_get_info(self):
json = self.getJsonResponse(ApplicationInformation,
params=dict(client_id=FAKE_APPLICATION_CLIENT_ID))
assert 'name' in json
assert 'uri' in json
assert 'organization' in json
def test_get_invalid_info(self):
self.getJsonResponse(ApplicationInformation, params=dict(client_id='invalid-code'),
expected_code=404)
class TestOrganizationApplications(ApiTestCase):
def test_list_create_applications(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION))
self.assertEquals(2, len(json['applications']))
found = False
for application in json['applications']:
if application['client_id'] == FAKE_APPLICATION_CLIENT_ID:
found = True
break
self.assertTrue(found)
# Add a new application.
json = self.postJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION),
data=dict(name="Some cool app", description="foo"))
self.assertEquals("Some cool app", json['name'])
self.assertEquals("foo", json['description'])
# Retrieve the apps list again
list_json = self.getJsonResponse(OrganizationApplications, params=dict(orgname=ORGANIZATION))
self.assertEquals(3, len(list_json['applications']))
class TestOrganizationApplicationResource(ApiTestCase):
def test_get_edit_delete_application(self):
self.login(ADMIN_ACCESS_USER)
# Retrieve the application.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION,
client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id'])
# Edit the application.
edit_json = self.putJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION,
client_id=FAKE_APPLICATION_CLIENT_ID),
data=dict(name="Some App", description="foo",
application_uri="bar", redirect_uri="baz",
avatar_email="meh"))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, edit_json['client_id'])
self.assertEquals("Some App", edit_json['name'])
self.assertEquals("foo", edit_json['description'])
self.assertEquals("bar", edit_json['application_uri'])
self.assertEquals("baz", edit_json['redirect_uri'])
self.assertEquals("meh", edit_json['avatar_email'])
# Retrieve the application again.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION,
client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(json, edit_json)
# Delete the application.
self.deleteEmptyResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID))
# Make sure the application is gone.
self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION, client_id=FAKE_APPLICATION_CLIENT_ID),
expected_code=404)
class TestOrganization(ApiTestCase):
def test_change_send_billing_invoice(self):
self.login(ADMIN_ACCESS_USER)
self.putJsonResponse(Organization, params=dict(orgname=ORGANIZATION),
data=dict(invoice_email=False, invoice_email_address=None))
class TestOrganizationApplicationResetClientSecret(ApiTestCase):
def test_reset_client_secret(self):
self.login(ADMIN_ACCESS_USER)
# Retrieve the application.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION,
client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, json['client_id'])
# Reset the client secret.
reset_json = self.postJsonResponse(OrganizationApplicationResetClientSecret,
params=dict(orgname=ORGANIZATION,
client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(FAKE_APPLICATION_CLIENT_ID, reset_json['client_id'])
self.assertNotEquals(reset_json['client_secret'], json['client_secret'])
# Verify it was changed in the DB.
json = self.getJsonResponse(OrganizationApplicationResource,
params=dict(orgname=ORGANIZATION,
client_id=FAKE_APPLICATION_CLIENT_ID))
self.assertEquals(reset_json['client_secret'], json['client_secret'])
class FakeBuildTrigger(BuildTriggerHandler):
@classmethod
def service_name(cls):
return 'fakeservice'
def list_build_source_namespaces(self):
return [
{'name': 'first', 'id': 'first'},
{'name': 'second', 'id': 'second'},
]
def list_build_sources_for_namespace(self, namespace):
if namespace == "first":
return [{
'name': 'source',
}]
elif namespace == "second":
return [{
'name': self.auth_token,
}]
else:
return []
def list_build_subdirs(self):
return [self.auth_token, 'foo', 'bar', self.config['somevalue']]
def handle_trigger_request(self, request):
prepared = PreparedBuild(self.trigger)
prepared.build_name = 'build-name'
prepared.tags = ['bar']
prepared.dockerfile_id = 'foo'
prepared.subdirectory = 'subdir'
prepared.metadata = {'foo': 'bar'}
prepared.is_manual = False
return prepared
def is_active(self):
return 'active' in self.config and self.config['active']
def activate(self, standard_webhook_url):
self.config['active'] = True
return self.config, {}
def deactivate(self):
self.config['active'] = False
return self.config
def manual_start(self, run_parameters=None):
prepared = PreparedBuild(self.trigger)
prepared.build_name = 'build-name'
prepared.tags = ['bar']
prepared.dockerfile_id = 'foo'
prepared.subdirectory = 'subdir'
prepared.metadata = {'foo': 'bar'}
prepared.is_manual = True
return prepared
def get_repository_url(self):
return 'http://foo/' + self.config['build_source']
def load_dockerfile_contents(self):
if not 'dockerfile' in self.config:
return None
return self.config['dockerfile']
def list_field_values(self, field_name, limit=None):
if field_name == 'test_field':
return [1, 2, 3]
return None
class TestBuildTriggers(ApiTestCase):
def test_list_build_triggers(self):
self.login(ADMIN_ACCESS_USER)
# Check a repo with no known triggers.
json = self.getJsonResponse(BuildTriggerList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(0, len(json['triggers']))
# Check a repo with one known trigger.
json = self.getJsonResponse(BuildTriggerList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(1, len(json['triggers']))
trigger = json['triggers'][0]
assert 'id' in trigger
assert 'is_active' in trigger
assert 'config' in trigger
assert 'service' in trigger
# Verify the get trigger method.
trigger_json = self.getJsonResponse(BuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
self.assertEquals(trigger, trigger_json)
# Check the recent builds for the trigger.
builds_json = self.getJsonResponse(TriggerBuildList,
params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
assert 'builds' in builds_json
def test_delete_build_trigger(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(BuildTriggerList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(1, len(json['triggers']))
trigger = json['triggers'][0]
# Delete the trigger.
self.deleteEmptyResponse(BuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/building',
trigger_uuid=trigger['id']))
# Verify it was deleted.
json = self.getJsonResponse(BuildTriggerList,
params=dict(repository=ADMIN_ACCESS_USER + '/building'))
self.assertEquals(0, len(json['triggers']))
def test_analyze_fake_trigger(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.user.get_user(ADMIN_ACCESS_USER)
trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Analyze the trigger's dockerfile: First, no dockerfile.
trigger_config = {}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('warning', analyze_json['status'])
self.assertEquals('Specified Dockerfile path for the trigger was not ' +
'found on the main branch. This trigger may fail.', analyze_json['message'])
# Analyze the trigger's dockerfile: Second, missing FROM in dockerfile.
trigger_config = {'dockerfile': 'MAINTAINER me'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('warning', analyze_json['status'])
self.assertEquals('No FROM line found in the Dockerfile', analyze_json['message'])
# Analyze the trigger's dockerfile: Third, dockerfile with public repo.
trigger_config = {'dockerfile': 'FROM somerepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('publicbase', analyze_json['status'])
# Analyze the trigger's dockerfile: Fourth, dockerfile with private repo with an invalid path.
trigger_config = {'dockerfile': 'FROM localhost:5000/somepath'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('warning', analyze_json['status'])
self.assertEquals('"localhost:5000/somepath" is not a valid Quay repository path',
analyze_json['message'])
# Analyze the trigger's dockerfile: Fifth, dockerfile with private repo that does not exist.
trigger_config = {'dockerfile': 'FROM localhost:5000/nothere/randomrepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('error', analyze_json['status'])
nofound = 'Repository "localhost:5000/%s/randomrepo" referenced by the Dockerfile was not found'
self.assertEquals(nofound % 'nothere', analyze_json['message'])
# Analyze the trigger's dockerfile: Sixth, dockerfile with private repo that the user cannot see
trigger_config = {'dockerfile': 'FROM localhost:5000/randomuser/randomrepo'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('error', analyze_json['status'])
self.assertEquals(nofound % 'randomuser', analyze_json['message'])
# Analyze the trigger's dockerfile: Seventh, dockerfile with private repo that the user see.
trigger_config = {'dockerfile': 'FROM localhost:5000/devtable/complex'}
analyze_json = self.postJsonResponse(BuildTriggerAnalyze,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals('requiresrobot', analyze_json['status'])
self.assertEquals('devtable', analyze_json['namespace'])
self.assertEquals('complex', analyze_json['name'])
self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', analyze_json['robots'][0]['name'])
def test_fake_trigger(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.user.get_user(ADMIN_ACCESS_USER)
trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Verify the trigger.
json = self.getJsonResponse(BuildTriggerList,
params=dict(repository=ADMIN_ACCESS_USER + '/simple'))
self.assertEquals(1, len(json['triggers']))
self.assertEquals(trigger.uuid, json['triggers'][0]['id'])
self.assertEquals(trigger.service.name, json['triggers'][0]['service'])
self.assertEquals(False, json['triggers'][0]['is_active'])
# List the trigger's source namespaces.
namespace_json = self.getJsonResponse(BuildTriggerSourceNamespaces,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid))
self.assertEquals([{'id': 'first', 'name': 'first'}, {'id': 'second', 'name': 'second'}], namespace_json['namespaces'])
source_json = self.postJsonResponse(BuildTriggerSources,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data=dict(namespace='first'))
self.assertEquals([{'name': 'source'}], source_json['sources'])
# List the trigger's subdirs.
subdir_json = self.postJsonResponse(BuildTriggerSubdirs,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'somevalue': 'meh'})
self.assertEquals({'status': 'success', 'subdir': ['/sometoken', '/foo', '/bar', '/meh']},
subdir_json)
# Activate the trigger.
trigger_config = {
'build_source': 'somesource'
}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config})
self.assertEquals(True, activate_json['is_active'])
# Make sure the trigger has a write token.
trigger = model.build.get_build_trigger(trigger.uuid)
self.assertNotEquals(None, trigger.write_token)
self.assertEquals(True, py_json.loads(trigger.config)['active'])
# Make sure we cannot activate again.
self.postResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config},
expected_code=400)
# Retrieve values for a field.
result = self.postJsonResponse(BuildTriggerFieldValues,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid, field_name="test_field"))
self.assertEquals(result['values'], [1, 2, 3])
self.postResponse(BuildTriggerFieldValues,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid, field_name="another_field"),
expected_code=404)
# Start a manual build.
start_json = self.postJsonResponse(ActivateBuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data=dict(),
expected_code=201)
assert 'id' in start_json
self.assertEquals("build-name", start_json['display_name'])
self.assertEquals(['bar'], start_json['tags'])
self.assertEquals('subdir', start_json['subdirectory'])
self.assertEquals('somesource', start_json['trigger']['build_source'])
# Verify the metadata was added.
build_obj = database.RepositoryBuild.get(database.RepositoryBuild.uuid == start_json['id'])
self.assertEquals('bar', py_json.loads(build_obj.job_config)['trigger_metadata']['foo'])
# Start another manual build, with a ref.
self.postJsonResponse(ActivateBuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data=dict(refs={'kind': 'branch', 'name': 'foobar'}),
expected_code=201)
# Start another manual build with a null ref.
self.postJsonResponse(ActivateBuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data=dict(refs=None),
expected_code=201)
def test_invalid_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.user.get_user(ADMIN_ACCESS_USER)
trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with an invalid robot account.
trigger_config = {}
self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config': trigger_config, 'pull_robot': 'someinvalidrobot'},
expected_code=404)
def test_unauthorized_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.user.get_user(ADMIN_ACCESS_USER)
trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with a robot account in the wrong namespace.
trigger_config = {}
self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config':trigger_config, 'pull_robot':'freshuser+anotherrobot'},
expected_code=403)
def test_robot_account(self):
self.login(ADMIN_ACCESS_USER)
database.BuildTriggerService.create(name='fakeservice')
# Add a new fake trigger.
repo = model.repository.get_repository(ADMIN_ACCESS_USER, 'simple')
user = model.user.get_user(ADMIN_ACCESS_USER)
trigger = model.build.create_build_trigger(repo, 'fakeservice', 'sometoken', user)
# Try to activate it with a robot account.
trigger_config = {}
activate_json = self.postJsonResponse(BuildTriggerActivate,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data={'config':trigger_config,
'pull_robot':ADMIN_ACCESS_USER + '+dtrobot'})
# Verify that the robot was saved.
self.assertEquals(True, activate_json['is_active'])
self.assertEquals(ADMIN_ACCESS_USER + '+dtrobot', activate_json['pull_robot']['name'])
# Start a manual build.
start_json = self.postJsonResponse(ActivateBuildTrigger,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
trigger_uuid=trigger.uuid),
data=dict(refs=dict(kind='branch', name='foobar')),
expected_code=201)
assert 'id' in start_json
self.assertEquals("build-name", start_json['display_name'])
self.assertEquals(['bar'], start_json['tags'])
class TestUserAuthorizations(ApiTestCase):
def test_list_get_delete_user_authorizations(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(UserAuthorizationList)
self.assertEquals(1, len(json['authorizations']))
authorization = json['authorizations'][0]
assert 'uuid' in authorization
assert 'scopes' in authorization
assert 'application' in authorization
# Retrieve the authorization.
get_json = self.getJsonResponse(UserAuthorization,
params=dict(access_token_uuid=authorization['uuid']))
self.assertEquals(authorization, get_json)
# Delete the authorization.
self.deleteEmptyResponse(UserAuthorization, params=dict(access_token_uuid=authorization['uuid']))
# Verify it has been deleted.
self.getJsonResponse(UserAuthorization, params=dict(access_token_uuid=authorization['uuid']),
expected_code=404)
class TestSuperUserLogs(ApiTestCase):
def test_get_logs(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserLogs)
assert 'logs' in json
assert len(json['logs']) > 0
class TestSuperUserList(ApiTestCase):
def test_get_users(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserList)
assert 'users' in json
assert len(json['users']) > 0
class TestSuperUserCreateInitialSuperUser(ApiTestCase):
def test_create_superuser(self):
data = {
'username': 'newsuper',
'password': 'password',
'email': 'jschorr+fake@devtable.com',
}
# Try to write before some config. Should 403.
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
# Add some fake config.
fake_config = {
'AUTHENTICATION_TYPE': 'Database',
'SECRET_KEY': 'fakekey',
}
self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost'))
# Try to write with config. Should 403 since there are users in the DB.
self.postResponse(SuperUserCreateInitialSuperUser, data=data, expected_code=403)
# Delete all users in the DB.
for user in list(database.User.select()):
model.user.delete_user(user, all_queues, force=True)
# Create the superuser.
self.postJsonResponse(SuperUserCreateInitialSuperUser, data=data)
# Ensure the user exists in the DB.
self.assertIsNotNone(model.user.get_user('newsuper'))
# Ensure that the current user is newsuper.
json = self.getJsonResponse(User)
self.assertEquals('newsuper', json['username'])
# Ensure that the current user is a superuser in the config.
json = self.getJsonResponse(SuperUserConfig)
self.assertEquals(['newsuper'], json['config']['SUPER_USERS'])
# Ensure that the current user is a superuser in memory by trying to call an API
# that will fail otherwise.
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
class TestSuperUserConfig(ApiTestCase):
def test_get_status_update_config(self):
# With no config the status should be 'upload-license'.
json = self.getJsonResponse(SuperUserRegistryStatus)
self.assertEquals('upload-license', json['status'])
# And the config should 401.
self.getResponse(SuperUserConfig, expected_code=401)
# Add a fake license file.
config_provider.save_license('something')
# With no config but a license the status should be 'config-db'.
json = self.getJsonResponse(SuperUserRegistryStatus)
self.assertEquals('config-db', json['status'])
# Add some fake config.
fake_config = {
'AUTHENTICATION_TYPE': 'Database',
'SECRET_KEY': 'fakekey',
}
json = self.putJsonResponse(SuperUserConfig, data=dict(config=fake_config, hostname='fakehost'))
self.assertEquals('fakekey', json['config']['SECRET_KEY'])
self.assertEquals('fakehost', json['config']['SERVER_HOSTNAME'])
self.assertEquals('Database', json['config']['AUTHENTICATION_TYPE'])
# With config the status should be 'setup-db'.
json = self.getJsonResponse(SuperUserRegistryStatus)
self.assertEquals('setup-db', json['status'])
def test_config_file(self):
# Try without an account. Should 403.
self.getResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'), expected_code=403)
# Login to a superuser.
self.login(ADMIN_ACCESS_USER)
# Try for an invalid file. Should 404.
self.getResponse(SuperUserConfigFile, params=dict(filename='foobar'), expected_code=404)
# Try for a valid filename. Should not exist.
json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
self.assertFalse(json['exists'])
# Add the file.
self.postResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'),
file=(StringIO('my file contents'), 'ssl.cert'))
# Should now exist.
json = self.getJsonResponse(SuperUserConfigFile, params=dict(filename='ssl.cert'))
self.assertTrue(json['exists'])
def test_update_with_external_auth(self):
self.login(ADMIN_ACCESS_USER)
# Run a mock LDAP.
mockldap = MockLdap({
'dc=quay,dc=io': {'dc': ['quay', 'io']},
'ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees'
},
'uid=' + ADMIN_ACCESS_USER + ',ou=employees,dc=quay,dc=io': {
'dc': ['quay', 'io'],
'ou': 'employees',
'uid': [ADMIN_ACCESS_USER],
'userPassword': ['password'],
'mail': [ADMIN_ACCESS_EMAIL],
},
})
config = {
'AUTHENTICATION_TYPE': 'LDAP',
'LDAP_BASE_DN': ['dc=quay', 'dc=io'],
'LDAP_ADMIN_DN': 'uid=devtable,ou=employees,dc=quay,dc=io',
'LDAP_ADMIN_PASSWD': 'password',
'LDAP_USER_RDN': ['ou=employees'],
'LDAP_UID_ATTR': 'uid',
'LDAP_EMAIL_ATTR': 'mail',
}
mockldap.start()
try:
# Try writing some config with an invalid password.
self.putResponse(SuperUserConfig, data={'config': config, 'hostname': 'foo'}, expected_code=400)
self.putResponse(SuperUserConfig,
data={'config': config, 'password': 'invalid', 'hostname': 'foo'}, expected_code=400)
# Write the config with the valid password.
self.putResponse(SuperUserConfig,
data={'config': config, 'password': 'password', 'hostname': 'foo'}, expected_code=200)
# Ensure that the user row has been linked.
self.assertEquals(ADMIN_ACCESS_USER, model.user.verify_federated_login('ldap', ADMIN_ACCESS_USER).username)
finally:
mockldap.stop()
class TestRepositoryImageSecurity(ApiTestCase):
def test_get_vulnerabilities(self):
self.login(ADMIN_ACCESS_USER)
tag = model.tag.get_active_tag(ADMIN_ACCESS_USER, 'simple', 'latest')
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
tag_manifest = database.TagManifest.get(tag=tag)
# Grab the security info for the tag. It should be queued.
manifest_response = self.getJsonResponse(RepositoryManifestSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
manifestref=tag_manifest.digest,
vulnerabilities='true'))
image_response = self.getJsonResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id,
vulnerabilities='true'))
self.assertEquals(manifest_response, image_response)
self.assertEquals('queued', image_response['status'])
# Mark the layer as indexed.
layer.security_indexed = True
layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
layer.save()
# Grab the security info again.
with fake_security_scanner() as security_scanner:
security_scanner.add_layer(security_scanner.layer_id(layer))
manifest_response = self.getJsonResponse(RepositoryManifestSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
manifestref=tag_manifest.digest,
vulnerabilities='true'))
image_response = self.getJsonResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id,
vulnerabilities='true'))
self.assertEquals(manifest_response, image_response)
self.assertEquals('scanned', image_response['status'])
self.assertEquals(1, image_response['data']['Layer']['IndexedByVersion'])
def test_get_vulnerabilities_read_failover(self):
self.login(ADMIN_ACCESS_USER)
# Get a layer and mark it as indexed.
layer = model.tag.get_tag_image(ADMIN_ACCESS_USER, 'simple', 'latest')
layer.security_indexed = True
layer.security_indexed_engine = app.config['SECURITY_SCANNER_ENGINE_VERSION_TARGET']
layer.save()
with fake_security_scanner(hostname='failoverscanner') as security_scanner:
# Query the wrong security scanner URL without failover.
self.getResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id, vulnerabilities='true'),
expected_code=520)
# Set the failover URL in the global config.
with AppConfigChange({'SECURITY_SCANNER_READONLY_FAILOVER_ENDPOINTS': ['https://failoverscanner']}):
# Configure the API to return 200 for this layer.
layer_id = security_scanner.layer_id(layer)
security_scanner.set_ok_layer_id(layer_id)
# Call the API and succeed on failover.
self.getResponse(RepositoryImageSecurity,
params=dict(repository=ADMIN_ACCESS_USER + '/simple',
imageid=layer.docker_image_id, vulnerabilities='true'),
expected_code=200)
class TestSuperUserCustomCertificates(ApiTestCase):
def test_custom_certificates(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', san_list=['DNS:bar', 'DNS:baz'])
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'),
file=(StringIO(cert_contents), 'testcert'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert', cert_info['path'])
self.assertEquals(set(['somecoolhost', 'bar', 'baz']), set(cert_info['names']))
self.assertFalse(cert_info['expired'])
# Remove the certificate.
self.deleteResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'))
# Make sure it is gone.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(0, len(json['certs']))
def test_expired_custom_certificate(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10)
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'),
file=(StringIO(cert_contents), 'testcert'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert', cert_info['path'])
self.assertEquals(set(['somecoolhost']), set(cert_info['names']))
self.assertTrue(cert_info['expired'])
def test_invalid_custom_certificate(self):
self.login(ADMIN_ACCESS_USER)
# Upload an invalid certificate.
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert'),
file=(StringIO('some contents'), 'testcert'), expected_code=204)
# Make sure it is present but invalid.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('testcert', cert_info['path'])
self.assertEquals('no start line', cert_info['error'])
def test_path_sanitization(self):
self.login(ADMIN_ACCESS_USER)
# Upload a certificate.
cert_contents, _ = generate_test_cert(hostname='somecoolhost', expires=-10)
self.postResponse(SuperUserCustomCertificate, params=dict(certpath='testcert/../foobar'),
file=(StringIO(cert_contents), 'testcert/../foobar'), expected_code=204)
# Make sure it is present.
json = self.getJsonResponse(SuperUserCustomCertificates)
self.assertEquals(1, len(json['certs']))
cert_info = json['certs'][0]
self.assertEquals('foobar', cert_info['path'])
class TestSuperUserTakeOwnership(ApiTestCase):
def test_take_ownership_superuser(self):
self.login(ADMIN_ACCESS_USER)
# Should fail to take ownership of a superuser.
self.postResponse(SuperUserTakeOwnership, params=dict(namespace=ADMIN_ACCESS_USER),
expected_code=400)
def test_take_ownership_invalid_namespace(self):
self.login(ADMIN_ACCESS_USER)
self.postResponse(SuperUserTakeOwnership, params=dict(namespace='invalid'),
expected_code=404)
def test_take_ownership_non_superuser(self):
self.login(READ_ACCESS_USER)
self.postResponse(SuperUserTakeOwnership, params=dict(namespace='freshuser'),
expected_code=403)
def test_take_ownership_user(self):
self.login(ADMIN_ACCESS_USER)
with assert_action_logged('take_ownership'):
# Take ownership of the read user.
self.postResponse(SuperUserTakeOwnership, params=dict(namespace=READ_ACCESS_USER))
# Ensure that the read access user is now an org, with the superuser as the owner.
reader = model.user.get_user_or_org(READ_ACCESS_USER)
self.assertTrue(reader.organization)
usernames = [admin.username for admin in model.organization.get_admin_users(reader)]
self.assertIn(ADMIN_ACCESS_USER, usernames)
def test_take_ownership_org(self):
# Create a new org with another user as owner.
public_user = model.user.get_user(PUBLIC_USER)
org = model.organization.create_organization('someorg', 'some@example.com', public_user)
# Ensure that the admin is not yet owner of the org.
usernames = [admin.username for admin in model.organization.get_admin_users(org)]
self.assertNotIn(ADMIN_ACCESS_USER, usernames)
with assert_action_logged('take_ownership'):
# Take ownership.
self.login(ADMIN_ACCESS_USER)
self.postResponse(SuperUserTakeOwnership, params=dict(namespace='someorg'))
# Ensure now in the admin users.
usernames = [admin.username for admin in model.organization.get_admin_users(org)]
self.assertIn(ADMIN_ACCESS_USER, usernames)
class TestSuperUserKeyManagement(ApiTestCase):
def test_get_update_keys(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserServiceKeyManagement)
key_count = len(json['keys'])
key = json['keys'][0]
self.assertTrue('name' in key)
self.assertTrue('service' in key)
self.assertTrue('kid' in key)
self.assertTrue('created_date' in key)
self.assertTrue('expiration_date' in key)
self.assertTrue('jwk' in key)
self.assertTrue('approval' in key)
self.assertTrue('metadata' in key)
with assert_action_logged('service_key_modify'):
# Update the key's name.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(name='somenewname'))
# Ensure the key's name has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertEquals('somenewname', json['name'])
with assert_action_logged('service_key_modify'):
# Update the key's metadata.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(metadata=dict(foo='bar')))
# Ensure the key's metadata has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertEquals('bar', json['metadata']['foo'])
with assert_action_logged('service_key_extend'):
# Change the key's expiration.
self.putJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']),
data=dict(expiration=None))
# Ensure the key's expiration has been changed.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
self.assertIsNone(json['expiration_date'])
with assert_action_logged('service_key_delete'):
# Delete the key.
self.deleteEmptyResponse(SuperUserServiceKey, params=dict(kid=key['kid']))
# Ensure the key no longer exists.
self.getResponse(SuperUserServiceKey, params=dict(kid=key['kid']), expected_code=404)
json = self.getJsonResponse(SuperUserServiceKeyManagement)
self.assertEquals(key_count - 1, len(json['keys']))
def test_approve_key(self):
self.login(ADMIN_ACCESS_USER)
# Ensure the key is not yet approved.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid='kid3'))
self.assertEquals('unapprovedkey', json['name'])
self.assertIsNone(json['approval'])
# Approve the key.
with assert_action_logged('service_key_approve'):
self.postResponse(SuperUserServiceKeyApproval, params=dict(kid='kid3'),
data=dict(notes='testapprove'), expected_code=201)
# Ensure the key is approved.
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid='kid3'))
self.assertEquals('unapprovedkey', json['name'])
self.assertIsNotNone(json['approval'])
self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type'])
self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username'])
self.assertEquals('testapprove', json['approval']['notes'])
def test_approve_preapproved(self):
self.login(ADMIN_ACCESS_USER)
new_key = {
'service': 'coolservice',
'name': 'mynewkey',
'metadata': dict(foo='baz'),
'notes': 'whazzup!?',
'expiration': timegm((datetime.datetime.now() + datetime.timedelta(days=1)).utctimetuple()),
}
# Create the key (preapproved automatically)
json = self.postJsonResponse(SuperUserServiceKeyManagement, data=new_key)
# Try to approve again.
self.postResponse(SuperUserServiceKeyApproval, params=dict(kid=json['kid']), expected_code=201)
def test_create_key(self):
self.login(ADMIN_ACCESS_USER)
new_key = {
'service': 'coolservice',
'name': 'mynewkey',
'metadata': dict(foo='baz'),
'notes': 'whazzup!?',
'expiration': timegm((datetime.datetime.now() + datetime.timedelta(days=1)).utctimetuple()),
}
with assert_action_logged('service_key_create'):
# Create the key.
json = self.postJsonResponse(SuperUserServiceKeyManagement, data=new_key)
self.assertEquals('mynewkey', json['name'])
self.assertTrue('kid' in json)
self.assertTrue('public_key' in json)
self.assertTrue('private_key' in json)
# Verify the private key is a valid PEM.
serialization.load_pem_private_key(json['private_key'].encode('utf-8'), None, default_backend())
# Verify the key.
kid = json['kid']
json = self.getJsonResponse(SuperUserServiceKey, params=dict(kid=kid))
self.assertEquals('mynewkey', json['name'])
self.assertEquals('coolservice', json['service'])
self.assertEquals('baz', json['metadata']['foo'])
self.assertEquals(kid, json['kid'])
self.assertIsNotNone(json['approval'])
self.assertEquals('ServiceKeyApprovalType.SUPERUSER', json['approval']['approval_type'])
self.assertEquals(ADMIN_ACCESS_USER, json['approval']['approver']['username'])
self.assertEquals('whazzup!?', json['approval']['notes'])
class TestRepositoryManifestLabels(ApiTestCase):
def test_basic_labels(self):
self.login(ADMIN_ACCESS_USER)
# Find the manifest digest for the prod tag in the complex repo.
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
repository = ADMIN_ACCESS_USER + '/complex'
# Check the existing labels on the complex repo, which should be empty
json = self.getJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository, manifestref=tag_manifest.digest))
self.assertEquals(0, len(json['labels']))
# Add some labels to the manifest.
with assert_action_logged('manifest_label_add'):
label1 = self.postJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='hello', value='world',
media_type='text/plain'),
expected_code=201)
with assert_action_logged('manifest_label_add'):
label2 = self.postJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='hi', value='there',
media_type='text/plain'),
expected_code=201)
with assert_action_logged('manifest_label_add'):
label3 = self.postJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='hello', value='someone',
media_type='application/json'),
expected_code=201)
# Ensure we have *3* labels
json = self.getJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest))
self.assertEquals(3, len(json['labels']))
self.assertNotEquals(label2['label']['id'], label1['label']['id'])
self.assertNotEquals(label3['label']['id'], label1['label']['id'])
self.assertNotEquals(label2['label']['id'], label3['label']['id'])
self.assertEquals('text/plain', label1['label']['media_type'])
self.assertEquals('text/plain', label2['label']['media_type'])
self.assertEquals('application/json', label3['label']['media_type'])
# Ensure we can retrieve each of the labels.
for label in json['labels']:
label_json = self.getJsonResponse(ManageRepositoryManifestLabel,
params=dict(repository=repository,
manifestref=tag_manifest.digest,
labelid=label['id']))
self.assertEquals(label['id'], label_json['id'])
# Delete a label.
with assert_action_logged('manifest_label_delete'):
self.deleteEmptyResponse(ManageRepositoryManifestLabel,
params=dict(repository=repository,
manifestref=tag_manifest.digest,
labelid=label1['label']['id']))
# Ensure the label is gone.
json = self.getJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest))
self.assertEquals(2, len(json['labels']))
# Check filtering.
json = self.getJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest,
filter='hello'))
self.assertEquals(1, len(json['labels']))
def test_prefixed_labels(self):
self.login(ADMIN_ACCESS_USER)
# Find the manifest digest for the prod tag in the complex repo.
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
repository = ADMIN_ACCESS_USER + '/complex'
self.postJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='com.dockers.whatever', value='pants',
media_type='text/plain'),
expected_code=201)
self.postJsonResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='my.cool.prefix.for.my.label', value='value',
media_type='text/plain'),
expected_code=201)
def test_add_invalid_media_type(self):
self.login(ADMIN_ACCESS_USER)
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
repository = ADMIN_ACCESS_USER + '/complex'
self.postResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='hello', value='world', media_type='some/invalid'),
expected_code=400)
def test_add_invalid_key(self):
self.login(ADMIN_ACCESS_USER)
tag_manifest = model.tag.load_tag_manifest(ADMIN_ACCESS_USER, 'complex', 'prod')
repository = ADMIN_ACCESS_USER + '/complex'
# Try to add an empty label key.
self.postResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='', value='world'),
expected_code=400)
# Try to add an invalid label key.
self.postResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='invalid___key', value='world'),
expected_code=400)
# Try to add a label key in a reserved namespace.
self.postResponse(RepositoryManifestLabels,
params=dict(repository=repository,
manifestref=tag_manifest.digest),
data=dict(key='io.docker.whatever', value='world'),
expected_code=400)
class TestSuperUserManagement(ApiTestCase):
def test_get_user(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser'))
self.assertEquals('freshuser', json['username'])
self.assertEquals('jschorr+test@devtable.com', json['email'])
self.assertEquals(False, json['super_user'])
def test_delete_user(self):
self.login(ADMIN_ACCESS_USER)
# Verify the user exists.
json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser'))
self.assertEquals('freshuser', json['username'])
# Delete the user.
self.deleteEmptyResponse(SuperUserManagement, params=dict(username='freshuser'), expected_code=204)
# Verify the user no longer exists.
self.getResponse(SuperUserManagement, params=dict(username='freshuser'), expected_code=404)
def test_change_user_password(self):
self.login(ADMIN_ACCESS_USER)
# Verify the user exists.
json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser'))
self.assertEquals('freshuser', json['username'])
self.assertEquals('jschorr+test@devtable.com', json['email'])
# Update the user.
json = self.putJsonResponse(SuperUserManagement, params=dict(username='freshuser'),
data=dict(password='somepassword'))
self.assertTrue('encrypted_password' in json)
def test_update_user(self):
self.login(ADMIN_ACCESS_USER)
# Verify the user exists.
json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser'))
self.assertEquals('freshuser', json['username'])
self.assertEquals('jschorr+test@devtable.com', json['email'])
# Update the user.
json = self.putJsonResponse(SuperUserManagement, params=dict(username='freshuser'),
data=dict(email='foo@bar.com'))
self.assertFalse('encrypted_password' in json)
# Verify the user was updated.
json = self.getJsonResponse(SuperUserManagement, params=dict(username='freshuser'))
self.assertEquals('freshuser', json['username'])
self.assertEquals('foo@bar.com', json['email'])
def test_set_message(self):
self.login(ADMIN_ACCESS_USER)
# Create a message
message = {"content": "new message", "severity": "info", "media_type": "text/plain"}
self.postResponse(GlobalUserMessages, data=dict(message=message), expected_code=201)
json = self.getJsonResponse(GlobalUserMessages)
self.assertEquals(len(json['messages']), 3)
has_matching_message = False
for message in json["messages"]:
new_message_match = message["content"] == "new message"
severity_match = message["severity"] == "info"
media_type_match = message["media_type"] == "text/plain"
if new_message_match and severity_match and media_type_match:
has_matching_message = True
break
self.assertTrue(has_matching_message, "Could not find matching message in: " + str(json["messages"]))
self.assertNotEqual(json['messages'][0]["content"], json['messages'][2]["content"])
self.assertTrue(json['messages'][2]["uuid"])
def test_delete_message(self):
self.login(ADMIN_ACCESS_USER)
json = self.getJsonResponse(GlobalUserMessages)
self.deleteEmptyResponse(GlobalUserMessage, {"uuid": json['messages'][0]['uuid']}, 204)
json = self.getJsonResponse(GlobalUserMessages)
self.assertEquals(len(json['messages']), 1)
if __name__ == '__main__':
unittest.main()