import base64 import unittest from datetime import datetime, timedelta from flask import g from flask_principal import identity_loaded from app import app from auth.scopes import (scopes_from_scope_string, is_subset_string, DIRECT_LOGIN, ADMIN_REPO, ALL_SCOPES) from auth.permissions import QuayDeferredPermissionUser from auth.process import _process_basic_auth from data import model from data.database import OAuthApplication, OAuthAccessToken from endpoints.api import api from endpoints.api.user import User, Signin from test.test_api_usage import ApiTestCase ADMIN_ACCESS_USER = 'devtable' DISABLED_USER = 'disabled' @identity_loaded.connect_via(app) def on_identity_loaded(sender, identity): g.identity = identity class TestAuth(ApiTestCase): def verify_cookie_auth(self, username): resp = self.getJsonResponse(User) self.assertEquals(resp['username'], username) def verify_identity(self, id): try: identity = g.identity except: identity = None self.assertIsNotNone(identity) self.assertEquals(identity.id, id) def verify_no_identity(self): try: identity = g.identity except: identity = None self.assertIsNone(identity) def conduct_basic_auth(self, username, password): encoded = base64.b64encode(username + ':' + password) try: _process_basic_auth('Basic ' + encoded) except: pass def create_oauth(self, user): oauth_app = OAuthApplication.create(client_id='onetwothree', redirect_uri='', application_uri='', organization=user, name='someapp') expires_at = datetime.utcnow() + timedelta(seconds=50000) OAuthAccessToken.create(application=oauth_app, authorized_user=user, scope='repo:admin', access_token='access1234', token_type='Bearer', expires_at=expires_at, refresh_token=None, data={}) def test_login(self): password = 'password' resp = self.postJsonResponse(Signin, data=dict(username=ADMIN_ACCESS_USER, password=password)) self.assertTrue(resp.get('success')) self.verify_cookie_auth(ADMIN_ACCESS_USER) def test_login_disabled(self): password = 'password' self.postJsonResponse(Signin, data=dict(username=DISABLED_USER, password=password), expected_code=403) def test_basic_auth_user(self): user = model.user.get_user(ADMIN_ACCESS_USER) self.conduct_basic_auth(ADMIN_ACCESS_USER, 'password') self.verify_identity(user.uuid) def test_basic_auth_disabled_user(self): user = model.user.get_user(DISABLED_USER) self.conduct_basic_auth(DISABLED_USER, 'password') self.verify_no_identity() def test_basic_auth_token(self): token = model.token.create_delegate_token(ADMIN_ACCESS_USER, 'simple', 'sometoken') self.conduct_basic_auth('$token', token.code) self.verify_identity(token.code) def test_basic_auth_invalid_token(self): self.conduct_basic_auth('$token', 'foobar') self.verify_no_identity() def test_basic_auth_invalid_user(self): self.conduct_basic_auth('foobarinvalid', 'foobar') self.verify_no_identity() def test_oauth_invalid(self): self.conduct_basic_auth('$oauthtoken', 'foobar') self.verify_no_identity() def test_oauth_invalid_http_response(self): rv = self.app.get(api.url_for(User), headers={'Authorization': 'Bearer bad_token'}) assert 'WWW-Authenticate' in rv.headers self.assertEquals(401, rv.status_code) def test_oauth_valid_user(self): user = model.user.get_user(ADMIN_ACCESS_USER) self.create_oauth(user) self.conduct_basic_auth('$oauthtoken', 'access1234') self.verify_identity(user.uuid) def test_oauth_disabled_user(self): user = model.user.get_user(DISABLED_USER) self.create_oauth(user) self.conduct_basic_auth('$oauthtoken', 'access1234') self.verify_no_identity() def test_basic_auth_robot(self): user = model.user.get_user(ADMIN_ACCESS_USER) robot, passcode = model.user.get_robot('dtrobot', user) self.conduct_basic_auth(robot.username, passcode) self.verify_identity(robot.uuid) def test_basic_auth_robot_invalidcode(self): user = model.user.get_user(ADMIN_ACCESS_USER) robot, _ = model.user.get_robot('dtrobot', user) self.conduct_basic_auth(robot.username, 'someinvalidcode') self.verify_no_identity() def test_deferred_permissions_scopes(self): self.assertEquals(QuayDeferredPermissionUser.for_id('123454')._scope_set, {DIRECT_LOGIN}) self.assertEquals(QuayDeferredPermissionUser.for_id('123454', {})._scope_set, {}) self.assertEquals(QuayDeferredPermissionUser.for_id('123454', {ADMIN_REPO})._scope_set, {ADMIN_REPO}) def assertParsedScopes(self, scopes_str, *args): expected_scope_set = {ALL_SCOPES[scope_name] for scope_name in args} parsed_scope_set = scopes_from_scope_string(scopes_str) self.assertEquals(parsed_scope_set, expected_scope_set) def test_scopes_parsing(self): # Valid single scopes. self.assertParsedScopes('repo:read', 'repo:read') self.assertParsedScopes('repo:admin', 'repo:admin') # Invalid scopes. self.assertParsedScopes('not:valid') self.assertParsedScopes('repo:admins') # Valid scope strings. self.assertParsedScopes('repo:read repo:admin', 'repo:read', 'repo:admin') self.assertParsedScopes('repo:read,repo:admin', 'repo:read', 'repo:admin') self.assertParsedScopes('repo:read,repo:admin repo:write', 'repo:read', 'repo:admin', 'repo:write') # Partially invalid scopes. self.assertParsedScopes('repo:read,not:valid') self.assertParsedScopes('repo:read repo:admins') # Invalid scope strings. self.assertParsedScopes('repo:read|repo:admin') # Mixture of delimiters. self.assertParsedScopes('repo:read, repo:admin') def test_subset_string(self): self.assertTrue(is_subset_string('repo:read', 'repo:read')) self.assertTrue(is_subset_string('repo:read repo:admin', 'repo:read')) self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:read')) self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:admin')) self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:admin repo:read')) self.assertFalse(is_subset_string('', 'repo:read')) self.assertFalse(is_subset_string('unknown:tag', 'repo:read')) self.assertFalse(is_subset_string('repo:read unknown:tag', 'repo:read')) self.assertFalse(is_subset_string('repo:read,unknown:tag', 'repo:read')) if __name__ == '__main__': unittest.main()