Add support for direct granting of OAuth tokens and add tests

This allows a client (when authorized in a whitelist) to send direct credentials via a Basic auth header and therefore bypass the OAuth approval UI for that user.
This commit is contained in:
Joseph Schorr 2016-05-13 14:52:22 -04:00
parent f957fbe96d
commit 7933aecf25
5 changed files with 216 additions and 18 deletions

View file

@ -3,6 +3,7 @@
import json as py_json
import time
import unittest
import base64
from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qs
@ -86,6 +87,17 @@ class EndpointTestCase(unittest.TestCase):
self.assertEquals(rv.status_code, expected_code)
return rv.data
def postResponse(self, resource_name, headers=None, form=None, with_csrf=True, expected_code=200, **kwargs):
headers = headers or {}
form = form or {}
url = url_for(resource_name, **kwargs)
if with_csrf:
url = EndpointTestCase._add_csrf(url)
rv = self.app.post(url, headers=headers, data=form)
self.assertEquals(rv.status_code, expected_code)
return rv
def login(self, username, password):
rv = self.app.post(EndpointTestCase._add_csrf(api.url_for(Signin)),
data=py_json.dumps(dict(username=username, password=password)),
@ -191,6 +203,136 @@ class WebEndpointTestCase(EndpointTestCase):
self.getResponse('web.redirect_to_namespace', namespace='buynlarge', expected_code=302)
class OAuthTestCase(EndpointTestCase):
def test_authorize_nologin(self):
form = {
'client_id': 'someclient',
'redirect_uri': 'http://localhost:5000/foobar',
'scope': 'user:admin',
}
self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=401)
def test_authorize_invalidclient(self):
self.login('devtable', 'password')
form = {
'client_id': 'someclient',
'redirect_uri': 'http://localhost:5000/foobar',
'scope': 'user:admin',
}
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
self.assertEquals('http://localhost:5000/foobar?error=unauthorized_client', resp.headers['Location'])
def test_authorize_invalidscope(self):
self.login('devtable', 'password')
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'invalid:scope',
}
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
self.assertEquals('http://localhost:8000/o2c.html?error=invalid_scope', resp.headers['Location'])
def test_authorize_invalidredirecturi(self):
self.login('devtable', 'password')
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://some/invalid/uri',
'scope': 'user:admin',
}
self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=400)
def test_authorize_success(self):
self.login('devtable', 'password')
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
resp = self.postResponse('web.authorize_application', form=form, with_csrf=True, expected_code=302)
self.assertTrue('access_token=' in resp.headers['Location'])
def test_authorize_nocsrf(self):
self.login('devtable', 'password')
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
self.postResponse('web.authorize_application', form=form, with_csrf=False, expected_code=403)
def test_authorize_nocsrf_withinvalidheader(self):
self.login('devtable', 'password')
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
headers = dict(authorization='Some random header')
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)
def test_authorize_nocsrf_withbadheader(self):
self.login('devtable', 'password')
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
headers = dict(authorization='Basic ' + base64.b64encode('devtable:invalidpassword'))
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)
def test_authorize_nocsrf_correctheader(self):
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
# Try without the client id being in the whitelist.
headers = dict(authorization='Basic ' + base64.b64encode('devtable:password'))
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=403)
# Add the client ID to the whitelist and try again.
app.config['DIRECT_OAUTH_CLIENTID_WHITELIST'] = ['deadbeef']
headers = dict(authorization='Basic ' + base64.b64encode('devtable:password'))
resp = self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=True, expected_code=302)
self.assertTrue('access_token=' in resp.headers['Location'])
def test_authorize_nocsrf_ratelimiting(self):
# Note: Defined in initdb.py
form = {
'client_id': 'deadbeef',
'redirect_uri': 'http://localhost:8000/o2c.html',
'scope': 'user:admin',
}
# Try without the client id being in the whitelist a few times, making sure we eventually get rate limited.
headers = dict(authorization='Basic ' + base64.b64encode('devtable:invalidpassword'))
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=401)
self.postResponse('web.authorize_application', headers=headers, form=form, with_csrf=False, expected_code=429)
class KeyServerTestCase(EndpointTestCase):
def _get_test_jwt_payload(self):
return {