This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/oauth/base.py

194 lines
6.8 KiB
Python

import copy
import logging
import urllib
import urlparse
from abc import ABCMeta, abstractmethod
from six import add_metaclass
from util.config import URLSchemeAndHostname
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class OAuthEndpoint(object):
    """ An endpoint URL paired with the query parameters that should be sent to it.

        `with_param` and `with_params` never modify the receiver: each one builds a new
        OAuthEndpoint around a shallow copy of the current parameters.
    """

    def __init__(self, base_url, params=None):
        self.base_url = base_url
        self.params = params if params else {}

    def with_param(self, name, value):
        """ Returns a new endpoint whose parameters additionally map `name` to `value`. """
        extended = copy.copy(self.params)
        extended[name] = value
        return OAuthEndpoint(self.base_url, extended)

    def with_params(self, parameters):
        """ Returns a new endpoint whose parameters are updated with all of `parameters`. """
        extended = copy.copy(self.params)
        extended.update(parameters)
        return OAuthEndpoint(self.base_url, extended)

    def to_url(self):
        """ Renders the endpoint as a URL string.

            The scheme, host, path and fragment are taken from `base_url`; the query string is
            built solely from `params`, so any query already present on `base_url` is discarded.
        """
        pieces = urlparse.urlsplit(self.base_url)
        encoded_params = urllib.urlencode(self.params)
        rebuilt = (pieces.scheme, pieces.netloc, pieces.path, encoded_params, pieces.fragment)
        return urlparse.urlunsplit(rebuilt)
class OAuthExchangeCodeException(Exception):
    """ Signals that exchanging an OAuth code for a token did not succeed. """
class OAuthGetUserInfoException(Exception):
    """ Signals that retrieving user information from the OAuth service did not succeed. """
@add_metaclass(ABCMeta)
class OAuthService(object):
    """ A base class for defining an external service, exposed via OAuth.

        Subclasses provide the service identity and its endpoints; this class builds the
        authorization URL, exchanges authorization codes for tokens and looks up user
        information on top of them.
    """

    def __init__(self, config, key_name):
        """ Stores `key_name` and the configuration block found under it in `config`. A missing
            or empty block is replaced by an empty dictionary.
        """
        self.key_name = key_name
        self.config = config.get(key_name) or {}

    @abstractmethod
    def service_id(self):
        """ The internal ID for this service. Must match the URL portion for the service,
            e.g. `github`
        """
        pass

    @abstractmethod
    def service_name(self):
        """ The user-readable name for the service, e.g. `GitHub`"""
        pass

    @abstractmethod
    def token_endpoint(self):
        """ Returns the endpoint at which the OAuth code can be exchanged for a token. """
        pass

    @abstractmethod
    def user_endpoint(self):
        """ Returns the endpoint at which user information can be looked up. """
        pass

    @abstractmethod
    def authorize_endpoint(self):
        """ Returns the endpoint for authorization of the OAuth service. """
        pass

    @abstractmethod
    def validate_client_id_and_secret(self, http_client, url_scheme_and_hostname):
        """ Performs validation of the client ID and secret, raising an exception on failure. """
        pass

    def requires_form_encoding(self):
        """ Returns True if form encoding is necessary for the exchange_code_for_token call. """
        return False

    def client_id(self):
        """ Returns the `CLIENT_ID` entry of the service configuration, or None if unset. """
        return self.config.get('CLIENT_ID')

    def client_secret(self):
        """ Returns the `CLIENT_SECRET` entry of the service configuration, or None if unset. """
        return self.config.get('CLIENT_SECRET')

    def login_binding_field(self):
        """ Returns the name of the field (`username` or `email`) used for auto binding an external
            login service account to an *internal* login service account. For example, if the
            external login service is GitHub and the internal login service is LDAP, a value of
            `email` here will cause login-with-GitHub to conduct a search (via email) in LDAP for a
            user, and auto bind the external and internal users together. May return None, in which
            case no binding is performed, and login with this external account will simply create
            a new account in the database.
        """
        return self.config.get('LOGIN_BINDING_FIELD', None)

    def get_auth_url(self, url_scheme_and_hostname, redirect_suffix, csrf_token, scopes):
        """ Retrieves the authorization URL for this login service.

            The URL is the authorize endpoint extended with `client_id`, `redirect_uri`, `scope`
            (the given scopes joined by single spaces) and `state` (the CSRF token).
        """
        # NOTE(review): this redirect URI is assembled from `get_url()`, whereas
        # `get_redirect_uri` (used for the code exchange) formats the scheme and hostname
        # itself. The two presumably have to yield the same string -- confirm against
        # URLSchemeAndHostname.
        redirect_uri = '%s/oauth2/%s/callback%s' % (url_scheme_and_hostname.get_url(),
                                                    self.service_id(),
                                                    redirect_suffix)
        params = {
            'client_id': self.client_id(),
            'redirect_uri': redirect_uri,
            'scope': ' '.join(scopes),
            'state': csrf_token,
        }

        return self.authorize_endpoint().with_params(params).to_url()

    def get_redirect_uri(self, url_scheme_and_hostname, redirect_suffix=''):
        """ Returns the callback URL for this service:
            `<scheme>://<hostname>/oauth2/<service_id>/callback<redirect_suffix>`.
        """
        return '%s://%s/oauth2/%s/callback%s' % (url_scheme_and_hostname.url_scheme,
                                                 url_scheme_and_hostname.hostname,
                                                 self.service_id(),
                                                 redirect_suffix)

    def get_user_info(self, http_client, token):
        """ Looks up the user information associated with the given OAuth token.

            Issues a GET against the user endpoint with the token sent as a Bearer credential and
            returns the decoded JSON body.

            Raises OAuthGetUserInfoException if the response status is not 2XX, if the body
            cannot be decoded as JSON, or if it decodes to None.
        """
        token_param = {
            'alt': 'json',
        }

        headers = {
            'Authorization': 'Bearer %s' % token,
        }

        got_user = http_client.get(self.user_endpoint().to_url(), params=token_param,
                                   headers=headers)
        if got_user.status_code // 100 != 2:
            raise OAuthGetUserInfoException('Non-2XX response code for user_info call: %s' %
                                            got_user.status_code)

        # JSON decoders report a malformed body via ValueError (or a subclass); surface that as
        # the exception type callers of this method are told to expect.
        try:
            user_info = got_user.json()
        except ValueError:
            raise OAuthGetUserInfoException('Got non-JSON response for user_info call')

        if user_info is None:
            raise OAuthGetUserInfoException('Got empty response for user_info call')

        return user_info

    def exchange_code_for_token(self, app_config, http_client, code, form_encode=False,
                                redirect_suffix='', client_auth=False):
        """ Exchanges an OAuth access code for the associated OAuth token.

            Delegates to `exchange_code` and returns the `access_token` entry of its result.
            Raises OAuthExchangeCodeException if the exchange fails or the response carries no
            `access_token`.
        """
        json_data = self.exchange_code(app_config, http_client, code, form_encode,
                                       redirect_suffix, client_auth)

        access_token = json_data.get('access_token', None)
        if access_token is None:
            logger.debug('Got successful get_access_token response with missing token: %s',
                         json_data)
            raise OAuthExchangeCodeException('Missing `access_token` in OAuth response')

        return access_token

    def exchange_code(self, app_config, http_client, code, form_encode=False, redirect_suffix='',
                      client_auth=False):
        """ Exchanges an OAuth access code for associated OAuth token and other data.

            POSTs the code, grant type and redirect URI to the token endpoint. The payload is sent
            as the request body when `form_encode` is set and as query parameters otherwise. With
            `client_auth` the client ID and secret are passed to the HTTP client as `auth`;
            without it they are added to the payload.

            Returns the decoded JSON response. Raises OAuthExchangeCodeException if the response
            status is not 2XX, if the body is empty or not valid JSON, or if it contains an
            `error` entry.
        """
        url_scheme_and_hostname = URLSchemeAndHostname.from_app_config(app_config)
        payload = {
            'code': code,
            'grant_type': 'authorization_code',
            'redirect_uri': self.get_redirect_uri(url_scheme_and_hostname, redirect_suffix)
        }

        headers = {
            'Accept': 'application/json'
        }

        auth = None
        if client_auth:
            auth = (self.client_id(), self.client_secret())
        else:
            payload['client_id'] = self.client_id()
            payload['client_secret'] = self.client_secret()

        token_url = self.token_endpoint().to_url()
        if form_encode:
            get_access_token = http_client.post(token_url, data=payload, headers=headers,
                                                auth=auth)
        else:
            get_access_token = http_client.post(token_url, params=payload, headers=headers,
                                                auth=auth)

        if get_access_token.status_code // 100 != 2:
            logger.debug('Got get_access_token response %s', get_access_token.text)
            raise OAuthExchangeCodeException('Got non-2XX response for code exchange: %s' %
                                             get_access_token.status_code)

        # A body that is not valid JSON makes `.json()` raise ValueError (or a subclass) instead
        # of returning a falsy value, so it has to be translated here for the non-JSON error
        # below to actually be reported as OAuthExchangeCodeException.
        try:
            json_data = get_access_token.json()
        except ValueError:
            json_data = None

        if not json_data:
            raise OAuthExchangeCodeException('Got non-JSON response for code exchange')

        if 'error' in json_data:
            raise OAuthExchangeCodeException(json_data.get('error_description',
                                                           json_data['error']))

        return json_data