This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/test/test_auth.py

185 lines
6.6 KiB
Python
Raw Normal View History

import unittest
import base64
from datetime import datetime, timedelta
from app import app
from data import model
from data.database import OAuthApplication, OAuthAccessToken
from flask import g
from flask.ext.principal import identity_loaded
from auth.auth import _process_basic_auth
from auth.scopes import (scopes_from_scope_string, is_subset_string, DIRECT_LOGIN, ADMIN_REPO,
ALL_SCOPES)
from auth.permissions import QuayDeferredPermissionUser
from endpoints.api import api_bp, api
from endpoints.api.user import User, Signin
import json as py_json
from test.test_api_usage import ApiTestCase
ADMIN_ACCESS_USER = 'devtable'
DISABLED_USER = 'disabled'
@identity_loaded.connect_via(app)
def on_identity_loaded(sender, identity):
g.identity = identity
class TestAuth(ApiTestCase):
def verify_cookie_auth(self, username):
resp = self.getJsonResponse(User)
self.assertEquals(resp['username'], username)
def verify_identity(self, id):
try:
identity = g.identity
except:
identity = None
self.assertIsNotNone(identity)
self.assertEquals(identity.id, id)
def verify_no_identity(self):
try:
identity = g.identity
except:
identity = None
self.assertIsNone(identity)
def conduct_basic_auth(self, username, password):
encoded = base64.b64encode(username + ':' + password)
try:
_process_basic_auth('Basic ' + encoded)
except:
pass
def create_oauth(self, user):
oauth_app = OAuthApplication.create(client_id='onetwothree', redirect_uri='',
application_uri='', organization=user,
name='someapp')
expires_at = datetime.utcnow() + timedelta(seconds=50000)
OAuthAccessToken.create(application=oauth_app, authorized_user=user,
scope='repo:admin',
access_token='access1234', token_type='Bearer',
expires_at=expires_at, refresh_token=None, data={})
def test_login(self):
password = 'password'
resp = self.postJsonResponse(Signin, data=dict(username=ADMIN_ACCESS_USER, password=password))
self.assertTrue(resp.get('success'))
self.verify_cookie_auth(ADMIN_ACCESS_USER)
def test_login_disabled(self):
password = 'password'
self.postJsonResponse(Signin, data=dict(username=DISABLED_USER, password=password),
expected_code=403)
def test_basic_auth_user(self):
user = model.user.get_user(ADMIN_ACCESS_USER)
self.conduct_basic_auth(ADMIN_ACCESS_USER, 'password')
self.verify_identity(user.uuid)
def test_basic_auth_disabled_user(self):
user = model.user.get_user(DISABLED_USER)
self.conduct_basic_auth(DISABLED_USER, 'password')
self.verify_no_identity()
def test_basic_auth_token(self):
token = model.token.create_delegate_token(ADMIN_ACCESS_USER, 'simple', 'sometoken')
self.conduct_basic_auth('$token', token.code)
self.verify_identity(token.code)
def test_basic_auth_invalid_token(self):
self.conduct_basic_auth('$token', 'foobar')
self.verify_no_identity()
def test_basic_auth_invalid_user(self):
self.conduct_basic_auth('foobarinvalid', 'foobar')
self.verify_no_identity()
def test_oauth_invalid(self):
self.conduct_basic_auth('$oauthtoken', 'foobar')
self.verify_no_identity()
2016-04-13 13:01:42 +00:00
def test_oauth_invalid_http_response(self):
rv = self.app.get(api.url_for(User), headers={'Authorization': 'Bearer bad_token'})
assert 'WWW-Authenticate' in rv.headers
self.assertEquals(401, rv.status_code)
def test_oauth_valid_user(self):
user = model.user.get_user(ADMIN_ACCESS_USER)
self.create_oauth(user)
self.conduct_basic_auth('$oauthtoken', 'access1234')
self.verify_identity(user.uuid)
def test_oauth_disabled_user(self):
user = model.user.get_user(DISABLED_USER)
self.create_oauth(user)
self.conduct_basic_auth('$oauthtoken', 'access1234')
self.verify_no_identity()
def test_basic_auth_robot(self):
user = model.user.get_user(ADMIN_ACCESS_USER)
robot, passcode = model.user.get_robot('dtrobot', user)
self.conduct_basic_auth(robot.username, passcode)
self.verify_identity(robot.uuid)
def test_basic_auth_robot_invalidcode(self):
user = model.user.get_user(ADMIN_ACCESS_USER)
robot, _ = model.user.get_robot('dtrobot', user)
self.conduct_basic_auth(robot.username, 'someinvalidcode')
self.verify_no_identity()
def test_deferred_permissions_scopes(self):
self.assertEquals(QuayDeferredPermissionUser.for_id('123454')._scope_set, {DIRECT_LOGIN})
self.assertEquals(QuayDeferredPermissionUser.for_id('123454', {})._scope_set, {})
self.assertEquals(QuayDeferredPermissionUser.for_id('123454', {ADMIN_REPO})._scope_set, {ADMIN_REPO})
def assertParsedScopes(self, scopes_str, *args):
expected_scope_set = {ALL_SCOPES[scope_name] for scope_name in args}
parsed_scope_set = scopes_from_scope_string(scopes_str)
self.assertEquals(parsed_scope_set, expected_scope_set)
def test_scopes_parsing(self):
# Valid single scopes.
self.assertParsedScopes('repo:read', 'repo:read')
self.assertParsedScopes('repo:admin', 'repo:admin')
# Invalid scopes.
self.assertParsedScopes('not:valid')
self.assertParsedScopes('repo:admins')
# Valid scope strings.
self.assertParsedScopes('repo:read repo:admin', 'repo:read', 'repo:admin')
self.assertParsedScopes('repo:read,repo:admin', 'repo:read', 'repo:admin')
self.assertParsedScopes('repo:read,repo:admin repo:write', 'repo:read', 'repo:admin',
'repo:write')
# Partially invalid scopes.
self.assertParsedScopes('repo:read,not:valid')
self.assertParsedScopes('repo:read repo:admins')
# Invalid scope strings.
self.assertParsedScopes('repo:read|repo:admin')
2015-08-03 18:13:38 +00:00
# Mixture of delimiters.
self.assertParsedScopes('repo:read, repo:admin')
def test_subset_string(self):
self.assertTrue(is_subset_string('repo:read', 'repo:read'))
self.assertTrue(is_subset_string('repo:read repo:admin', 'repo:read'))
self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:read'))
self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:admin'))
self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:admin repo:read'))
self.assertFalse(is_subset_string('', 'repo:read'))
self.assertFalse(is_subset_string('unknown:tag', 'repo:read'))
self.assertFalse(is_subset_string('repo:read unknown:tag', 'repo:read'))
self.assertFalse(is_subset_string('repo:read,unknown:tag', 'repo:read'))
if __name__ == '__main__':
unittest.main()