This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/data/users/federated.py

94 lines
3.4 KiB
Python

import logging
from collections import namedtuple
from data import model
from util.validation import generate_valid_usernames
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Lightweight record describing a user as reported by a federated service.
UserInformation = namedtuple('UserInformation', 'username email id')
class FederatedUsers(object):
    """ Base class for all federated users systems.

        Subclasses supply the service-specific lookups (`get_user`, `verify_credentials` and,
        optionally, `query_users`); this class links their results to users in the database.
    """

    def __init__(self, federated_service):
        """ Creates the system for the federated service identified by `federated_service`. """
        self._federated_service = federated_service

    @property
    def federated_service(self):
        """ The federated service identifier given at construction time. """
        return self._federated_service

    def get_user(self, username_or_email):
        """ Retrieves the user with the given username or email, returning a tuple containing
            a UserInformation (on success) and the error message (on failure).

            Must be overridden by subclasses; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def verify_credentials(self, username_or_email, password):
        """ Verifies the given credentials against the backing federated service, returning
            a tuple containing a UserInformation (on success) and the error message (on failure).

            Must be overridden by subclasses; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def query_users(self, query, limit=20):
        """ Optional user search. The base implementation does not support it and returns
            (None, 'Not supported'). If implemented, get_user must be implemented as well.
        """
        return (None, 'Not supported')

    @staticmethod
    def _pick_unique_username(candidates, is_unique):
        """ Returns the first entry of `candidates` for which `is_unique` returns a truthy
            value, or None if the candidates run out without any being unique.
        """
        for candidate in candidates:
            if is_unique(candidate):
                return candidate

        return None

    def _get_federated_user(self, username, email):
        """ Finds or creates the database user linked to the federated `username`.

            Returns (database user, None) on success, or (None, error message) if a new user
            had to be created but no unique username could be picked for it.
        """
        db_user = model.user.verify_federated_login(self._federated_service, username)
        if not db_user:
            # We must create the user in our db. Only a candidate that was actually confirmed
            # unique may be used: if the candidates are exhausted we must fail rather than
            # fall through with the last (already taken) candidate.
            valid_username = self._pick_unique_username(generate_valid_usernames(username),
                                                        model.user.is_username_unique)
            if not valid_username:
                logger.error('Unable to pick a username for user: %s', username)
                return (None,
                        'Unable to pick a username. Please report this to your administrator.')

            db_user = model.user.create_federated_user(valid_username, email,
                                                       self._federated_service, username,
                                                       set_password_notification=False)
        elif db_user.email != email:
            # Update the db attributes from the federated service; the write is skipped
            # when the stored email already matches.
            db_user.email = email
            db_user.save()

        return (db_user, None)

    def link_user(self, username_or_email):
        """ Looks up the given user in the federated service (without checking a password)
            and, if found, creates/links a database user to the associated federated service.

            Returns (database user, None) on success or (None, error message) on failure.
        """
        (credentials, err_msg) = self.get_user(username_or_email)
        if credentials is None:
            return (None, err_msg)

        return self._get_federated_user(credentials.username, credentials.email)

    def verify_and_link_user(self, username_or_email, password):
        """ Verifies the given credentials and, if valid, creates/links a database user to the
            associated federated service.

            Returns (database user, None) on success or (None, error message) on failure.
        """
        (credentials, err_msg) = self.verify_credentials(username_or_email, password)
        if credentials is None:
            return (None, err_msg)

        return self._get_federated_user(credentials.username, credentials.email)

    def confirm_existing_user(self, username, password):
        """ Confirms that the given *database* username and service password are valid for the
            linked service. This method is used when the federated service's username is not
            known.

            Returns (database user, None) on success. Returns (None, 'Invalid user') if the
            database user does not exist or has no login linked to this service, and
            (None, error message) if the service rejects the credentials.
        """
        db_user = model.user.get_user(username)
        if not db_user:
            return (None, 'Invalid user')

        federated_login = model.user.lookup_federated_login(db_user, self._federated_service)
        if not federated_login:
            return (None, 'Invalid user')

        # Verify against the service using the identifier stored on the linked login, since
        # the caller only supplied the database username.
        (credentials, err_msg) = self.verify_credentials(federated_login.service_ident, password)
        if credentials is None:
            return (None, err_msg)

        return (db_user, None)