from util import slash_join from util.oauth.base import OAuthService class GithubOAuthService(OAuthService): def __init__(self, config, key_name): super(GithubOAuthService, self).__init__(config, key_name) def service_name(self): return 'GitHub' def allowed_organizations(self): if not self.config.get('ORG_RESTRICT', False): return None allowed = self.config.get('ALLOWED_ORGANIZATIONS', None) if allowed is None: return None return [org.lower() for org in allowed] def get_public_url(self, suffix): return slash_join(self._endpoint(), suffix) def _endpoint(self): return self.config.get('GITHUB_ENDPOINT', 'https://github.com') def is_enterprise(self): return self._endpoint().find('.github.com') < 0 def authorize_endpoint(self): return slash_join(self._endpoint(), '/login/oauth/authorize') + '?' def token_endpoint(self): return slash_join(self._endpoint(), '/login/oauth/access_token') def _api_endpoint(self): return self.config.get('API_ENDPOINT', slash_join(self._endpoint(), '/api/v3/')) def api_endpoint(self): endpoint = self._api_endpoint() if endpoint.endswith('/'): return endpoint[0:-1] return endpoint def user_endpoint(self): return slash_join(self._api_endpoint(), 'user') def email_endpoint(self): return slash_join(self._api_endpoint(), 'user/emails') def orgs_endpoint(self): return slash_join(self._api_endpoint(), 'user/orgs') def validate_client_id_and_secret(self, http_client, app_config): # First: Verify that the github endpoint is actually Github by checking for the # X-GitHub-Request-Id here. api_endpoint = self._api_endpoint() result = http_client.get(api_endpoint, auth=(self.client_id(), self.client_secret()), timeout=5) if not 'X-GitHub-Request-Id' in result.headers: raise Exception('Endpoint is not a Github (Enterprise) installation') # Next: Verify the client ID and secret. # Note: The following code is a hack until such time as Github officially adds an API endpoint # for verifying a {client_id, client_secret} pair. That being said, this hack was given to us # *by a Github Engineer*, so I think it is okay for the time being :) # # TODO(jschorr): Replace with the real API call once added. # # Hitting the endpoint applications/{client_id}/tokens/foo will result in the following # behavior IF the client_id is given as the HTTP username and the client_secret as the HTTP # password: # - If the {client_id, client_secret} pair is invalid in some way, we get a 401 error. # - If the pair is valid, then we get a 404 because the 'foo' token does not exists. validate_endpoint = slash_join(api_endpoint, 'applications/%s/tokens/foo' % self.client_id()) result = http_client.get(validate_endpoint, auth=(self.client_id(), self.client_secret()), timeout=5) return result.status_code == 404 def validate_organization(self, organization_id, http_client): org_endpoint = slash_join(self._api_endpoint(), 'orgs/%s' % organization_id.lower()) result = http_client.get(org_endpoint, headers={'Accept': 'application/vnd.github.moondragon+json'}, timeout=5) return result.status_code == 200 def get_public_config(self): return { 'CLIENT_ID': self.client_id(), 'AUTHORIZE_ENDPOINT': self.authorize_endpoint(), 'GITHUB_ENDPOINT': self._endpoint(), 'ORG_RESTRICT': self.config.get('ORG_RESTRICT', False) } class GoogleOAuthService(OAuthService): def __init__(self, config, key_name): super(GoogleOAuthService, self).__init__(config, key_name) def service_name(self): return 'Google' def authorize_endpoint(self): return 'https://accounts.google.com/o/oauth2/auth?response_type=code&' def token_endpoint(self): return 'https://accounts.google.com/o/oauth2/token' def user_endpoint(self): return 'https://www.googleapis.com/oauth2/v1/userinfo' def validate_client_id_and_secret(self, http_client, app_config): # To verify the Google client ID and secret, we hit the # https://www.googleapis.com/oauth2/v3/token endpoint with an invalid request. If the client # ID or secret are invalid, we get returned a 403 Unauthorized. Otherwise, we get returned # another response code. url = 'https://www.googleapis.com/oauth2/v3/token' data = { 'code': 'fakecode', 'client_id': self.client_id(), 'client_secret': self.client_secret(), 'grant_type': 'authorization_code', 'redirect_uri': 'http://example.com' } result = http_client.post(url, data=data, timeout=5) return result.status_code != 401 def get_public_config(self): return { 'CLIENT_ID': self.client_id(), 'AUTHORIZE_ENDPOINT': self.authorize_endpoint() } class GitLabOAuthService(OAuthService): def __init__(self, config, key_name): super(GitLabOAuthService, self).__init__(config, key_name) def _endpoint(self): return self.config.get('GITLAB_ENDPOINT', 'https://gitlab.com') def user_endpoint(self): raise NotImplementedError def api_endpoint(self): return self._endpoint() def get_public_url(self, suffix): return slash_join(self._endpoint(), suffix) def service_name(self): return 'GitLab' def authorize_endpoint(self): return slash_join(self._endpoint(), '/oauth/authorize') def token_endpoint(self): return slash_join(self._endpoint(), '/oauth/token') def validate_client_id_and_secret(self, http_client, app_config): url = self.token_endpoint() redirect_uri = self.get_redirect_uri(app_config, redirect_suffix='trigger') data = { 'code': 'fakecode', 'client_id': self.client_id(), 'client_secret': self.client_secret(), 'grant_type': 'authorization_code', 'redirect_uri': redirect_uri } # We validate by checking the error code we receive from this call. result = http_client.post(url, data=data, timeout=5) value = result.json() if not value: return False return value.get('error', '') != 'invalid_client' def get_public_config(self): return { 'CLIENT_ID': self.client_id(), 'AUTHORIZE_ENDPOINT': self.authorize_endpoint(), 'GITLAB_ENDPOINT': self._endpoint(), }