from peewee import JOIN_LEFT_OUTER, Clause, SQL from cachetools import lru_cache from data.database import (Repository, User, Team, TeamMember, RepositoryPermission, TeamRole, Namespace, Visibility, db_for_update) def prefix_search(field, prefix_query): """ Returns the wildcard match for searching for the given prefix query. """ # Escape the known wildcard characters. prefix_query = (prefix_query .replace('!', '!!') .replace('%', '!%') .replace('_', '!_') .replace('[', '![')) return field ** Clause(prefix_query + '%', SQL("ESCAPE '!'")) def get_existing_repository(namespace_name, repository_name, for_update=False): query = (Repository .select(Repository, Namespace) .join(Namespace, on=(Repository.namespace_user == Namespace.id)) .where(Namespace.username == namespace_name, Repository.name == repository_name)) if for_update: query = db_for_update(query) return query.get() @lru_cache(maxsize=1) def get_public_repo_visibility(): return Visibility.get(name='public') def filter_to_repos_for_user(query, username=None, namespace=None, include_public=True): if not include_public and not username: return Repository.select().where(Repository.id == '-1') # Build a set of queries that, when unioned together, return the full set of visible repositories # for the filters specified. queries = [] where_clause = (True) if namespace: where_clause = (Namespace.username == namespace) if include_public: queries.append(query.clone() .where(Repository.visibility == get_public_repo_visibility(), where_clause)) if username: UserThroughTeam = User.alias() Org = User.alias() AdminTeam = Team.alias() AdminTeamMember = TeamMember.alias() AdminUser = User.alias() # Add repositories in which the user has permission. queries.append(query.clone() .switch(RepositoryPermission) .join(User) .where(User.username == username, where_clause)) # Add repositories in which the user is a member of a team that has permission. queries.append(query.clone() .switch(RepositoryPermission) .join(Team) .join(TeamMember) .join(UserThroughTeam, on=(UserThroughTeam.id == TeamMember.user)) .where(UserThroughTeam.username == username, where_clause)) # Add repositories under namespaces in which the user is the org admin. queries.append(query.clone() .switch(Repository) .join(Org, on=(Repository.namespace_user == Org.id)) .join(AdminTeam, on=(Org.id == AdminTeam.organization)) .join(TeamRole, on=(AdminTeam.role == TeamRole.id)) .switch(AdminTeam) .join(AdminTeamMember, on=(AdminTeam.id == AdminTeamMember.team)) .join(AdminUser, on=(AdminTeamMember.user == AdminUser.id)) .where(AdminUser.username == username, where_clause)) return reduce(lambda l, r: l | r, queries) def get_user_organizations(username): UserAlias = User.alias() return (User .select() .distinct() .join(Team) .join(TeamMember) .join(UserAlias, on=(UserAlias.id == TeamMember.user)) .where(User.organization == True, UserAlias.username == username))