import urlparse class OAuthConfig(object): def __init__(self, config, key_name): self.key_name = key_name self.config = config.get(key_name) or {} def service_name(self): raise NotImplementedError def token_endpoint(self): raise NotImplementedError def user_endpoint(self): raise NotImplementedError def login_endpoint(self): raise NotImplementedError def validate_client_id_and_secret(self, http_client): raise NotImplementedError def client_id(self): return self.config.get('CLIENT_ID') def client_secret(self): return self.config.get('CLIENT_SECRET') def _get_url(self, endpoint, *args): for arg in args: endpoint = urlparse.urljoin(endpoint, arg) return endpoint class GithubOAuthConfig(OAuthConfig): def __init__(self, config, key_name): super(GithubOAuthConfig, self).__init__(config, key_name) def service_name(self): return 'GitHub' def _endpoint(self): endpoint = self.config.get('GITHUB_ENDPOINT', 'https://github.com') if not endpoint.endswith('/'): endpoint = endpoint + '/' return endpoint def authorize_endpoint(self): return self._get_url(self._endpoint(), '/login/oauth/authorize') + '?' def token_endpoint(self): return self._get_url(self._endpoint(), '/login/oauth/access_token') def _api_endpoint(self): return self.config.get('API_ENDPOINT', self._get_url(self._endpoint(), '/api/v3/')) def api_endpoint(self): return self._api_endpoint()[0:-1] def user_endpoint(self): api_endpoint = self._api_endpoint() return self._get_url(api_endpoint, 'user') def email_endpoint(self): api_endpoint = self._api_endpoint() return self._get_url(api_endpoint, 'user/emails') def validate_client_id_and_secret(self, http_client): # First: Verify that the github endpoint is actually Github by checking for the # X-GitHub-Request-Id here. api_endpoint = self._api_endpoint() result = http_client.get(api_endpoint, auth=(self.client_id(), self.client_secret()), timeout=5) if not 'X-GitHub-Request-Id' in result.headers: raise Exception('Endpoint is not a Github (Enterprise) installation') # Next: Verify the client ID and secret. # Note: The following code is a hack until such time as Github officially adds an API endpoint # for verifying a {client_id, client_secret} pair. That being said, this hack was given to us # *by a Github Engineer*, so I think it is okay for the time being :) # # TODO(jschorr): Replace with the real API call once added. # # Hitting the endpoint applications/{client_id}/tokens/foo will result in the following # behavior IF the client_id is given as the HTTP username and the client_secret as the HTTP # password: # - If the {client_id, client_secret} pair is invalid in some way, we get a 401 error. # - If the pair is valid, then we get a 404 because the 'foo' token does not exists. validate_endpoint = self._get_url(api_endpoint, 'applications/%s/tokens/foo' % self.client_id()) result = http_client.get(validate_endpoint, auth=(self.client_id(), self.client_secret()), timeout=5) return result.status_code == 404 def get_public_config(self): return { 'CLIENT_ID': self.client_id(), 'AUTHORIZE_ENDPOINT': self.authorize_endpoint(), 'GITHUB_ENDPOINT': self._endpoint() } class GoogleOAuthConfig(OAuthConfig): def __init__(self, config, key_name): super(GoogleOAuthConfig, self).__init__(config, key_name) def service_name(self): return 'Google' def authorize_endpoint(self): return 'https://accounts.google.com/o/oauth2/auth?response_type=code&' def token_endpoint(self): return 'https://accounts.google.com/o/oauth2/token' def user_endpoint(self): return 'https://www.googleapis.com/oauth2/v1/userinfo' def validate_client_id_and_secret(self, http_client): # To verify the Google client ID and secret, we hit the # https://www.googleapis.com/oauth2/v3/token endpoint with an invalid request. If the client # ID or secret are invalid, we get returned a 403 Unauthorized. Otherwise, we get returned # another response code. url = 'https://www.googleapis.com/oauth2/v3/token' data = { 'code': 'fakecode', 'client_id': self.client_id(), 'client_secret': self.client_secret(), 'grant_type': 'authorization_code', 'redirect_uri': 'http://example.com' } result = http_client.post(url, data=data, timeout=5) return result.status_code != 401 def get_public_config(self): return { 'CLIENT_ID': self.client_id(), 'AUTHORIZE_ENDPOINT': self.authorize_endpoint() }