This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/model/oauth.py

185 lines
6.5 KiB
Python
Raw Normal View History

2014-03-12 16:37:06 +00:00
from datetime import datetime, timedelta
from oauth2lib.provider import AuthorizationProvider
from oauth2lib import utils
from data.database import OAuthApplication, OAuthAuthorizationCode, OAuthAccessToken, User
2014-03-12 16:37:06 +00:00
from auth import scopes
class DatabaseAuthorizationProvider(AuthorizationProvider):
    """Authorization provider whose OAuth state lives in the database.

    Applications, authorization codes and access tokens are read from and
    written to the OAuthApplication, OAuthAuthorizationCode and
    OAuthAccessToken models. Subclasses must implement get_authorized_user().
    """

    def get_authorized_user(self):
        """Return the user on whose behalf the request is made.

        Must be overridden; this base implementation always raises
        NotImplementedError.
        """
        raise NotImplementedError('Subclasses must fill in the ability to get the authorized_user.')

    def validate_client_id(self, client_id):
        """Return True if an application with the given client_id exists."""
        return self.get_application_for_client_id(client_id) is not None

    def get_application_for_client_id(self, client_id):
        """Return the OAuthApplication with the given client_id, or None."""
        try:
            return OAuthApplication.get(client_id=client_id)
        except OAuthApplication.DoesNotExist:
            return None

    def validate_client_secret(self, client_id, client_secret):
        """Return True if an application matches both client_id and client_secret."""
        try:
            OAuthApplication.get(client_id=client_id, client_secret=client_secret)
            return True
        except OAuthApplication.DoesNotExist:
            return False

    def validate_redirect_uri(self, client_id, redirect_uri):
        """Return True if redirect_uri starts with the application's registered URI.

        Returns False when redirect_uri is missing or empty, when the
        application has no registered redirect URI, or when no application
        with the given client_id exists.
        """
        # A request without a redirect URI can never match; rejecting it here
        # avoids calling .startswith() on None.
        if not redirect_uri:
            return False

        try:
            app = OAuthApplication.get(client_id=client_id)
        except OAuthApplication.DoesNotExist:
            return False

        # NOTE(review): this is a prefix comparison, not an exact match. A
        # registered URI such as 'https://example.com' is also a prefix of
        # 'https://example.com.other.host/'; consider tightening this.
        return bool(app.redirect_uri) and redirect_uri.startswith(app.redirect_uri)

    def validate_scope(self, client_id, scopes_string):
        """Return the result of validating scopes_string; client_id is unused."""
        return scopes.validate_scope_string(scopes_string)

    def validate_access(self):
        """Return True if get_authorized_user() yields a user."""
        return self.get_authorized_user() is not None

    def lookup_access_token(self, client_id):
        """Return an access token belonging to the application, or None.

        NOTE(review): the query filters on the application only, so when an
        application has several tokens, which one is returned is left to the
        database.
        """
        try:
            found = (OAuthAccessToken
                     .select()
                     .join(OAuthApplication)
                     .where(OAuthApplication.client_id == client_id)
                     .get())
            return found
        except OAuthAccessToken.DoesNotExist:
            return None

    def validate_has_scopes(self, client_id, scope):
        """Return True if the application's token is unexpired and covers scope."""
        access_token = self.lookup_access_token(client_id)
        if not access_token:
            return False

        # Make sure the token is not expired.
        if access_token.expires_at <= datetime.now():
            return False

        # Make sure the token contains the given scopes (at least).
        return scopes.is_subset_string(access_token.scope, scope)

    def from_authorization_code(self, client_id, code, scope):
        """Return the data stored with a matching authorization code, or None.

        The code must match on application client_id, code value and scope.
        """
        try:
            found = (OAuthAuthorizationCode
                     .select()
                     .join(OAuthApplication)
                     .where(OAuthApplication.client_id == client_id,
                            OAuthAuthorizationCode.code == code,
                            OAuthAuthorizationCode.scope == scope)
                     .get())
            return found.data
        except OAuthAuthorizationCode.DoesNotExist:
            return None

    def from_refresh_token(self, client_id, refresh_token, scope):
        """Return the data stored with a matching refresh token, or None.

        The token must match on application client_id, refresh token value
        and scope.
        """
        try:
            found = (OAuthAccessToken
                     .select()
                     .join(OAuthApplication)
                     .where(OAuthApplication.client_id == client_id,
                            OAuthAccessToken.refresh_token == refresh_token,
                            OAuthAccessToken.scope == scope)
                     .get())
            return found.data
        except OAuthAccessToken.DoesNotExist:
            return None

    def persist_authorization_code(self, client_id, code, scope):
        """Store a new authorization code for the application.

        Raises OAuthApplication.DoesNotExist if client_id is unknown.
        """
        app = OAuthApplication.get(client_id=client_id)
        OAuthAuthorizationCode.create(application=app, code=code, scope=scope)

    def persist_token_information(self, client_id, scope, access_token, token_type, expires_in,
                                  refresh_token, data):
        """Store a new access token for the application and the authorized user.

        expires_in is a number of seconds from now. Raises
        OAuthApplication.DoesNotExist if client_id is unknown.
        """
        app = OAuthApplication.get(client_id=client_id)
        user = self.get_authorized_user()
        expires_at = datetime.now() + timedelta(seconds=expires_in)
        OAuthAccessToken.create(application=app, authorized_user=user, scope=scope,
                                access_token=access_token, token_type=token_type,
                                expires_at=expires_at, refresh_token=refresh_token, data=data)

    def discard_authorization_code(self, client_id, code):
        """Delete the authorization code issued to the application.

        Raises OAuthAuthorizationCode.DoesNotExist if no such code exists.
        """
        found = (OAuthAuthorizationCode
                 .select()
                 .join(OAuthApplication)
                 .where(OAuthApplication.client_id == client_id,
                        OAuthAuthorizationCode.code == code)
                 .get())
        found.delete_instance()

    def discard_refresh_token(self, client_id, refresh_token):
        """Delete the access token row carrying the given refresh token.

        Raises OAuthAccessToken.DoesNotExist if no such token exists.
        """
        found = (OAuthAccessToken
                 .select()
                 .join(OAuthApplication)
                 .where(OAuthApplication.client_id == client_id,
                        OAuthAccessToken.refresh_token == refresh_token)
                 .get())
        found.delete_instance()

    def get_token_response(self, response_type, client_id, redirect_uri, **params):
        """Issue an access token directly for a response_type of 'token'.

        On success the token is persisted (without a refresh token) and a 302
        response is returned whose Location is redirect_uri with the token
        details appended as a URL fragment. On failure an error response is
        returned instead.

        The response-building helpers and the token_type / token_expires_in
        attributes are not defined in this class; presumably they are
        inherited from AuthorizationProvider.
        """
        # Ensure proper response_type
        if response_type != 'token':
            err = 'unsupported_response_type'
            return self._make_redirect_error_response(redirect_uri, err)

        # Check redirect URI. This comes before the other checks because the
        # remaining errors are reported by redirecting to redirect_uri.
        is_valid_redirect_uri = self.validate_redirect_uri(client_id, redirect_uri)
        if not is_valid_redirect_uri:
            return self._invalid_redirect_uri_response()

        # Check conditions
        is_valid_client_id = self.validate_client_id(client_id)
        is_valid_access = self.validate_access()
        scope = params.get('scope', '')
        are_valid_scopes = self.validate_scope(client_id, scope)

        # Return proper error responses on invalid conditions
        if not is_valid_client_id:
            err = 'unauthorized_client'
            return self._make_redirect_error_response(redirect_uri, err)

        if not is_valid_access:
            err = 'access_denied'
            return self._make_redirect_error_response(redirect_uri, err)

        if not are_valid_scopes:
            err = 'invalid_scope'
            return self._make_redirect_error_response(redirect_uri, err)

        access_token = self.generate_access_token()
        token_type = self.token_type
        expires_in = self.token_expires_in
        refresh_token = None  # No refresh token for this kind of flow

        self.persist_token_information(client_id=client_id, scope=scope, access_token=access_token,
                                       token_type=token_type, expires_in=expires_in,
                                       refresh_token=refresh_token, data='')

        # The token details travel in the URL fragment, after any query
        # parameters built from params.
        url = utils.build_url(redirect_uri, params)
        url += '#access_token=%s&token_type=%s&expires_in=%s' % (access_token, token_type, expires_in)
        return self._make_response(headers={'Location': url}, status_code=302)
2014-03-14 22:57:28 +00:00
def create_application(org, name, application_uri, redirect_uri, **kwargs):
    """Create and return a new OAuthApplication owned by org.

    Any extra keyword arguments are passed through to
    OAuthApplication.create() unchanged.
    """
    return OAuthApplication.create(
        organization=org,
        name=name,
        application_uri=application_uri,
        redirect_uri=redirect_uri,
        **kwargs
    )
def validate_access_token(access_token):
    """Return the OAuthAccessToken row for the given token string, or None.

    The query joins User and selects both models, so the related user is
    fetched together with the token.
    """
    token_query = (OAuthAccessToken
                   .select(OAuthAccessToken, User)
                   .join(User)
                   .where(OAuthAccessToken.access_token == access_token))
    try:
        return token_query.get()
    except OAuthAccessToken.DoesNotExist:
        return None
def get_application_for_client_id(client_id):
    """Return the OAuthApplication with the given client_id, or None if absent."""
    try:
        application = OAuthApplication.get(client_id=client_id)
    except OAuthApplication.DoesNotExist:
        application = None
    return application