This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/oauth.py

174 lines
5.5 KiB
Python
Raw Normal View History

import urlparse
import github
class OAuthConfig(object):
def __init__(self, config, key_name):
self.key_name = key_name
self.config = config.get(key_name) or {}
def service_name(self):
raise NotImplementedError
def token_endpoint(self):
raise NotImplementedError
def user_endpoint(self):
raise NotImplementedError
def login_endpoint(self):
raise NotImplementedError
def validate_client_id_and_secret(self, http_client):
raise NotImplementedError
def client_id(self):
return self.config.get('CLIENT_ID')
def client_secret(self):
return self.config.get('CLIENT_SECRET')
def _get_url(self, endpoint, *args):
for arg in args:
endpoint = urlparse.urljoin(endpoint, arg)
return endpoint
class GithubOAuthConfig(OAuthConfig):
def __init__(self, config, key_name):
super(GithubOAuthConfig, self).__init__(config, key_name)
def service_name(self):
return 'GitHub'
def allowed_organizations(self):
if not self.config.get('ORG_RESTRICT', False):
return None
allowed = self.config.get('ALLOWED_ORGANIZATIONS', None)
if allowed is None:
return None
return [org.lower() for org in allowed]
def _endpoint(self):
endpoint = self.config.get('GITHUB_ENDPOINT', 'https://github.com')
if not endpoint.endswith('/'):
endpoint = endpoint + '/'
return endpoint
def is_enterprise(self):
return self._endpoint().find('.github.com') < 0
def authorize_endpoint(self):
return self._get_url(self._endpoint(), '/login/oauth/authorize') + '?'
def token_endpoint(self):
return self._get_url(self._endpoint(), '/login/oauth/access_token')
def _api_endpoint(self):
return self.config.get('API_ENDPOINT', self._get_url(self._endpoint(), '/api/v3/'))
def api_endpoint(self):
return self._api_endpoint()[0:-1]
def user_endpoint(self):
api_endpoint = self._api_endpoint()
return self._get_url(api_endpoint, 'user')
def email_endpoint(self):
api_endpoint = self._api_endpoint()
return self._get_url(api_endpoint, 'user/emails')
def orgs_endpoint(self):
api_endpoint = self._api_endpoint()
return self._get_url(api_endpoint, 'user/orgs')
def validate_client_id_and_secret(self, http_client):
# First: Verify that the github endpoint is actually Github by checking for the
# X-GitHub-Request-Id here.
api_endpoint = self._api_endpoint()
result = http_client.get(api_endpoint, auth=(self.client_id(), self.client_secret()), timeout=5)
if not 'X-GitHub-Request-Id' in result.headers:
raise Exception('Endpoint is not a Github (Enterprise) installation')
# Next: Verify the client ID and secret.
# Note: The following code is a hack until such time as Github officially adds an API endpoint
# for verifying a {client_id, client_secret} pair. That being said, this hack was given to us
# *by a Github Engineer*, so I think it is okay for the time being :)
#
# TODO(jschorr): Replace with the real API call once added.
#
# Hitting the endpoint applications/{client_id}/tokens/foo will result in the following
# behavior IF the client_id is given as the HTTP username and the client_secret as the HTTP
# password:
# - If the {client_id, client_secret} pair is invalid in some way, we get a 401 error.
# - If the pair is valid, then we get a 404 because the 'foo' token does not exists.
validate_endpoint = self._get_url(api_endpoint, 'applications/%s/tokens/foo' % self.client_id())
result = http_client.get(validate_endpoint, auth=(self.client_id(), self.client_secret()),
timeout=5)
return result.status_code == 404
def validate_organization(self, organization_id, http_client):
api_endpoint = self._api_endpoint()
org_endpoint = self._get_url(api_endpoint, 'orgs/%s' % organization_id.lower())
result = http_client.get(org_endpoint,
headers={'Accept': 'application/vnd.github.moondragon+json'},
timeout=5)
return result.status_code == 200
def get_public_config(self):
return {
'CLIENT_ID': self.client_id(),
'AUTHORIZE_ENDPOINT': self.authorize_endpoint(),
2015-03-05 17:07:39 +00:00
'GITHUB_ENDPOINT': self._endpoint(),
'ORG_RESTRICT': self.config.get('ORG_RESTRICT', False)
}
class GoogleOAuthConfig(OAuthConfig):
def __init__(self, config, key_name):
super(GoogleOAuthConfig, self).__init__(config, key_name)
def service_name(self):
return 'Google'
def authorize_endpoint(self):
return 'https://accounts.google.com/o/oauth2/auth?response_type=code&'
def token_endpoint(self):
return 'https://accounts.google.com/o/oauth2/token'
def user_endpoint(self):
return 'https://www.googleapis.com/oauth2/v1/userinfo'
def validate_client_id_and_secret(self, http_client):
# To verify the Google client ID and secret, we hit the
# https://www.googleapis.com/oauth2/v3/token endpoint with an invalid request. If the client
# ID or secret are invalid, we get returned a 403 Unauthorized. Otherwise, we get returned
# another response code.
url = 'https://www.googleapis.com/oauth2/v3/token'
data = {
'code': 'fakecode',
'client_id': self.client_id(),
'client_secret': self.client_secret(),
'grant_type': 'authorization_code',
'redirect_uri': 'http://example.com'
}
result = http_client.post(url, data=data, timeout=5)
return result.status_code != 401
def get_public_config(self):
return {
'CLIENT_ID': self.client_id(),
'AUTHORIZE_ENDPOINT': self.authorize_endpoint()
}