This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/endpoints/api/search.py

129 lines
4.1 KiB
Python

from endpoints.api import (ApiResource, parse_args, query_param, truthy_bool, nickname, resource,
require_scope)
from data import model
from auth.permissions import (OrganizationMemberPermission, ViewTeamPermission,
ReadRepositoryPermission, UserAdminPermission,
AdministerOrganizationPermission)
from auth.auth_context import get_authenticated_user
from auth import scopes
from util.gravatar import compute_hash
@resource('/v1/entities/<prefix>')
class EntitySearch(ApiResource):
""" Resource for searching entities. """
@parse_args
@query_param('namespace', 'Namespace to use when querying for org entities.', type=str,
default='')
@query_param('includeTeams', 'Whether to include team names.', type=truthy_bool, default=False)
@query_param('includeOrgs', 'Whether to include orgs names.', type=truthy_bool, default=False)
@nickname('getMatchingEntities')
def get(self, args, prefix):
""" Get a list of entities that match the specified prefix. """
teams = []
org_data = []
namespace_name = args['namespace']
robot_namespace = None
organization = None
try:
organization = model.get_organization(namespace_name)
# namespace name was an org
permission = OrganizationMemberPermission(namespace_name)
if permission.can():
robot_namespace = namespace_name
if args['includeTeams']:
teams = model.get_matching_teams(prefix, organization)
if args['includeOrgs'] and AdministerOrganizationPermission(namespace_name) \
and namespace_name.startswith(prefix):
org_data = [{
'name': namespace_name,
'kind': 'org',
'is_org_member': True,
'gravatar': compute_hash(organization.email),
}]
except model.InvalidOrganizationException:
# namespace name was a user
user = get_authenticated_user()
if user and user.username == namespace_name:
# Check if there is admin user permissions (login only)
admin_permission = UserAdminPermission(user.username)
if admin_permission.can():
robot_namespace = namespace_name
users = model.get_matching_users(prefix, robot_namespace, organization)
def entity_team_view(team):
result = {
'name': team.name,
'kind': 'team',
'is_org_member': True
}
return result
def user_view(user):
user_json = {
'name': user.username,
'kind': 'user',
'is_robot': user.is_robot,
}
if organization is not None:
user_json['is_org_member'] = user.is_robot or user.is_org_member
return user_json
team_data = [entity_team_view(team) for team in teams]
user_data = [user_view(user) for user in users]
return {
'results': team_data + user_data + org_data
}
def team_view(orgname, team):
view_permission = ViewTeamPermission(orgname, team.name)
role = model.get_team_org_role(team).name
return {
'id': team.id,
'name': team.name,
'description': team.description,
'can_view': view_permission.can(),
'role': role
}
@resource('/v1/find/repository')
class FindRepositories(ApiResource):
""" Resource for finding repositories. """
@parse_args
@query_param('query', 'The prefix to use when querying for repositories.', type=str, default='')
@require_scope(scopes.READ_REPO)
@nickname('findRepos')
def get(self, args):
""" Get a list of repositories that match the specified prefix query. """
prefix = args['query']
def repo_view(repo):
return {
'namespace': repo.namespace_user.username,
'name': repo.name,
'description': repo.description
}
username = None
user = get_authenticated_user()
if user is not None:
username = user.username
matching = model.get_matching_repositories(prefix, username)
return {
'repositories': [repo_view(repo) for repo in matching
if (repo.visibility.name == 'public' or
ReadRepositoryPermission(repo.namespace_user.username, repo.name).can())]
}