This repository was archived on 2020-03-24. You can view and clone its files, but you cannot push to it or open issues or pull requests.
quay/endpoints/api/search.py

105 lines
3.1 KiB
Python
Raw Normal View History

from endpoints.api import ApiResource, parse_args, query_param, truthy_bool, nickname, resource
from data import model
from auth.permissions import OrganizationMemberPermission, ViewTeamPermission
from auth.auth_context import get_authenticated_user
@resource('/v1/entities/<prefix>')
class EntitySearch(ApiResource):
    """ Resource for searching entities. """

    # Response shape: {'results': [<team entries...>, <user entries...>]}.
    # Team entries come first, followed by user entries. Each entry carries
    # 'name' and 'kind'; user entries also carry 'is_robot', and
    # 'is_org_member' is added to them only when the namespace resolved to an
    # organization.
    # NOTE(review): the method docstring is kept verbatim in case the API
    # framework surfaces it to clients -- confirm before rewording it.
    @parse_args
    @query_param('namespace', 'Namespace to use when querying for org entities.',
                 type=str, default='')
    @query_param('includeTeams', 'Whether to include team names.',
                 type=truthy_bool, default=False)
    @nickname('getMatchingEntities')
    def get(self, args, prefix):
        """ Get a list of entities that match the specified prefix. """
        teams = []
        namespace_name = args['namespace']
        robot_namespace = None
        organization = None

        try:
            organization = model.get_organization(namespace_name)

            # namespace name was an org
            permission = OrganizationMemberPermission(namespace_name)
            if permission.can():
                robot_namespace = namespace_name

                # NOTE(review): the team lookup is assumed to be gated on the
                # membership check above (the original indentation was lost
                # in extraction) -- confirm against the upstream file.
                if args['includeTeams']:
                    teams = model.get_matching_teams(prefix, organization)
        except model.InvalidOrganizationException:
            # namespace name was a user
            # The request may be unauthenticated, in which case there is no
            # user object to compare against; robot_namespace then stays None
            # instead of the lookup failing with AttributeError.
            authenticated_user = get_authenticated_user()
            if (authenticated_user is not None and
                    authenticated_user.username == namespace_name):
                robot_namespace = namespace_name

        users = model.get_matching_users(prefix, robot_namespace, organization)

        def entity_team_view(team):
            # Build the result entry for a single matching team.
            return {
                'name': team.name,
                'kind': 'team',
                'is_org_member': True
            }

        def user_view(user):
            # Build the result entry for a single matching user or robot.
            user_json = {
                'name': user.username,
                'kind': 'user',
                'is_robot': user.is_robot,
            }

            # Membership is only reported when searching within an
            # organization; robots are always reported as members.
            if organization is not None:
                user_json['is_org_member'] = user.is_robot or user.is_org_member

            return user_json

        team_data = [entity_team_view(team) for team in teams]
        user_data = [user_view(user) for user in users]

        return {
            'results': team_data + user_data
        }
def team_view(orgname, team):
    """ Build the dictionary view of a team within the organization `orgname`.

    The result holds the team's id, name and description, whether the
    ViewTeamPermission for (orgname, team name) is granted ('can_view'), and
    the name of the role returned by model.get_team_org_role ('role').
    """
    team_permission = ViewTeamPermission(orgname, team.name)
    role_name = model.get_team_org_role(team).name

    return dict(
        id=team.id,
        name=team.name,
        description=team.description,
        can_view=team_permission.can(),
        role=role_name,
    )
@resource('/v1/find/repository')
class FindRepositories(ApiResource):
    """ Resource for finding repositories. """

    # Response shape: {'repositories': [{'namespace', 'name', 'description'}, ...]}
    # with one entry per repository returned by
    # model.get_matching_repositories for the 'query' prefix.
    @parse_args
    @query_param('query', 'The prefix to use when querying for repositories.',
                 type=str, default='')
    @nickname('findRepos')
    def get(self, args):
        """ Get a list of repositories that match the specified prefix query. """
        prefix = args['query']

        def repo_view(repo):
            # Build the result entry for a single matching repository.
            return {
                'namespace': repo.namespace,
                'name': repo.name,
                'description': repo.description
            }

        # Look the user up once so the None check and the attribute read use
        # the same object. The username stays None when nobody is
        # authenticated.
        authenticated_user = get_authenticated_user()
        username = None
        if authenticated_user is not None:
            username = authenticated_user.username

        matching = model.get_matching_repositories(prefix, username)
        return {
            'repositories': [repo_view(repo) for repo in matching]
        }