This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/users/federated.py

118 lines
4.5 KiB
Python

import logging
import features
from collections import namedtuple
from data import model
from util.validation import generate_valid_usernames
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Lightweight record (username, email, id) describing a user; get_user and verify_credentials
# of a federated users system are documented to return one of these on success.
UserInformation = namedtuple('UserInformation', ['username', 'email', 'id'])
class FederatedUsers(object):
    """ Base class for all federated users systems.

    Subclasses provide get_user and verify_credentials (and, optionally, the query and group
    methods). This base class supplies the shared logic that links an identity reported by the
    federated service to a user in the database.
    """

    def __init__(self, federated_service, requires_email):
        """ Stores the federated service identifier and whether newly created database users
        are required to have an email address.
        """
        self._federated_service = federated_service
        self._requires_email = requires_email

    @property
    def federated_service(self):
        """ The identifier of the federated service given at construction time. """
        return self._federated_service

    def get_user(self, username_or_email):
        """ Retrieves the user with the given username or email, returning a tuple containing
        a UserInformation (if success) and the error message (on failure).

        Must be overridden by subclasses; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def verify_credentials(self, username_or_email, password):
        """ Verifies the given credentials against the backing federated service, returning
        a tuple containing a UserInformation (on success) and the error message (on failure).

        Must be overridden by subclasses; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def query_users(self, query, limit=20):
        """ If implemented, get_user must be implemented as well.

        The base implementation reports that querying is not supported by returning
        (None, 'Not supported').
        """
        return (None, 'Not supported')

    def link_user(self, username_or_email):
        """ Looks up the given username or email via get_user and, if found, creates/links a
        database user to the associated federated service.

        Returns a tuple of (database user, None) on success or (None, error message) on failure.
        """
        (credentials, err_msg) = self.get_user(username_or_email)
        if credentials is None:
            return (None, err_msg)

        return self._get_federated_user(credentials.username, credentials.email)

    def verify_and_link_user(self, username_or_email, password):
        """ Verifies the given credentials and, if valid, creates/links a database user to the
        associated federated service.

        Returns a tuple of (database user, None) on success or (None, error message) on failure.
        """
        (credentials, err_msg) = self.verify_credentials(username_or_email, password)
        if credentials is None:
            return (None, err_msg)

        return self._get_federated_user(credentials.username, credentials.email)

    def confirm_existing_user(self, username, password):
        """ Confirms that the given *database* username and service password are valid for the
        linked service. This method is used when the federated service's username is not known.

        Returns (database user, None) on success. Returns (None, 'Invalid user') when the
        database user does not exist or has no login linked to this service, and
        (None, error message) when the credentials are rejected.
        """
        db_user = model.user.get_user(username)
        if not db_user:
            return (None, 'Invalid user')

        federated_login = model.user.lookup_federated_login(db_user, self._federated_service)
        if not federated_login:
            return (None, 'Invalid user')

        # Verify against the service using the identity recorded on the linked login, not the
        # database username.
        (credentials, err_msg) = self.verify_credentials(federated_login.service_ident, password)
        if credentials is None:
            return (None, err_msg)

        return (db_user, None)

    def service_metadata(self):
        """ Returns a dictionary of extra metadata to present to *superusers* about this auth
        engine. For example, LDAP returns the base DN so we can display to the user during sync
        setup. The base implementation returns an empty dictionary.
        """
        return {}

    def check_group_lookup_args(self, group_lookup_args):
        """ Verifies that the given group lookup args point to a valid group. Returns a tuple
        consisting of a boolean status and an error message (if any). The base implementation
        returns (False, 'Not supported').
        """
        return (False, 'Not supported')

    def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
        """ Returns an iterator over all the members of the group matching the given lookup args
        dictionary. The format of the lookup args dictionary is specific to the implementation.
        The base implementation returns (None, 'Not supported').
        """
        return (None, 'Not supported')

    def _get_federated_user(self, username, email):
        """ Returns the database user linked to the given federated username, creating and
        linking a new database user when no such link exists yet.

        For an existing user, a non-empty email overwrites the stored one. Returns a tuple of
        (database user, None) on success, or (None, error message) when no unused username
        could be picked for a new user.
        """
        db_user = model.user.verify_federated_login(self._federated_service, username)
        if not db_user:
            # We must create the user in our db. Take the first generated candidate that is not
            # already in use. The None default matters: if every candidate is taken we must
            # report an error instead of falling through with the last (taken) candidate.
            valid_username = next(
                (candidate for candidate in generate_valid_usernames(username)
                 if model.user.is_username_unique(candidate)),
                None)

            if not valid_username:
                logger.error('Unable to pick a username for user: %s', username)
                return (None, 'Unable to pick a username. Please report this to your administrator.')

            prompts = model.user.get_default_user_prompts(features)
            db_user = model.user.create_federated_user(valid_username, email,
                                                       self._federated_service, username,
                                                       set_password_notification=False,
                                                       email_required=self._requires_email,
                                                       prompts=prompts)
        else:
            # Update the db attributes from the federated service.
            if email:
                db_user.email = email
                db_user.save()

        return (db_user, None)