Extract credential handling into its own module

Will be used in Docker V1 and APPR protocols
This commit is contained in:
Joseph Schorr 2017-10-27 14:05:53 -04:00 committed by Joseph Schorr
parent 4a5626e64b
commit 6f3d9a6fce
5 changed files with 94 additions and 49 deletions

View file

@ -3,18 +3,11 @@ import logging
from base64 import b64decode
from flask import request
from app import authentication
from auth.oauth import validate_oauth_token
from auth.credentials import validate_credentials
from auth.validateresult import ValidateResult, AuthKind
from data import model
from util.names import parse_robot_username
logger = logging.getLogger(__name__)
ACCESS_TOKEN_USERNAME = '$token'
OAUTH_TOKEN_USERNAME = '$oauthtoken'
def has_basic_auth(username):
""" Returns true if a basic auth header exists with a username and password pair that validates
against the internal authentication system. Returns True on full success and False on any
@ -41,44 +34,7 @@ def validate_basic_auth(auth_header):
return ValidateResult(AuthKind.basic, missing=True)
auth_username, auth_password_or_token = credentials
# Check for access tokens.
if auth_username == ACCESS_TOKEN_USERNAME:
logger.debug('Found basic auth header for access token')
try:
token = model.token.load_token_data(auth_password_or_token)
logger.debug('Successfully validated basic auth for access token %s', token.id)
return ValidateResult(AuthKind.basic, token=token)
except model.DataModelException:
logger.warning('Failed to validate basic auth for access token %s', auth_password_or_token)
return ValidateResult(AuthKind.basic, error_message='Invalid access token')
# Check for OAuth tokens.
if auth_username == OAUTH_TOKEN_USERNAME:
return validate_oauth_token(auth_password_or_token)
# Check for robots and users.
is_robot = parse_robot_username(auth_username)
if is_robot:
logger.debug('Found basic auth header for robot %s', auth_username)
try:
robot = model.user.verify_robot(auth_username, auth_password_or_token)
logger.debug('Successfully validated basic auth for robot %s', auth_username)
return ValidateResult(AuthKind.basic, robot=robot)
except model.InvalidRobotException as ire:
logger.warning('Failed to validate basic auth for robot %s: %s', auth_username, ire.message)
return ValidateResult(AuthKind.basic, error_message=ire.message)
# Otherwise, treat as a standard user.
(authenticated, err) = authentication.verify_and_link_user(auth_username, auth_password_or_token,
basic_auth=True)
if authenticated:
logger.debug('Successfully validated basic auth for user %s', authenticated.username)
return ValidateResult(AuthKind.basic, user=authenticated)
else:
logger.warning('Failed to validate basic auth for user %s: %s', auth_username, err)
return ValidateResult(AuthKind.basic, error_message=err)
return validate_credentials(auth_username, auth_password_or_token).with_kind(AuthKind.basic)
def _parse_basic_auth_header(auth):

52
auth/credentials.py Normal file
View file

@ -0,0 +1,52 @@
import logging
from app import authentication
from auth.oauth import validate_oauth_token
from auth.validateresult import ValidateResult, AuthKind
from data import model
from util.names import parse_robot_username
logger = logging.getLogger(__name__)
ACCESS_TOKEN_USERNAME = '$token'
OAUTH_TOKEN_USERNAME = '$oauthtoken'
def validate_credentials(auth_username, auth_password_or_token):
""" Validates a pair of auth username and password/token credentials. """
# Check for access tokens.
if auth_username == ACCESS_TOKEN_USERNAME:
logger.debug('Found basic auth header for access token')
try:
token = model.token.load_token_data(auth_password_or_token)
logger.debug('Successfully validated basic auth for access token %s', token.id)
return ValidateResult(AuthKind.credentials, token=token)
except model.DataModelException:
logger.warning('Failed to validate basic auth for access token %s', auth_password_or_token)
return ValidateResult(AuthKind.credentials, error_message='Invalid access token')
# Check for OAuth tokens.
if auth_username == OAUTH_TOKEN_USERNAME:
return validate_oauth_token(auth_password_or_token)
# Check for robots and users.
is_robot = parse_robot_username(auth_username)
if is_robot:
logger.debug('Found basic auth header for robot %s', auth_username)
try:
robot = model.user.verify_robot(auth_username, auth_password_or_token)
logger.debug('Successfully validated basic auth for robot %s', auth_username)
return ValidateResult(AuthKind.credentials, robot=robot)
except model.InvalidRobotException as ire:
logger.warning('Failed to validate basic auth for robot %s: %s', auth_username, ire.message)
return ValidateResult(AuthKind.credentials, error_message=ire.message)
# Otherwise, treat as a standard user.
(authenticated, err) = authentication.verify_and_link_user(auth_username, auth_password_or_token,
basic_auth=True)
if authenticated:
logger.debug('Successfully validated basic auth for user %s', authenticated.username)
return ValidateResult(AuthKind.credentials, user=authenticated)
else:
logger.warning('Failed to validate basic auth for user %s: %s', auth_username, err)
return ValidateResult(AuthKind.credentials, error_message=err)

View file

@ -2,7 +2,8 @@ import pytest
from base64 import b64encode
from auth.basic import validate_basic_auth, ACCESS_TOKEN_USERNAME, OAUTH_TOKEN_USERNAME
from auth.basic import validate_basic_auth
from auth.credentials import ACCESS_TOKEN_USERNAME, OAUTH_TOKEN_USERNAME
from auth.validateresult import AuthKind, ValidateResult
from data import model
@ -23,7 +24,7 @@ def _token(username, password):
(_token(ACCESS_TOKEN_USERNAME, 'invalid'), ValidateResult(AuthKind.basic,
error_message='Invalid access token')),
(_token(OAUTH_TOKEN_USERNAME, 'invalid'),
ValidateResult(AuthKind.oauth, error_message='OAuth access token could not be validated')),
ValidateResult(AuthKind.basic, error_message='OAuth access token could not be validated')),
(_token('devtable', 'invalid'), ValidateResult(AuthKind.basic,
error_message='Invalid Username or Password')),
(_token('devtable+somebot', 'invalid'), ValidateResult(
@ -62,4 +63,4 @@ def test_valid_oauth(app):
oauth_token = list(model.oauth.list_access_tokens_for_user(user))[0]
token = _token(OAUTH_TOKEN_USERNAME, oauth_token.access_token)
result = validate_basic_auth(token)
assert result == ValidateResult(AuthKind.oauth, oauthtoken=oauth_token)
assert result == ValidateResult(AuthKind.basic, oauthtoken=oauth_token)

View file

@ -0,0 +1,30 @@
from auth.credentials import ACCESS_TOKEN_USERNAME, OAUTH_TOKEN_USERNAME, validate_credentials
from auth.validateresult import AuthKind, ValidateResult
from data import model
from test.fixtures import *
def test_valid_user(app):
result = validate_credentials('devtable', 'password')
assert result == ValidateResult(AuthKind.credentials, user=model.user.get_user('devtable'))
def test_valid_robot(app):
robot, password = model.user.create_robot('somerobot', model.user.get_user('devtable'))
result = validate_credentials(robot.username, password)
assert result == ValidateResult(AuthKind.credentials, robot=robot)
def test_valid_token(app):
access_token = model.token.create_delegate_token('devtable', 'simple', 'sometoken')
result = validate_credentials(ACCESS_TOKEN_USERNAME, access_token.code)
assert result == ValidateResult(AuthKind.credentials, token=access_token)
def test_valid_oauth(app):
user = model.user.get_user('devtable')
oauth_token = list(model.oauth.list_access_tokens_for_user(user))[0]
result = validate_credentials(OAUTH_TOKEN_USERNAME, oauth_token.access_token)
assert result == ValidateResult(AuthKind.oauth, oauthtoken=oauth_token)
def test_invalid_user(app):
result = validate_credentials('devtable', 'somepassword')
assert result == ValidateResult(AuthKind.credentials,
error_message='Invalid Username or Password')

View file

@ -13,6 +13,7 @@ class AuthKind(Enum):
basic = 'basic'
oauth = 'oauth'
signed_grant = 'signed_grant'
credentials = 'credentials'
class ValidateResult(object):
@ -56,6 +57,11 @@ class ValidateResult(object):
if self.identity:
identity_changed.send(app, identity=self.identity)
def with_kind(self, kind):
""" Returns a copy of this result, but with the kind replaced. """
return ValidateResult(kind, self.missing, self.user, self.token, self.oauthtoken, self.robot,
self.signed_data, self.error_message)
@property
def authed_user(self):
""" Returns the authenticated user, whether directly, or via an OAuth token. """