diff --git a/auth/basic.py b/auth/basic.py new file mode 100644 index 000000000..a6c621da6 --- /dev/null +++ b/auth/basic.py @@ -0,0 +1,100 @@ +import logging + +from base64 import b64decode +from flask import request + +from app import authentication +from auth.oauth import validate_oauth_token +from auth.validateresult import ValidateResult, AuthKind +from data import model +from util.names import parse_robot_username + +logger = logging.getLogger(__name__) + +ACCESS_TOKEN_USERNAME = '$token' +OAUTH_TOKEN_USERNAME = '$oauthtoken' + + +def has_basic_auth(username): + """ Returns true if a basic auth header exists with a username and password pair that validates + against the internal authentication system. Returns True on full success and False on any + failure (missing header, invalid header, invalid credentials, etc). + """ + auth_header = request.headers.get('authorization', '') + result = validate_basic_auth(auth_header) + return result.has_user and result.user.username == username + + +def validate_basic_auth(auth_header): + """ Validates the specified basic auth header, returning whether its credentials point + to a valid user or token. + """ + if not auth_header: + return ValidateResult(AuthKind.basic, missing=True) + + logger.debug('Attempt to process basic auth header') + + # Parse the basic auth header. + credentials, err = _parse_basic_auth_header(auth_header) + if err is not None: + logger.debug('Got invalid basic auth header: %s', auth_header) + return ValidateResult(AuthKind.basic, missing=True) + + auth_username, auth_password_or_token = credentials + + # Check for access tokens. + if auth_username == ACCESS_TOKEN_USERNAME: + logger.debug('Found basic auth header for access token') + try: + token = model.token.load_token_data(auth_password_or_token) + logger.debug('Successfully validated basic auth for access token %s', token.id) + return ValidateResult(AuthKind.basic, token=token) + except model.DataModelException: + logger.warning('Failed to validate basic auth for access token %s', auth_password_or_token) + return ValidateResult(AuthKind.basic, error_message='Invalid access token') + + # Check for OAuth tokens. + if auth_username == OAUTH_TOKEN_USERNAME: + return validate_oauth_token(auth_password_or_token) + + # Check for robots and users. + is_robot = parse_robot_username(auth_username) + if is_robot: + logger.debug('Found basic auth header for robot %s', auth_username) + try: + robot = model.user.verify_robot(auth_username, auth_password_or_token) + + logger.debug('Successfully validated basic auth for robot %s', auth_username) + return ValidateResult(AuthKind.basic, robot=robot) + except model.InvalidRobotException as ire: + logger.warning('Failed to validate basic auth for robot %s: %s', auth_username, ire.message) + return ValidateResult(AuthKind.basic, error_message=ire.message) + + # Otherwise, treat as a standard user. + (authenticated, err) = authentication.verify_and_link_user(auth_username, auth_password_or_token, + basic_auth=True) + if authenticated: + logger.debug('Successfully validated basic auth for user %s', authenticated.username) + return ValidateResult(AuthKind.basic, user=authenticated) + else: + logger.warning('Failed to validate basic auth for user %s: %s', auth_username, err) + return ValidateResult(AuthKind.basic, error_message=err) + + +def _parse_basic_auth_header(auth): + """ Parses the given basic auth header, returning the credentials found inside. + """ + normalized = [part.strip() for part in auth.split(' ') if part] + if normalized[0].lower() != 'basic' or len(normalized) != 2: + return None, 'Invalid basic auth header' + + try: + credentials = [part.decode('utf-8') for part in b64decode(normalized[1]).split(':', 1)] + except TypeError: + logger.exception('Exception when parsing basic auth header: %s', auth) + return None, 'Could not parse basic auth header' + + if len(credentials) != 2: + return None, 'Unexpected number of credentials found in basic auth header' + + return credentials, None diff --git a/auth/cookie.py b/auth/cookie.py new file mode 100644 index 000000000..b60db51a6 --- /dev/null +++ b/auth/cookie.py @@ -0,0 +1,37 @@ +import logging + +from uuid import UUID +from flask_login import current_user + +from auth.validateresult import AuthKind, ValidateResult + +logger = logging.getLogger(__name__) + +def validate_session_cookie(auth_header=None): + """ Attempts to load a user from a session cookie. """ + if current_user.is_anonymous: + return ValidateResult(AuthKind.cookie, missing=True) + + try: + # Attempt to parse the user uuid to make sure the cookie has the right value type + UUID(current_user.get_id()) + except ValueError: + logger.debug('Got non-UUID for session cookie user: %s', current_user.get_id()) + return ValidateResult(AuthKind.cookie, error_message='Invalid session cookie format') + + logger.debug('Loading user from cookie: %s', current_user.get_id()) + db_user = current_user.db_user() + if db_user is None: + return ValidateResult(AuthKind.cookie, error_message='Could not find matching user') + + # Don't allow disabled users to login. + if not db_user.enabled: + logger.debug('User %s in session cookie is disabled', db_user.username) + return ValidateResult(AuthKind.cookie, error_message='User account is disabled') + + # Don't allow organizations to "login". + if db_user.organization: + logger.debug('User %s in session cookie is in-fact organization', db_user.username) + return ValidateResult(AuthKind.cookie, error_message='Cannot login to organization') + + return ValidateResult(AuthKind.cookie, user=db_user) diff --git a/auth/decorators.py b/auth/decorators.py index 6afe48252..a7cea12a2 100644 --- a/auth/decorators.py +++ b/auth/decorators.py @@ -1,287 +1,90 @@ import logging from functools import wraps -from uuid import UUID -from datetime import datetime -from base64 import b64decode - from flask import request, session -from flask.sessions import SecureCookieSessionInterface, BadSignature -from flask_login import current_user -from flask_principal import identity_changed, Identity -import scopes +from app import metric_queue +from auth.basic import validate_basic_auth +from auth.oauth import validate_bearer_auth +from auth.cookie import validate_session_cookie +from auth.signedgrant import validate_signed_grant -from app import app, authentication, metric_queue -from auth_context import (set_authenticated_user, set_validated_token, set_grant_context, - set_validated_oauth_token) -from data import model -from endpoints.exception import InvalidToken, ExpiredToken -from permissions import QuayDeferredPermissionUser from util.http import abort logger = logging.getLogger(__name__) -SIGNATURE_PREFIX = 'sigv2=' +def _auth_decorator(pass_result=False, handlers=None): + """ Builds an auth decorator that runs the given handlers and, if any return successfully, + sets up the auth context. The wrapped function will be invoked *regardless of success or + failure of the auth handler(s)* + """ + def processor(func): + @wraps(func) + def wrapper(*args, **kwargs): + auth_header = request.headers.get('authorization', '') + result = None -def _load_user_from_cookie(): - if not current_user.is_anonymous: - try: - # Attempt to parse the user uuid to make sure the cookie has the right value type - UUID(current_user.get_id()) - except ValueError: - return None + for handler in handlers: + result = handler(auth_header) + # If the handler was missing the necessary information, skip it and try the next one. + if result.missing: + continue - logger.debug('Loading user from cookie: %s', current_user.get_id()) - db_user = current_user.db_user() - if db_user is not None: - # Don't allow disabled users to login. - if not db_user.enabled: - return None + # Check for a valid result. + if result.auth_valid: + logger.debug('Found valid auth result: %s => (%s)', result.tuple()) - set_authenticated_user(db_user) - loaded = QuayDeferredPermissionUser.for_user(db_user) - identity_changed.send(app, identity=loaded) - return db_user + # Set the various pieces of the auth context. + result.apply_to_context() - return None + # Log the metric. + metric_queue.authentication_count.Inc(labelvalues=[result.kind, True]) + break + + # Otherwise, report the error. + if result.error_message is not None: + # Log the failure. + metric_queue.authentication_count.Inc(labelvalues=[result.kind, False]) + break + + if pass_result: + kwargs['auth_result'] = result + + return func(*args, **kwargs) + return wrapper + return processor -def _validate_and_apply_oauth_token(token): - validated = model.oauth.validate_access_token(token) - if not validated: - logger.warning('OAuth access token could not be validated: %s', token) - metric_queue.authentication_count.Inc(labelvalues=['oauth', False]) - raise InvalidToken('OAuth access token could not be validated: {token}'.format(token=token)) - elif validated.expires_at <= datetime.utcnow(): - logger.info('OAuth access with an expired token: %s', token) - metric_queue.authentication_count.Inc(labelvalues=['oauth', False]) - raise ExpiredToken('OAuth access token has expired: {token}'.format(token=token)) - - # Don't allow disabled users to login. - if not validated.authorized_user.enabled: - metric_queue.authentication_count.Inc(labelvalues=['oauth', False]) - return False - - # We have a valid token - scope_set = scopes.scopes_from_scope_string(validated.scope) - logger.debug('Successfully validated oauth access token: %s with scope: %s', token, - scope_set) - - set_authenticated_user(validated.authorized_user) - set_validated_oauth_token(validated) - - new_identity = QuayDeferredPermissionUser.for_user(validated.authorized_user, scope_set) - identity_changed.send(app, identity=new_identity) - metric_queue.authentication_count.Inc(labelvalues=['oauth', True]) - return True - - -def _parse_basic_auth_header(auth): - normalized = [part.strip() for part in auth.split(' ') if part] - if normalized[0].lower() != 'basic' or len(normalized) != 2: - logger.debug('Invalid basic auth format.') - return None - - logger.debug('Found basic auth header: %s', auth) - try: - credentials = [part.decode('utf-8') for part in b64decode(normalized[1]).split(':', 1)] - except TypeError: - logger.exception('Exception when parsing basic auth header') - return None - - if len(credentials) != 2: - logger.debug('Invalid basic auth credential format.') - return None - - return credentials - - -def _process_basic_auth(auth): - credentials = _parse_basic_auth_header(auth) - if credentials is None: - return - - if credentials[0] == '$token': - # Use as token auth - try: - token = model.token.load_token_data(credentials[1]) - logger.debug('Successfully validated token: %s', credentials[1]) - set_validated_token(token) - identity_changed.send(app, identity=Identity(token.code, 'token')) - metric_queue.authentication_count.Inc(labelvalues=['token', True]) - return True - - except model.DataModelException: - logger.debug('Invalid token: %s', credentials[1]) - metric_queue.authentication_count.Inc(labelvalues=['token', False]) - - elif credentials[0] == '$oauthtoken': - oauth_token = credentials[1] - return _validate_and_apply_oauth_token(oauth_token) - - elif '+' in credentials[0]: - logger.debug('Trying robot auth with credentials %s', str(credentials)) - # Use as robot auth - try: - robot = model.user.verify_robot(credentials[0], credentials[1]) - logger.debug('Successfully validated robot: %s', credentials[0]) - set_authenticated_user(robot) - - deferred_robot = QuayDeferredPermissionUser.for_user(robot) - identity_changed.send(app, identity=deferred_robot) - metric_queue.authentication_count.Inc(labelvalues=['robot', True]) - return True - except model.InvalidRobotException: - logger.debug('Invalid robot or password for robot: %s', credentials[0]) - metric_queue.authentication_count.Inc(labelvalues=['robot', False]) - - else: - (authenticated, _) = authentication.verify_and_link_user(credentials[0], credentials[1], - basic_auth=True) - if authenticated: - logger.debug('Successfully validated user: %s', authenticated.username) - set_authenticated_user(authenticated) - - new_identity = QuayDeferredPermissionUser.for_user(authenticated) - identity_changed.send(app, identity=new_identity) - metric_queue.authentication_count.Inc(labelvalues=['user', True]) - return True - else: - metric_queue.authentication_count.Inc(labelvalues=['user', False]) - - # We weren't able to authenticate via basic auth. - logger.debug('Basic auth present but could not be validated.') - return False - - -def has_basic_auth(username): - auth = request.headers.get('authorization', '') - if not auth: - return False - - credentials = _parse_basic_auth_header(auth) - if not credentials: - return False - - (authenticated, _) = authentication.verify_and_link_user(credentials[0], credentials[1], - basic_auth=True) - if not authenticated: - return False - - return authenticated.username == username - - -def generate_signed_token(grants, user_context): - ser = SecureCookieSessionInterface().get_signing_serializer(app) - data_to_sign = { - 'grants': grants, - 'user_context': user_context, - } - - encrypted = ser.dumps(data_to_sign) - return '{0}{1}'.format(SIGNATURE_PREFIX, encrypted) - - -def _process_signed_grant(auth): - normalized = [part.strip() for part in auth.split(' ') if part] - if normalized[0].lower() != 'token' or len(normalized) != 2: - logger.debug('Not a token: %s', auth) - return False - - if not normalized[1].startswith(SIGNATURE_PREFIX): - logger.debug('Not a signed grant token: %s', auth) - return False - - encrypted = normalized[1][len(SIGNATURE_PREFIX):] - ser = SecureCookieSessionInterface().get_signing_serializer(app) - - try: - token_data = ser.loads(encrypted, max_age=app.config['SIGNED_GRANT_EXPIRATION_SEC']) - except BadSignature: - logger.warning('Signed grant could not be validated: %s', encrypted) - metric_queue.authentication_count.Inc(labelvalues=['signed', False]) - abort(401, message='Signed grant could not be validated: %(auth)s', issue='invalid-auth-token', - auth=auth) - - logger.debug('Successfully validated signed grant with data: %s', token_data) - - loaded_identity = Identity(None, 'signed_grant') - - if token_data['user_context']: - set_grant_context({ - 'user': token_data['user_context'], - 'kind': 'user', - }) - - loaded_identity.provides.update(token_data['grants']) - identity_changed.send(app, identity=loaded_identity) - metric_queue.authentication_count.Inc(labelvalues=['signed', True]) - return True - - -def process_oauth(func): - @wraps(func) - def wrapper(*args, **kwargs): - auth = request.headers.get('authorization', '') - if auth: - normalized = [part.strip() for part in auth.split(' ') if part] - if normalized[0].lower() != 'bearer' or len(normalized) != 2: - logger.debug('Invalid oauth bearer token format.') - return func(*args, **kwargs) - - token = normalized[1] - _validate_and_apply_oauth_token(token) - elif _load_user_from_cookie() is None: - logger.debug('No auth header or login cookie.') - return func(*args, **kwargs) - return wrapper - - -def process_auth(func): - @wraps(func) - def wrapper(*args, **kwargs): - auth = request.headers.get('authorization', '') - - if auth: - logger.debug('Validating auth header: %s', auth) - _process_signed_grant(auth) - _process_basic_auth(auth) - else: - logger.debug('No auth header.') - - return func(*args, **kwargs) - return wrapper - - -def process_auth_or_cookie(func): - @wraps(func) - def wrapper(*args, **kwargs): - auth = request.headers.get('authorization', '') - - if auth: - logger.debug('Validating auth header: %s', auth) - _process_basic_auth(auth) - else: - logger.debug('No auth header.') - _load_user_from_cookie() - - return func(*args, **kwargs) - return wrapper +process_oauth = _auth_decorator(handlers=[validate_bearer_auth, validate_session_cookie]) +process_auth = _auth_decorator(handlers=[validate_signed_grant, validate_basic_auth]) +process_auth_or_cookie = _auth_decorator(handlers=[validate_basic_auth, validate_session_cookie]) +process_basic_auth = _auth_decorator(handlers=[validate_basic_auth], pass_result=True) def require_session_login(func): + """ Decorates a function and ensures that a valid session cookie exists or a 401 is raised. If + a valid session cookie does exist, the authenticated user and identity are also set. + """ @wraps(func) def wrapper(*args, **kwargs): - loaded = _load_user_from_cookie() - if loaded is None or loaded.organization: - abort(401, message='Method requires login and no valid login could be loaded.') - return func(*args, **kwargs) + result = validate_session_cookie() + if result.has_user: + result.apply_to_context() + metric_queue.authentication_count.Inc(labelvalues=[result.kind, True]) + return func(*args, **kwargs) + elif not result.missing: + metric_queue.authentication_count.Inc(labelvalues=[result.kind, False]) + + abort(401, message='Method requires login and no valid login could be loaded.') return wrapper def extract_namespace_repo_from_session(func): + """ Extracts the namespace and repository name from the current session (which must exist) + and passes them into the decorated function as the first and second arguments. If the + session doesn't exist or does not contain these arugments, a 400 error is raised. + """ @wraps(func) def wrapper(*args, **kwargs): if 'namespace' not in session or 'repository' not in session: diff --git a/auth/oauth.py b/auth/oauth.py new file mode 100644 index 000000000..662917e36 --- /dev/null +++ b/auth/oauth.py @@ -0,0 +1,49 @@ +import logging + +from datetime import datetime + +from auth.scopes import scopes_from_scope_string +from auth.validateresult import AuthKind, ValidateResult +from data import model + +logger = logging.getLogger(__name__) + +def validate_bearer_auth(auth_header): + """ Validates an OAuth token found inside a basic auth `Bearer` token, returning whether it + points to a valid OAuth token. + """ + if not auth_header: + return ValidateResult(AuthKind.oauth, missing=True) + + normalized = [part.strip() for part in auth_header.split(' ') if part] + if normalized[0].lower() != 'bearer' or len(normalized) != 2: + logger.debug('Got invalid bearer token format: %s', auth_header) + return ValidateResult(AuthKind.oauth, missing=True) + + (_, oauth_token) = normalized + return validate_oauth_token(oauth_token) + + +def validate_oauth_token(token): + """ Validates the specified OAuth token, returning whether it points to a valid OAuth token. + """ + validated = model.oauth.validate_access_token(token) + if not validated: + logger.warning('OAuth access token could not be validated: %s', token) + return ValidateResult(AuthKind.oauth, + error_message='OAuth access token could not be validated') + + if validated.expires_at <= datetime.utcnow(): + logger.info('OAuth access with an expired token: %s', token) + return ValidateResult(AuthKind.oauth, error_message='OAuth access token has expired') + + # Don't allow disabled users to login. + if not validated.authorized_user.enabled: + return ValidateResult(AuthKind.oauth, + error_message='Granter of the oauth access token is disabled') + + # We have a valid token + scope_set = scopes_from_scope_string(validated.scope) + logger.debug('Successfully validated oauth access token: %s with scope: %s', token, + scope_set) + return ValidateResult(AuthKind.oauth, oauthtoken=validated) diff --git a/auth/signedgrant.py b/auth/signedgrant.py new file mode 100644 index 000000000..b8169114d --- /dev/null +++ b/auth/signedgrant.py @@ -0,0 +1,55 @@ +import logging + +from flask.sessions import SecureCookieSessionInterface, BadSignature + +from app import app +from auth.validateresult import AuthKind, ValidateResult + +logger = logging.getLogger(__name__) + +# The prefix for all signatures of signed granted. +SIGNATURE_PREFIX = 'sigv2=' + +def generate_signed_token(grants, user_context): + """ Generates a signed session token with the given grants and user context. """ + ser = SecureCookieSessionInterface().get_signing_serializer(app) + data_to_sign = { + 'grants': grants, + 'user_context': user_context, + } + + encrypted = ser.dumps(data_to_sign) + return '{0}{1}'.format(SIGNATURE_PREFIX, encrypted) + + +def validate_signed_grant(auth_header): + """ Validates a signed grant as found inside an auth header and returns whether it points to + a valid grant. + """ + if not auth_header: + return ValidateResult(AuthKind.signed_grant, missing=True) + + # Try to parse the token from the header. + normalized = [part.strip() for part in auth_header.split(' ') if part] + if normalized[0].lower() != 'token' or len(normalized) != 2: + logger.debug('Not a token: %s', auth_header) + return ValidateResult(AuthKind.signed_grant, missing=True) + + # Check that it starts with the expected prefix. + if not normalized[1].startswith(SIGNATURE_PREFIX): + logger.debug('Not a signed grant token: %s', auth_header) + return ValidateResult(AuthKind.signed_grant, missing=True) + + # Decrypt the grant. + encrypted = normalized[1][len(SIGNATURE_PREFIX):] + ser = SecureCookieSessionInterface().get_signing_serializer(app) + + try: + token_data = ser.loads(encrypted, max_age=app.config['SIGNED_GRANT_EXPIRATION_SEC']) + except BadSignature: + logger.warning('Signed grant could not be validated: %s', encrypted) + return ValidateResult(AuthKind.signed_grant, + error_message='Signed grant could not be validated') + + logger.debug('Successfully validated signed grant with data: %s', token_data) + return ValidateResult(AuthKind.signed_grant, signed_data=token_data) diff --git a/auth/test/test_basic.py b/auth/test/test_basic.py new file mode 100644 index 000000000..1b55d7dbe --- /dev/null +++ b/auth/test/test_basic.py @@ -0,0 +1,69 @@ +import pytest + +from base64 import b64encode + +from auth.basic import validate_basic_auth, ACCESS_TOKEN_USERNAME, OAUTH_TOKEN_USERNAME +from auth.validateresult import AuthKind, ValidateResult +from data import model +from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file + +def _token(username, password): + return 'basic ' + b64encode('%s:%s' % (username, password)) + +@pytest.mark.parametrize('token, expected_result', [ + ('', ValidateResult(AuthKind.basic, missing=True)), + ('someinvalidtoken', ValidateResult(AuthKind.basic, missing=True)), + ('somefoobartoken', ValidateResult(AuthKind.basic, missing=True)), + ('basic ', ValidateResult(AuthKind.basic, missing=True)), + ('basic some token', ValidateResult(AuthKind.basic, missing=True)), + ('basic sometoken', ValidateResult(AuthKind.basic, missing=True)), + + (_token(ACCESS_TOKEN_USERNAME, 'invalid'), + ValidateResult(AuthKind.basic, error_message='Invalid access token')), + + (_token(OAUTH_TOKEN_USERNAME, 'invalid'), + ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')), + + (_token('devtable', 'invalid'), + ValidateResult(AuthKind.basic, error_message='Invalid Username or Password')), + + (_token('devtable+somebot', 'invalid'), + ValidateResult(AuthKind.basic, + error_message='Could not find robot with username: devtable+somebot ' + + 'and supplied password.')), + + (_token('disabled', 'password'), + ValidateResult(AuthKind.basic, + error_message='This user has been disabled. Please contact your administrator.')), +]) +def test_validate_basic_auth_token(token, expected_result, app): + result = validate_basic_auth(token) + assert result == expected_result + + +def test_valid_user(app): + token = _token('devtable', 'password') + result = validate_basic_auth(token) + assert result == ValidateResult(AuthKind.basic, user=model.user.get_user('devtable')) + + +def test_valid_robot(app): + robot, password = model.user.create_robot('somerobot', model.user.get_user('devtable')) + token = _token(robot.username, password) + result = validate_basic_auth(token) + assert result == ValidateResult(AuthKind.basic, robot=robot) + + +def test_valid_token(app): + access_token = model.token.create_delegate_token('devtable', 'simple', 'sometoken') + token = _token(ACCESS_TOKEN_USERNAME, access_token.code) + result = validate_basic_auth(token) + assert result == ValidateResult(AuthKind.basic, token=access_token) + + +def test_valid_oauth(app): + user = model.user.get_user('devtable') + oauth_token = list(model.oauth.list_access_tokens_for_user(user))[0] + token = _token(OAUTH_TOKEN_USERNAME, oauth_token.access_token) + result = validate_basic_auth(token) + assert result == ValidateResult(AuthKind.oauth, oauthtoken=oauth_token) diff --git a/auth/test/test_cookie.py b/auth/test/test_cookie.py new file mode 100644 index 000000000..7e13a006f --- /dev/null +++ b/auth/test/test_cookie.py @@ -0,0 +1,61 @@ +import uuid + +from flask_login import login_user + +from app import LoginWrappedDBUser +from data import model +from auth.cookie import validate_session_cookie +from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file + +def test_anonymous_cookie(app): + assert validate_session_cookie().missing + +def test_invalidformatted_cookie(app): + # "Login" with a non-UUID reference. + someuser = model.user.get_user('devtable') + login_user(LoginWrappedDBUser('somenonuuid', someuser)) + + # Ensure we get an invalid session cookie format error. + result = validate_session_cookie() + assert result.authed_user is None + assert result.identity is None + assert not result.has_user + assert result.error_message == 'Invalid session cookie format' + +def test_disabled_user(app): + # "Login" with a disabled user. + someuser = model.user.get_user('disabled') + login_user(LoginWrappedDBUser(someuser.uuid, someuser)) + + # Ensure we get an invalid session cookie format error. + result = validate_session_cookie() + assert result.authed_user is None + assert result.identity is None + assert not result.has_user + assert result.error_message == 'User account is disabled' + +def test_valid_user(app): + # Login with a valid user. + someuser = model.user.get_user('devtable') + login_user(LoginWrappedDBUser(someuser.uuid, someuser)) + + result = validate_session_cookie() + assert result.authed_user == someuser + assert result.identity is not None + assert result.has_user + assert result.error_message is None + +def test_valid_organization(app): + # "Login" with a valid organization. + someorg = model.user.get_namespace_user('buynlarge') + someorg.uuid = str(uuid.uuid4()) + someorg.verified = True + someorg.save() + + login_user(LoginWrappedDBUser(someorg.uuid, someorg)) + + result = validate_session_cookie() + assert result.authed_user is None + assert result.identity is None + assert not result.has_user + assert result.error_message == 'Cannot login to organization' diff --git a/auth/test/test_decorators.py b/auth/test/test_decorators.py new file mode 100644 index 000000000..5de82d3c1 --- /dev/null +++ b/auth/test/test_decorators.py @@ -0,0 +1,105 @@ +import pytest + +from flask import session +from flask_login import login_user +from werkzeug.exceptions import HTTPException + +from app import LoginWrappedDBUser +from auth.auth_context import get_authenticated_user +from auth.decorators import (extract_namespace_repo_from_session, require_session_login, + process_auth_or_cookie) +from data import model +from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file + +def test_extract_namespace_repo_from_session_missing(app): + def emptyfunc(): + pass + + session.clear() + with pytest.raises(HTTPException): + extract_namespace_repo_from_session(emptyfunc)() + + +def test_extract_namespace_repo_from_session_present(app): + encountered = [] + + def somefunc(namespace, repository): + encountered.append(namespace) + encountered.append(repository) + + # Add the namespace and repository to the session. + session.clear() + session['namespace'] = 'foo' + session['repository'] = 'bar' + + # Call the decorated method. + extract_namespace_repo_from_session(somefunc)() + + assert encountered[0] == 'foo' + assert encountered[1] == 'bar' + + +def test_require_session_login_missing(app): + def emptyfunc(): + pass + + with pytest.raises(HTTPException): + require_session_login(emptyfunc)() + + +def test_require_session_login_valid_user(app): + def emptyfunc(): + pass + + # Login as a valid user. + someuser = model.user.get_user('devtable') + login_user(LoginWrappedDBUser(someuser.uuid, someuser)) + + # Call the function. + require_session_login(emptyfunc)() + + # Ensure the authenticated user was updated. + assert get_authenticated_user() == someuser + + +def test_require_session_login_invalid_user(app): + def emptyfunc(): + pass + + # "Login" as a disabled user. + someuser = model.user.get_user('disabled') + login_user(LoginWrappedDBUser(someuser.uuid, someuser)) + + # Call the function. + with pytest.raises(HTTPException): + require_session_login(emptyfunc)() + + # Ensure the authenticated user was not updated. + assert get_authenticated_user() is None + + +def test_process_auth_or_cookie_invalid_user(app): + def emptyfunc(): + pass + + # Call the function. + process_auth_or_cookie(emptyfunc)() + + # Ensure the authenticated user was not updated. + assert get_authenticated_user() is None + + +def test_process_auth_or_cookie_valid_user(app): + def emptyfunc(): + pass + + # Login as a valid user. + someuser = model.user.get_user('devtable') + login_user(LoginWrappedDBUser(someuser.uuid, someuser)) + + # Call the function. + process_auth_or_cookie(emptyfunc)() + + # Ensure the authenticated user was updated. + assert get_authenticated_user() == someuser + diff --git a/auth/test/test_oauth.py b/auth/test/test_oauth.py new file mode 100644 index 000000000..5a9d77a8e --- /dev/null +++ b/auth/test/test_oauth.py @@ -0,0 +1,48 @@ +import pytest + +from auth.oauth import validate_bearer_auth, validate_oauth_token +from auth.validateresult import AuthKind, ValidateResult +from data import model +from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file + +@pytest.mark.parametrize('header, expected_result', [ + ('', ValidateResult(AuthKind.oauth, missing=True)), + ('somerandomtoken', ValidateResult(AuthKind.oauth, missing=True)), + ('bearer some random token', ValidateResult(AuthKind.oauth, missing=True)), + + ('bearer invalidtoken', + ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')), +]) +def test_bearer(header, expected_result, app): + assert validate_bearer_auth(header) == expected_result + +def test_valid_oauth(app): + user = model.user.get_user('devtable') + token = list(model.oauth.list_access_tokens_for_user(user))[0] + + result = validate_bearer_auth('bearer ' + token.access_token) + assert result.oauthtoken == token + assert result.authed_user == user + assert result.auth_valid + +def test_disabled_user_oauth(app): + user = model.user.get_user('disabled') + token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin', + access_token='foo') + + result = validate_bearer_auth('bearer ' + token.access_token) + assert result.oauthtoken is None + assert result.authed_user is None + assert not result.auth_valid + assert result.error_message == 'Granter of the oauth access token is disabled' + +def test_expired_token(app): + user = model.user.get_user('devtable') + token = model.oauth.create_access_token_for_testing(user, 'deadbeef', 'repo:admin', + access_token='bar', expires_in=-1000) + + result = validate_bearer_auth('bearer ' + token.access_token) + assert result.oauthtoken is None + assert result.authed_user is None + assert not result.auth_valid + assert result.error_message == 'OAuth access token has expired' diff --git a/auth/test/test_scopes.py b/auth/test/test_scopes.py new file mode 100644 index 000000000..4f03fae10 --- /dev/null +++ b/auth/test/test_scopes.py @@ -0,0 +1,50 @@ +import pytest + +from auth.scopes import (scopes_from_scope_string, validate_scope_string, ALL_SCOPES, + is_subset_string) + +@pytest.mark.parametrize('scopes_string, expected', [ + # Valid single scopes. + ('repo:read', ['repo:read']), + ('repo:admin', ['repo:admin']), + + # Invalid scopes. + ('not:valid', []), + ('repo:admins', []), + + # Valid scope strings. + ('repo:read repo:admin', ['repo:read', 'repo:admin']), + ('repo:read,repo:admin', ['repo:read', 'repo:admin']), + ('repo:read,repo:admin repo:write', ['repo:read', 'repo:admin', 'repo:write']), + + # Partially invalid scopes. + ('repo:read,not:valid', []), + ('repo:read repo:admins', []), + + # Invalid scope strings. + ('repo:read|repo:admin', []), + + # Mixture of delimiters. + ('repo:read, repo:admin', []), +]) +def test_parsing(scopes_string, expected): + expected_scope_set = {ALL_SCOPES[scope_name] for scope_name in expected} + parsed_scope_set = scopes_from_scope_string(scopes_string) + assert parsed_scope_set == expected_scope_set + assert validate_scope_string(scopes_string) == bool(expected) + + +@pytest.mark.parametrize('superset, subset, result', [ + ('repo:read', 'repo:read', True), + ('repo:read repo:admin', 'repo:read', True), + ('repo:read,repo:admin', 'repo:read', True), + ('repo:read,repo:admin', 'repo:admin', True), + ('repo:read,repo:admin', 'repo:admin repo:read', True), + + ('', 'repo:read', False), + ('unknown:tag', 'repo:read', False), + ('repo:read unknown:tag', 'repo:read', False), + ('repo:read,unknown:tag', 'repo:read', False), +]) +def test_subset_string(superset, subset, result): + assert is_subset_string(superset, subset) == result diff --git a/auth/test/test_signedgrant.py b/auth/test/test_signedgrant.py new file mode 100644 index 000000000..620a162ee --- /dev/null +++ b/auth/test/test_signedgrant.py @@ -0,0 +1,18 @@ +import pytest + +from auth.signedgrant import validate_signed_grant, generate_signed_token, SIGNATURE_PREFIX +from auth.validateresult import AuthKind, ValidateResult + +@pytest.mark.parametrize('header, expected_result', [ + ('', ValidateResult(AuthKind.signed_grant, missing=True)), + ('somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)), + ('token somerandomtoken', ValidateResult(AuthKind.signed_grant, missing=True)), + + ('token ' + SIGNATURE_PREFIX + 'foo', + ValidateResult(AuthKind.signed_grant, error_message='Signed grant could not be validated')), + + ('token ' + generate_signed_token({'a': 'b'}, {'c': 'd'}), + ValidateResult(AuthKind.signed_grant, signed_data={'grants': {'a': 'b'}, 'user_context': {'c': 'd'}})), +]) +def test_token(header, expected_result): + assert validate_signed_grant(header) == expected_result diff --git a/auth/test/test_validateresult.py b/auth/test/test_validateresult.py new file mode 100644 index 000000000..1f6523863 --- /dev/null +++ b/auth/test/test_validateresult.py @@ -0,0 +1,63 @@ +import pytest + +from auth.auth_context import (get_authenticated_user, get_grant_context, get_validated_token, + get_validated_oauth_token) +from auth.validateresult import AuthKind, ValidateResult +from data import model +from test.fixtures import app, appconfig, database_uri, init_db_path, sqlitedb_file + + +def get_user(): + return model.user.get_user('devtable') + +def get_robot(): + robot, _ = model.user.create_robot('somebot', get_user()) + return robot + +def get_token(): + return model.token.create_delegate_token('devtable', 'simple', 'sometoken') + +def get_oauthtoken(): + user = model.user.get_user('devtable') + return list(model.oauth.list_access_tokens_for_user(user))[0] + +def get_signeddata(): + return {'grants': {'a': 'b'}, 'user_context': {'c': 'd'}} + +@pytest.mark.parametrize('get_entity,entity_kind', [ + (get_user, 'user'), + (get_robot, 'robot'), + (get_token, 'token'), + (get_oauthtoken, 'oauthtoken'), + (get_signeddata, 'signed_data'), +]) +def test_apply_context(get_entity, entity_kind, app): + assert get_authenticated_user() is None + assert get_validated_token() is None + assert get_validated_oauth_token() is None + assert get_grant_context() is None + + entity = get_entity() + args = {} + args[entity_kind] = entity + + result = ValidateResult(AuthKind.basic, **args) + result.apply_to_context() + + expected_user = entity if entity_kind == 'user' or entity_kind == 'robot' else None + if entity_kind == 'oauthtoken': + expected_user = entity.authorized_user + + expected_token = entity if entity_kind == 'token' else None + expected_oauth = entity if entity_kind == 'oauthtoken' else None + + fake_grant = { + 'user': {'c': 'd'}, + 'kind': 'user', + } + expected_grant = fake_grant if entity_kind == 'signed_data' else None + + assert get_authenticated_user() == expected_user + assert get_validated_token() == expected_token + assert get_validated_oauth_token() == expected_oauth + assert get_grant_context() == expected_grant diff --git a/auth/validateresult.py b/auth/validateresult.py new file mode 100644 index 000000000..f6d08a9a8 --- /dev/null +++ b/auth/validateresult.py @@ -0,0 +1,101 @@ +from enum import Enum +from flask_principal import Identity, identity_changed + +from app import app +from auth.auth_context import (set_authenticated_user, set_validated_token, set_grant_context, + set_validated_oauth_token) +from auth.scopes import scopes_from_scope_string +from auth.permissions import QuayDeferredPermissionUser + + +class AuthKind(Enum): + cookie = 'cookie' + basic = 'basic' + oauth = 'oauth' + signed_grant = 'signed_grant' + + +class ValidateResult(object): + """ A result of validating auth in one form or another. """ + def __init__(self, kind, missing=False, user=None, token=None, oauthtoken=None, + robot=None, signed_data=None, error_message=None): + self.kind = kind + self.missing = missing + self.user = user + self.robot = robot + self.token = token + self.oauthtoken = oauthtoken + self.signed_data = signed_data + self.error_message = error_message + + def tuple(self): + return (self.kind, self.missing, self.user, self.token, self.oauthtoken, self.robot, + self.signed_data, self.error_message) + + def __eq__(self, other): + return self.tuple() == other.tuple() + + def apply_to_context(self): + """ Applies this auth result to the auth context and Flask-Principal. """ + # Set the various pieces of the auth context. + if self.oauthtoken: + set_authenticated_user(self.authed_user) + set_validated_oauth_token(self.oauthtoken) + elif self.authed_user: + set_authenticated_user(self.authed_user) + elif self.token: + set_validated_token(self.token) + elif self.signed_data: + if self.signed_data['user_context']: + set_grant_context({ + 'user': self.signed_data['user_context'], + 'kind': 'user', + }) + + # Set the identity for Flask-Principal. + if self.identity: + identity_changed.send(app, identity=self.identity) + + @property + def authed_user(self): + """ Returns the authenticated user, whether directly, or via an OAuth token. """ + if not self.auth_valid: + return None + + if self.oauthtoken: + return self.oauthtoken.authorized_user + + return self.user if self.user else self.robot + + @property + def identity(self): + """ Returns the identity for the auth result. """ + if not self.auth_valid: + return None + + if self.oauthtoken: + scope_set = scopes_from_scope_string(self.oauthtoken.scope) + return QuayDeferredPermissionUser.for_user(self.oauthtoken.authorized_user, scope_set) + + if self.authed_user: + return QuayDeferredPermissionUser.for_user(self.authed_user) + + if self.token: + return Identity(self.token.code, 'token') + + if self.signed_data: + identity = Identity(None, 'signed_grant') + identity.provides.update(self.signed_data['grants']) + return identity + + return None + + @property + def has_user(self): + """ Returns whether a user (not a robot) was authenticated successfully. """ + return bool(self.user) + + @property + def auth_valid(self): + """ Returns whether authentication successfully occurred. """ + return self.user or self.token or self.oauthtoken or self.robot or self.signed_data diff --git a/data/model/oauth.py b/data/model/oauth.py index cb484d5f7..003332acb 100644 --- a/data/model/oauth.py +++ b/data/model/oauth.py @@ -283,8 +283,9 @@ def list_applications_for_org(org): return query -def create_access_token_for_testing(user_obj, client_id, scope, access_token='test'): - expires_at = datetime.utcnow() + timedelta(seconds=10000) +def create_access_token_for_testing(user_obj, client_id, scope, access_token='test', + expires_in=10000): + expires_at = datetime.utcnow() + timedelta(seconds=expires_in) application = get_application_for_client_id(client_id) created = OAuthAccessToken.create(application=application, authorized_user=user_obj, scope=scope, token_type='token', access_token=access_token, diff --git a/endpoints/v1/index.py b/endpoints/v1/index.py index 51f68408f..f579e5f15 100644 --- a/endpoints/v1/index.py +++ b/endpoints/v1/index.py @@ -9,10 +9,11 @@ from flask import request, make_response, jsonify, session from data.interfaces.v1 import pre_oci_model as model from app import authentication, userevents, metric_queue from auth.auth_context import get_authenticated_user, get_validated_token, get_validated_oauth_token -from auth.decorators import process_auth, generate_signed_token +from auth.decorators import process_auth from auth.permissions import (ModifyRepositoryPermission, UserAdminPermission, ReadRepositoryPermission, CreateRepositoryPermission, repository_read_grant, repository_write_grant) +from auth.signedgrant import generate_signed_token from util.http import abort from util.names import REPOSITORY_NAME_REGEX from endpoints.common import parse_repository_name diff --git a/endpoints/web.py b/endpoints/web.py index dbee4fcb1..4cd7a1f74 100644 --- a/endpoints/web.py +++ b/endpoints/web.py @@ -14,8 +14,8 @@ from app import (app, billing as stripe, build_logs, avatar, signer, log_archive get_app_url, instance_keys, user_analytics) from auth import scopes from auth.auth_context import get_authenticated_user -from auth.decorators import (has_basic_auth, require_session_login, process_oauth, - process_auth_or_cookie) +from auth.basic import has_basic_auth +from auth.decorators import require_session_login, process_oauth, process_auth_or_cookie from auth.permissions import (AdministerOrganizationPermission, ReadRepositoryPermission, SuperUserPermission, AdministerRepositoryPermission, ModifyRepositoryPermission, OrganizationMemberPermission) diff --git a/test/test_auth.py b/test/test_auth.py deleted file mode 100644 index c5112ebf4..000000000 --- a/test/test_auth.py +++ /dev/null @@ -1,185 +0,0 @@ -import base64 -import unittest - -from datetime import datetime, timedelta - -from flask import g -from flask_principal import identity_loaded - -from app import app -from auth.scopes import (scopes_from_scope_string, is_subset_string, DIRECT_LOGIN, ADMIN_REPO, - ALL_SCOPES) -from auth.permissions import QuayDeferredPermissionUser -from auth.process import _process_basic_auth -from data import model -from data.database import OAuthApplication, OAuthAccessToken -from endpoints.api import api -from endpoints.api.user import User, Signin -from test.test_api_usage import ApiTestCase - - -ADMIN_ACCESS_USER = 'devtable' -DISABLED_USER = 'disabled' - - -@identity_loaded.connect_via(app) -def on_identity_loaded(sender, identity): - g.identity = identity - -class TestAuth(ApiTestCase): - def verify_cookie_auth(self, username): - resp = self.getJsonResponse(User) - self.assertEquals(resp['username'], username) - - def verify_identity(self, id): - try: - identity = g.identity - except: - identity = None - - self.assertIsNotNone(identity) - self.assertEquals(identity.id, id) - - def verify_no_identity(self): - try: - identity = g.identity - except: - identity = None - - self.assertIsNone(identity) - - def conduct_basic_auth(self, username, password): - encoded = base64.b64encode(username + ':' + password) - try: - _process_basic_auth('Basic ' + encoded) - except: - pass - - def create_oauth(self, user): - oauth_app = OAuthApplication.create(client_id='onetwothree', redirect_uri='', - application_uri='', organization=user, - name='someapp') - - expires_at = datetime.utcnow() + timedelta(seconds=50000) - OAuthAccessToken.create(application=oauth_app, authorized_user=user, - scope='repo:admin', - access_token='access1234', token_type='Bearer', - expires_at=expires_at, refresh_token=None, data={}) - - def test_login(self): - password = 'password' - resp = self.postJsonResponse(Signin, data=dict(username=ADMIN_ACCESS_USER, password=password)) - self.assertTrue(resp.get('success')) - self.verify_cookie_auth(ADMIN_ACCESS_USER) - - def test_login_disabled(self): - password = 'password' - self.postJsonResponse(Signin, data=dict(username=DISABLED_USER, password=password), - expected_code=403) - - def test_basic_auth_user(self): - user = model.user.get_user(ADMIN_ACCESS_USER) - self.conduct_basic_auth(ADMIN_ACCESS_USER, 'password') - self.verify_identity(user.uuid) - - def test_basic_auth_disabled_user(self): - user = model.user.get_user(DISABLED_USER) - self.conduct_basic_auth(DISABLED_USER, 'password') - self.verify_no_identity() - - def test_basic_auth_token(self): - token = model.token.create_delegate_token(ADMIN_ACCESS_USER, 'simple', 'sometoken') - self.conduct_basic_auth('$token', token.code) - self.verify_identity(token.code) - - def test_basic_auth_invalid_token(self): - self.conduct_basic_auth('$token', 'foobar') - self.verify_no_identity() - - def test_basic_auth_invalid_user(self): - self.conduct_basic_auth('foobarinvalid', 'foobar') - self.verify_no_identity() - - def test_oauth_invalid(self): - self.conduct_basic_auth('$oauthtoken', 'foobar') - self.verify_no_identity() - - def test_oauth_invalid_http_response(self): - rv = self.app.get(api.url_for(User), headers={'Authorization': 'Bearer bad_token'}) - assert 'WWW-Authenticate' in rv.headers - self.assertEquals(401, rv.status_code) - - def test_oauth_valid_user(self): - user = model.user.get_user(ADMIN_ACCESS_USER) - self.create_oauth(user) - self.conduct_basic_auth('$oauthtoken', 'access1234') - self.verify_identity(user.uuid) - - def test_oauth_disabled_user(self): - user = model.user.get_user(DISABLED_USER) - self.create_oauth(user) - self.conduct_basic_auth('$oauthtoken', 'access1234') - self.verify_no_identity() - - def test_basic_auth_robot(self): - user = model.user.get_user(ADMIN_ACCESS_USER) - robot, passcode = model.user.get_robot('dtrobot', user) - self.conduct_basic_auth(robot.username, passcode) - self.verify_identity(robot.uuid) - - def test_basic_auth_robot_invalidcode(self): - user = model.user.get_user(ADMIN_ACCESS_USER) - robot, _ = model.user.get_robot('dtrobot', user) - self.conduct_basic_auth(robot.username, 'someinvalidcode') - self.verify_no_identity() - - def test_deferred_permissions_scopes(self): - self.assertEquals(QuayDeferredPermissionUser.for_id('123454')._scope_set, {DIRECT_LOGIN}) - self.assertEquals(QuayDeferredPermissionUser.for_id('123454', {})._scope_set, {}) - self.assertEquals(QuayDeferredPermissionUser.for_id('123454', {ADMIN_REPO})._scope_set, {ADMIN_REPO}) - - def assertParsedScopes(self, scopes_str, *args): - expected_scope_set = {ALL_SCOPES[scope_name] for scope_name in args} - parsed_scope_set = scopes_from_scope_string(scopes_str) - self.assertEquals(parsed_scope_set, expected_scope_set) - - def test_scopes_parsing(self): - # Valid single scopes. - self.assertParsedScopes('repo:read', 'repo:read') - self.assertParsedScopes('repo:admin', 'repo:admin') - - # Invalid scopes. - self.assertParsedScopes('not:valid') - self.assertParsedScopes('repo:admins') - - # Valid scope strings. - self.assertParsedScopes('repo:read repo:admin', 'repo:read', 'repo:admin') - self.assertParsedScopes('repo:read,repo:admin', 'repo:read', 'repo:admin') - self.assertParsedScopes('repo:read,repo:admin repo:write', 'repo:read', 'repo:admin', - 'repo:write') - - # Partially invalid scopes. - self.assertParsedScopes('repo:read,not:valid') - self.assertParsedScopes('repo:read repo:admins') - - # Invalid scope strings. - self.assertParsedScopes('repo:read|repo:admin') - - # Mixture of delimiters. - self.assertParsedScopes('repo:read, repo:admin') - - def test_subset_string(self): - self.assertTrue(is_subset_string('repo:read', 'repo:read')) - self.assertTrue(is_subset_string('repo:read repo:admin', 'repo:read')) - self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:read')) - self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:admin')) - self.assertTrue(is_subset_string('repo:read,repo:admin', 'repo:admin repo:read')) - - self.assertFalse(is_subset_string('', 'repo:read')) - self.assertFalse(is_subset_string('unknown:tag', 'repo:read')) - self.assertFalse(is_subset_string('repo:read unknown:tag', 'repo:read')) - self.assertFalse(is_subset_string('repo:read,unknown:tag', 'repo:read')) - -if __name__ == '__main__': - unittest.main() - diff --git a/test/test_endpoints.py b/test/test_endpoints.py index a87f09377..39c9928ee 100644 --- a/test/test_endpoints.py +++ b/test/test_endpoints.py @@ -345,8 +345,6 @@ class OAuthTestCase(EndpointTestCase): self.postResponse('web.authorize_application', form=form, with_csrf=False, expected_code=403) def test_authorize_nocsrf_withinvalidheader(self): - self.login('devtable', 'password') - # Note: Defined in initdb.py form = { 'client_id': 'deadbeef', @@ -358,8 +356,6 @@ class OAuthTestCase(EndpointTestCase): self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401) def test_authorize_nocsrf_withbadheader(self): - self.login('devtable', 'password') - # Note: Defined in initdb.py form = { 'client_id': 'deadbeef', @@ -368,7 +364,8 @@ class OAuthTestCase(EndpointTestCase): } headers = dict(authorization='Basic ' + base64.b64encode('devtable:invalidpassword')) - self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401) + self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, + expected_code=401) def test_authorize_nocsrf_correctheader(self): # Note: Defined in initdb.py @@ -380,13 +377,15 @@ class OAuthTestCase(EndpointTestCase): # Try without the client id being in the whitelist. headers = dict(authorization='Basic ' + base64.b64encode('devtable:password')) - self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=403) + self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, + expected_code=403) # Add the client ID to the whitelist and try again. app.config['DIRECT_OAUTH_CLIENTID_WHITELIST'] = ['deadbeef'] headers = dict(authorization='Basic ' + base64.b64encode('devtable:password')) - resp = self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=True, expected_code=302) + resp = self.postResponse('web.authorize_application', headers=headers, form=form, + with_csrf=True, expected_code=302) self.assertTrue('access_token=' in resp.headers['Location']) def test_authorize_nocsrf_ratelimiting(self):