""" Conduct searches against all registry context. """

from endpoints.api import (ApiResource, parse_args, query_param, truthy_bool, nickname, resource,
                           require_scope, path_param, internal_only, Unauthorized, InvalidRequest,
                           show_if)
from data import model
from auth.permissions import (OrganizationMemberPermission, ReadRepositoryPermission,
                              UserAdminPermission, AdministerOrganizationPermission)
from auth.auth_context import get_authenticated_user
from auth import scopes
from app import avatar, authentication
from flask import abort
from operator import itemgetter
from stringscore import liquidmetal
from util.names import parse_robot_username

import anunidecode  # Don't listen to pylint's lies. This import is required.
import math


@resource('/v1/entities/link/<username>')
@internal_only
class LinkExternalEntity(ApiResource):
    """ Resource for linking external entities to internal users. """

    @nickname('linkExternalUser')
    def post(self, username):
        """ Links the external user with the given username to an internal user record.

            Aborts with a 404 when no federated service is configured, raises Unauthorized when
            nobody is logged in, and raises InvalidRequest when the link attempt returns no user.
        """
        if not authentication.federated_service:
            abort(404)

        # Only allowed if there is a logged in user.
        if not get_authenticated_user():
            raise Unauthorized()

        # Try to link the user with the given *external* username, to an internal record.
        (user, err_msg) = authentication.link_user(username)
        if user is None:
            raise InvalidRequest(err_msg, payload={'username': username})

        return {
            'entity': {
                'name': user.username,
                'kind': 'user',
                'is_robot': False,
                'avatar': avatar.get_data_for_user(user)
            }
        }


@resource('/v1/entities/<prefix>')
class EntitySearch(ApiResource):
    """ Resource for searching entities. """

    @path_param('prefix', 'The prefix of the entities being looked up')
    @parse_args()
    @query_param('namespace', 'Namespace to use when querying for org entities.', type=str,
                 default='')
    @query_param('includeTeams', 'Whether to include team names.', type=truthy_bool,
                 default=False)
    @query_param('includeOrgs', 'Whether to include orgs names.', type=truthy_bool,
                 default=False)
    @nickname('getMatchingEntities')
    def get(self, prefix, parsed_args):
        """ Get a list of entities that match the specified prefix. """
        # Ensure we don't have any unicode characters in the search, as it breaks the search.
        # Nothing being searched can have unicode in it anyway, so this is a safe operation.
        prefix = prefix.encode('unidecode', 'ignore').replace(' ', '').lower()

        teams = []
        org_data = []

        namespace_name = parsed_args['namespace']
        robot_namespace = None
        organization = None

        try:
            organization = model.organization.get_organization(namespace_name)

            # namespace name was an org
            # NOTE(review): the team/org lookups are nested under the membership check here;
            # confirm this matches the intended access rules.
            if OrganizationMemberPermission(namespace_name).can():
                robot_namespace = namespace_name

                if parsed_args['includeTeams']:
                    teams = model.team.get_matching_teams(prefix, organization)

                # Call .can() explicitly, like every other permission check in this module,
                # rather than relying on the truthiness of the permission object.
                if (parsed_args['includeOrgs'] and
                        AdministerOrganizationPermission(namespace_name).can() and
                        namespace_name.startswith(prefix)):
                    org_data = [{
                        'name': namespace_name,
                        'kind': 'org',
                        'is_org_member': True,
                        'avatar': avatar.get_data_for_org(organization),
                    }]

        except model.organization.InvalidOrganizationException:
            # namespace name was a user
            user = get_authenticated_user()
            if user and user.username == namespace_name:
                # Check if there is admin user permissions (login only)
                if UserAdminPermission(user.username).can():
                    robot_namespace = namespace_name

        # Lookup users in the database for the prefix query.
        users = model.user.get_matching_users(prefix, robot_namespace, organization, limit=10)

        # Lookup users via the user system for the prefix query. We'll filter out any users that
        # already exist in the database.
        external_users, federated_id, _ = authentication.query_users(prefix, limit=10)
        filtered_external_users = []
        if external_users and federated_id is not None:
            # Materialize the users so they can be iterated again when building the response.
            users = list(users)
            user_ids = [found_user.id for found_user in users]

            # Filter the users if any are already found via the database. We do so by looking
            # up all the found users in the federated user system.
            federated_query = model.user.get_federated_logins(user_ids, federated_id)
            found = {result.service_ident for result in federated_query}
            filtered_external_users = [external for external in external_users
                                       if external.username not in found]

        def entity_team_view(team):
            """ Returns the response view of a team. """
            return {
                'name': team.name,
                'kind': 'team',
                'is_org_member': True,
                'avatar': avatar.get_data_for_team(team)
            }

        def user_view(user):
            """ Returns the response view of a database user (or robot). """
            user_json = {
                'name': user.username,
                'kind': 'user',
                'is_robot': user.robot,
                'avatar': avatar.get_data_for_user(user)
            }

            # Membership is only reported when the namespace resolved to an organization.
            if organization is not None:
                user_json['is_org_member'] = user.robot or user.is_org_member

            return user_json

        def external_view(user):
            """ Returns the response view of a user found only in the external user system. """
            return {
                'name': user.username,
                'kind': 'external',
                'title': user.email or '',
                'avatar': avatar.get_data_for_external_user(user)
            }

        team_data = [entity_team_view(team) for team in teams]
        user_data = [user_view(user) for user in users]
        external_data = [external_view(user) for user in filtered_external_users]

        return {
            'results': team_data + user_data + org_data + external_data
        }


def search_entity_view(username, entity, get_short_name=None):
    """ Returns the search result view for a user, organization or robot entity.

        `username` is the name of the user conducting the search (or None); it decides whether a
        robot links to the user's own robots tab or to the owning organization's. If
        `get_short_name` is given, it is called with the entity's username and the result is
        added to the view under 'short_name'.
    """
    if entity.organization:
        kind = 'organization'
        avatar_data = avatar.get_data_for_org(entity)
        href = '/organization/' + entity.username
    elif entity.robot:
        kind = 'robot'
        avatar_data = None

        # The first part of a parsed robot username is the namespace that owns the robot.
        parts = parse_robot_username(entity.username)
        if parts[0] == username:
            href = '/user/' + username + '?tab=robots&showRobot=' + entity.username
        else:
            href = '/organization/' + parts[0] + '?tab=robots&showRobot=' + entity.username
    else:
        kind = 'user'
        avatar_data = avatar.get_data_for_user(entity)
        href = '/user/' + entity.username

    data = {
        'kind': kind,
        'avatar': avatar_data,
        'name': entity.username,
        'score': 1,
        'href': href
    }

    if get_short_name:
        data['short_name'] = get_short_name(entity.username)

    return data


def _append_team_results(username, matching_teams, encountered_teams, results):
    """ Appends a result for each team whose id is not yet in `encountered_teams`. """
    for team in matching_teams:
        if team.id in encountered_teams:
            continue

        encountered_teams.add(team.id)

        results.append({
            'kind': 'team',
            'name': team.name,
            'organization': search_entity_view(username, team.organization),
            'avatar': avatar.get_data_for_team(team),
            'score': 2,
            'href': '/organization/' + team.organization.username + '/teams/' + team.name
        })


def conduct_team_search(username, query, encountered_teams, results):
    """ Finds the matching teams where the user is a member. """
    matching_teams = model.team.get_matching_user_teams(query, get_authenticated_user(), limit=5)
    _append_team_results(username, matching_teams, encountered_teams, results)


def conduct_admined_team_search(username, query, encountered_teams, results):
    """ Finds matching teams in orgs admined by the user. """
    matching_teams = model.team.get_matching_admined_teams(query, get_authenticated_user(),
                                                           limit=5)
    _append_team_results(username, matching_teams, encountered_teams, results)


def conduct_repo_search(username, query, results):
    """ Finds matching repositories. """
    def can_read(repo):
        """ Returns whether the repository is public or readable under current permissions. """
        if repo.is_public:
            return True

        return ReadRepositoryPermission(repo.namespace_user.username, repo.name).can()

    # Without a username, only public repositories are searched.
    only_public = username is None
    matching_repos = model.repository.get_sorted_matching_repositories(query, only_public,
                                                                       can_read, limit=5)

    for repo in matching_repos:
        # log10 of a count of 0 or 1 comes out as 0, so fall back to a base score of 1.
        repo_score = math.log(repo.count or 1, 10) or 1

        # If the repository is under the user's namespace, give it 20% more weight.
        namespace = repo.namespace_user.username
        if OrganizationMemberPermission(namespace).can() or namespace == username:
            repo_score = repo_score * 1.2

        results.append({
            'kind': 'repository',
            'namespace': search_entity_view(username, repo.namespace_user),
            'name': repo.name,
            'description': repo.description,
            'is_public': repo.is_public,
            'score': repo_score,
            'href': '/repository/' + repo.namespace_user.username + '/' + repo.name
        })


def conduct_namespace_search(username, query, results):
    """ Finds matching users and organizations. """
    matching_entities = model.user.get_matching_user_namespaces(query, username, limit=5)
    for entity in matching_entities:
        results.append(search_entity_view(username, entity))


def conduct_robot_search(username, query, results):
    """ Finds matching robot accounts. """
    def get_short_name(name):
        """ Returns the second part of the parsed robot username. """
        return parse_robot_username(name)[1]

    matching_robots = model.user.get_matching_robots(query, username, limit=5)
    for robot in matching_robots:
        results.append(search_entity_view(username, robot, get_short_name))


@resource('/v1/find/all')
class ConductSearch(ApiResource):
    """ Resource for finding users, repositories, teams, etc. """

    @parse_args()
    @query_param('query', 'The search query.', type=str, default='')
    @require_scope(scopes.READ_REPO)
    @nickname('conductSearch')
    def get(self, parsed_args):
        """ Get a list of entities and resources that match the specified query. """
        query = parsed_args['query']
        if not query:
            return {'results': []}

        username = None
        results = []

        # NOTE(review): the team and robot searches are placed under the authenticated-user
        # check, since the team searches hand the authenticated user to the model layer;
        # confirm this matches the intended behavior for anonymous requests.
        user = get_authenticated_user()
        if user:
            username = user.username

            # Search for teams.
            encountered_teams = set()
            conduct_team_search(username, query, encountered_teams, results)
            conduct_admined_team_search(username, query, encountered_teams, results)

            # Search for robot accounts.
            conduct_robot_search(username, query, results)

        # Search for repos.
        conduct_repo_search(username, query, results)

        # Search for users and orgs.
        conduct_namespace_search(username, query, results)

        # Modify the results' scores via how close the query term is to each result's name.
        for result in results:
            name = result.get('short_name', result['name'])
            # A falsy match score falls back to 0.5 so the base score is scaled, not zeroed.
            lm_score = liquidmetal.score(name, query) or 0.5
            result['score'] = result['score'] * lm_score

        return {'results': sorted(results, key=itemgetter('score'), reverse=True)}