From eeadeb9383aaf60a9a5d6d4e3a5a67118fe65237 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 21 Feb 2017 21:07:48 -0500 Subject: [PATCH] Initial interfaces and support for team syncing worker --- data/model/team.py | 95 ++++++++++++++- data/users/__init__.py | 6 + data/users/database.py | 4 + data/users/federated.py | 11 +- endpoints/api/organization.py | 6 +- endpoints/api/team.py | 2 +- static/css/core-ui.css | 18 ++- static/css/directives/ui/teams-manager.css | 4 + static/directives/teams-manager.html | 7 +- static/js/directives/ui/teams-manager.js | 3 +- static/partials/team-view.html | 7 +- workers/teamsyncworker.py | 134 +++++++++++++++++++++ 12 files changed, 282 insertions(+), 15 deletions(-) create mode 100644 workers/teamsyncworker.py diff --git a/data/model/team.py b/data/model/team.py index 52a58b842..9def9fdb4 100644 --- a/data/model/team.py +++ b/data/model/team.py @@ -1,9 +1,11 @@ import json +import uuid +from datetime import datetime from peewee import fn from data.database import (Team, TeamMember, TeamRole, User, TeamMemberInvite, RepositoryPermission, - TeamSync, LoginService) + TeamSync, LoginService, FederatedLogin, db_random_func, db_transaction) from data.model import (DataModelException, InvalidTeamException, UserAlreadyInTeam, InvalidTeamMemberException, _basequery) from data.text import prefix_search @@ -192,7 +194,7 @@ def get_matching_teams(team_prefix, organization): return query.limit(10) -def get_teams_within_org(organization): +def get_teams_within_org(organization, has_external_auth=False): """ Returns a AttrDict of team info (id, name, description), its role under the org, the number of repositories on which it has permission, and the number of members. """ @@ -209,6 +211,8 @@ def get_teams_within_org(organization): 'repo_count': 0, 'member_count': 0, + + 'is_synced': False, } teams = {team.id: _team_view(team) for team in query} @@ -236,6 +240,12 @@ def get_teams_within_org(organization): for member_tuple in members_tuples: teams[member_tuple[0]]['member_count'] = member_tuple[1] + # Add syncing information. + if has_external_auth: + sync_query = TeamSync.select(TeamSync.team).where(TeamSync.team << teams.keys()) + for team_sync in sync_query: + teams[team_sync.team_id]['is_synced'] = True + return [AttrDict(team_info) for team_info in teams.values()] @@ -374,16 +384,72 @@ def confirm_team_invite(code, user_obj): inviter = found.inviter return (team, inviter) + +def list_federated_team_members(team, login_service_name): + """ Returns a dict of all federated IDs for all team members in the team whose users are + bound to the login service withn the given name. The dictionary is from federated service + identifier (username) to their Quay User table ID. + """ + login_service = LoginService.get(name=login_service_name) + + query = (FederatedLogin + .select(FederatedLogin.service_ident, User.id) + .join(User) + .join(TeamMember) + .join(Team) + .where(Team.id == team, User.robot == False, FederatedLogin.service == login_service)) + return dict(query.tuples()) + + +def list_team_users(team): + """ Returns an iterator of all the *users* found in a team. Does not include robots. """ + return (User + .select() + .join(TeamMember) + .join(Team) + .where(Team.id == team, User.robot == False)) + + def set_team_syncing(team, login_service_name, config): """ Sets the given team to sync to the given service using the given config. """ login_service = LoginService.get(name=login_service_name) TeamSync.create(team=team, transaction_id='', service=login_service, config=json.dumps(config)) + def remove_team_syncing(orgname, teamname): + """ Removes syncing on the team matching the given organization name and team name. """ existing = get_team_sync_information(orgname, teamname) if existing: existing.delete_instance() + +def get_stale_team(stale_timespan): + """ Returns a team that is setup to sync to an external group, and who has not been synced in + now - stale_timespan. Returns None if none found. + """ + stale_at = datetime.now() - stale_timespan + + try: + candidates = (TeamSync + .select(TeamSync.id) + .where((TeamSync.last_updated <= stale_at) | (TeamSync.last_updated >> None)) + .limit(500) + .alias('candidates')) + + found = (TeamSync + .select(candidates.c.id) + .from_(candidates) + .order_by(db_random_func()) + .get()) + + if found is None: + return + + return TeamSync.select(TeamSync, Team).join(Team).where(TeamSync.id == found.id).get() + except TeamSync.DoesNotExist: + return None + + def get_team_sync_information(orgname, teamname): """ Returns the team syncing information for the team with the given name under the organization with the given name or None if none. @@ -400,3 +466,28 @@ def get_team_sync_information(orgname, teamname): return query.get() except TeamSync.DoesNotExist: return None + + +def update_sync_status(team_sync_info): + """ Attempts to update the transaction ID and last updated time on a TeamSync object. If the + transaction ID on the entry in the DB does not match that found on the object, this method + returns False, which indicates another caller updated it first. + """ + new_transaction_id = str(uuid.uuid4()) + query = (TeamSync + .update(transaction_id=new_transaction_id, last_updated=datetime.now()) + .where(TeamSync.id == team_sync_info.id, + TeamSync.transaction_id == team_sync_info.transaction_id)) + return query.execute() == 1 + + +def delete_members_not_present(team, member_id_set): + """ Deletes all members of the given team that are not found in the member ID set. """ + with db_transaction(): + user_ids = set([u.id for u in list_team_users(team)]) + to_delete = list(user_ids - member_id_set) + if to_delete: + query = TeamMember.delete().where(TeamMember.team == team, TeamMember.user << to_delete) + return query.execute() + + return 0 diff --git a/data/users/__init__.py b/data/users/__init__.py index 7cf3bccef..d53413b74 100644 --- a/data/users/__init__.py +++ b/data/users/__init__.py @@ -175,6 +175,12 @@ class UserAuthentication(object): """ return self.state.link_user(username_or_email) + def get_federated_user(self, user_info): + """ Returns a tuple containing the database user record linked to the given UserInformation + pair and any error that occurred when trying to link the user. + """ + return self.state.get_federated_user(user_info) + def confirm_existing_user(self, username, password): """ Verifies that the given password matches to the given DB username. Unlike verify_credentials, this call first translates the DB user via the FederatedLogin table diff --git a/data/users/database.py b/data/users/database.py index 6e5bf9949..07e327688 100644 --- a/data/users/database.py +++ b/data/users/database.py @@ -24,6 +24,10 @@ class DatabaseUsers(object): """ Never used since all users being added are already, by definition, in the database. """ return (None, 'Unsupported for this authentication system') + def get_federated_user(self, user_info): + """ Never used since all users being added are already, by definition, in the database. """ + return (None, 'Unsupported for this authentication system') + def query_users(self, query, limit): """ No need to implement, as we already query for users directly in the database. """ return (None, '', '') diff --git a/data/users/federated.py b/data/users/federated.py index 09d2eb748..fbb982367 100644 --- a/data/users/federated.py +++ b/data/users/federated.py @@ -38,11 +38,14 @@ class FederatedUsers(object): return (None, 'Not supported') def link_user(self, username_or_email): - (credentials, err_msg) = self.get_user(username_or_email) - if credentials is None: + (user_info, err_msg) = self.get_user(username_or_email) + if user_info is None: return (None, err_msg) - return self._get_federated_user(credentials.username, credentials.email) + return self.get_federated_user(user_info) + + def get_federated_user(self, user_info): + return self._get_federated_user(user_info.username, user_info.email) def verify_and_link_user(self, username_or_email, password): """ Verifies the given credentials and, if valid, creates/links a database user to the @@ -111,7 +114,7 @@ class FederatedUsers(object): prompts=prompts) else: # Update the db attributes from the federated service. - if email: + if email and db_user.email != email: db_user.email = email db_user.save() diff --git a/endpoints/api/organization.py b/endpoints/api/organization.py index 854451454..49e3194d9 100644 --- a/endpoints/api/organization.py +++ b/endpoints/api/organization.py @@ -6,7 +6,7 @@ from flask import request import features -from app import billing as stripe, avatar, all_queues +from app import billing as stripe, avatar, all_queues, authentication from endpoints.api import (resource, nickname, ApiResource, validate_json_request, request_error, related_user_resource, internal_only, require_user_admin, log_action, show_if, path_param, require_scope, require_fresh_login) @@ -33,6 +33,8 @@ def team_view(orgname, team): 'repo_count': team.repo_count, 'member_count': team.member_count, + + 'is_synced': team.is_synced, } @@ -157,7 +159,7 @@ class Organization(ApiResource): teams = None if OrganizationMemberPermission(orgname).can(): - teams = model.team.get_teams_within_org(org) + teams = model.team.get_teams_within_org(org, bool(authentication.federated_service)) return org_view(org, teams) diff --git a/endpoints/api/team.py b/endpoints/api/team.py index 80c2ad5ef..42c1f3f3c 100644 --- a/endpoints/api/team.py +++ b/endpoints/api/team.py @@ -291,7 +291,7 @@ class TeamMemberList(ApiResource): } if authentication.federated_service: - if SuperUserPermission().can(): + if SuperUserPermission().can() and AdministerOrganizationPermission(orgname).can(): data['can_sync'] = { 'service': authentication.federated_service, } diff --git a/static/css/core-ui.css b/static/css/core-ui.css index 217d31de1..36b2ce342 100644 --- a/static/css/core-ui.css +++ b/static/css/core-ui.css @@ -1521,7 +1521,7 @@ a:focus { top: 11px; left: 12px; font-size: 22px; - color: #E4C212; + color: #FCA657; } .co-alert.co-alert-danger { @@ -1566,6 +1566,14 @@ a:focus { left: 19px; } +.co-alert-inline:before { + position: relative !important; + top: auto !important; + left: auto !important; + vertical-align: middle; + margin-right: 10px; +} + .co-alert-popin-warning { margin-left: 10px; } @@ -1579,6 +1587,14 @@ a:focus { } } +.co-alert-inline { + border: 0px; + display: inline-block; + background-color: transparent !important; + margin: 0px; + padding: 4px; +} + .co-list-table tr td:first-child { font-weight: bold; padding-right: 10px; diff --git a/static/css/directives/ui/teams-manager.css b/static/css/directives/ui/teams-manager.css index a93a844ac..4c7c6e6b5 100644 --- a/static/css/directives/ui/teams-manager.css +++ b/static/css/directives/ui/teams-manager.css @@ -111,3 +111,7 @@ color: #aaa; font-size: 14px; } + +.teams-manager .fa-refresh { + color: #aaa; +} diff --git a/static/directives/teams-manager.html b/static/directives/teams-manager.html index 714b5711a..217dcbaef 100644 --- a/static/directives/teams-manager.html +++ b/static/directives/teams-manager.html @@ -41,6 +41,7 @@ + @@ -65,6 +66,9 @@ +
Team Name
+ + @@ -97,7 +101,8 @@ + role-changed="setRole(role, team.name)" roles="teamRoles" + read-only="!organization.is_admin"> diff --git a/static/js/directives/ui/teams-manager.js b/static/js/directives/ui/teams-manager.js index 7235c91f2..240eeb5fa 100644 --- a/static/js/directives/ui/teams-manager.js +++ b/static/js/directives/ui/teams-manager.js @@ -12,8 +12,9 @@ angular.module('quay').directive('teamsManager', function () { 'organization': '=organization', 'isEnabled': '=isEnabled' }, - controller: function($scope, $element, ApiService, $timeout, UserService, TableService, UIService) { + controller: function($scope, $element, ApiService, $timeout, UserService, TableService, UIService, Config) { $scope.TableService = TableService; + $scope.Config = Config; $scope.options = { 'predicate': 'ordered_team_index', diff --git a/static/partials/team-view.html b/static/partials/team-view.html index 02ec3a24e..6da8e07c3 100644 --- a/static/partials/team-view.html +++ b/static/partials/team-view.html @@ -30,7 +30,7 @@ This team is synchronized with a group in {{ getServiceName(syncInfo.service) }} and its user membership is therefore read-only. -
+
Directory Synchronization
@@ -44,11 +44,12 @@ - +
Last Updated: at {{ syncInfo.last_updated | amDateFormat:'dddd, MMMM Do YYYY, h:mm:ss a' }}NeverNever
- + +
You must be an admin of this organization to disable team synchronization
diff --git a/workers/teamsyncworker.py b/workers/teamsyncworker.py new file mode 100644 index 000000000..d658bd64f --- /dev/null +++ b/workers/teamsyncworker.py @@ -0,0 +1,134 @@ +import logging +import time +import json + +from app import app, authentication +from data import model +from workers.worker import Worker +from util.timedeltastring import convert_to_timedelta + +logger = logging.getLogger(__name__) + +WORKER_FREQUENCY = app.config.get('TEAM_SYNC_WORKER_FREQUENCY', 10) +STALE_CUTOFF = convert_to_timedelta(app.config.get('TEAM_RESYNC_STALE_TIME', '30s')) + +class TeamSynchronizationWorker(Worker): + """ Worker which synchronizes teams with their backing groups in LDAP/Keystone/etc. + """ + def __init__(self): + super(TeamSynchronizationWorker, self).__init__() + self.add_operation(self._sync_teams_to_groups, WORKER_FREQUENCY) + + def _sync_teams_to_groups(self): + logger.debug('Looking up teams to sync to groups') + + sync_team_tried = set() + while True: + # Find a stale team. + stale_team_sync = model.team.get_stale_team(STALE_CUTOFF) + if not stale_team_sync: + logger.debug('No additional stale team found; sleeping') + return + + # Make sure we don't try to reprocess a team on this iteration. + if stale_team_sync.id in sync_team_tried: + continue + + sync_team_tried.add(stale_team_sync.id) + + sync_config = json.loads(stale_team_sync.config) + logger.info('Syncing team `%s` under organization %s via %s (#%s)', stale_team_sync.team.name, + stale_team_sync.team.organization.username, sync_config, stale_team_sync.team_id) + + # Load all the existing members of the team in Quay that are bound to the auth service. + existing_users = model.team.list_federated_team_members(stale_team_sync.team, + authentication.federated_service) + + logger.debug('Existing membership of %s for team `%s` under organization %s via %s (#%s)', + len(existing_users), stale_team_sync.team.name, + stale_team_sync.team.organization.username, sync_config, stale_team_sync.team_id) + + # Load all the members of the team from the authenication system. + (member_iterator, err) = authentication.iterate_group_members(sync_config) + if err is not None: + logger.error('Got error when trying to iterate group members with config %s: %s', + sync_config, err) + continue + + # Collect all the members currently found in the group, adding them to the team as we go + # along. + group_membership = set() + for (member_info, err) in member_iterator: + if err is not None: + logger.error('Got error when trying to construct a member: %s', err) + continue + + # If the member is already in the team, nothing more to do. + if member_info.username in existing_users: + logger.debug('Member %s already in team `%s` under organization %s via %s (#%s)', + member_info.username, stale_team_sync.team.name, + stale_team_sync.team.organization.username, sync_config, + stale_team_sync.team_id) + + group_membership.add(existing_users[member_info.username]) + continue + + # Retrieve the Quay user associated with the member info. + (quay_user, err) = authentication.get_federated_user(member_info) + if err is not None: + logger.error('Could not link external user %s to an internal user: %s', + member_info.username, err) + continue + + # Add the user to the membership set. + group_membership.add(quay_user.id) + + # Add the user to the team. + try: + logger.info('Adding member %s to team `%s` under organization %s via %s (#%s)', + quay_user.username, stale_team_sync.team.name, + stale_team_sync.team.organization.username, sync_config, + stale_team_sync.team_id) + + model.team.add_user_to_team(quay_user, stale_team_sync.team) + except model.UserAlreadyInTeam: + # If the user is already present, nothing more to do for them. + pass + + # Update the transaction and last_updated time of the team sync. Only if it matches + # the current value will we then perform the deletion step. + result = model.team.update_sync_status(stale_team_sync) + if not result: + # Another worker updated this team. Nothing more to do. + logger.debug('Another worker synced team `%s` under organization %s via %s (#%s)', + stale_team_sync.team.name, + stale_team_sync.team.organization.username, sync_config, + stale_team_sync.team_id) + continue + + # Delete any team members not found in the backing auth system. + logger.debug('Deleting stale members for team `%s` under organization %s via %s (#%s)', + stale_team_sync.team.name, stale_team_sync.team.organization.username, + sync_config, stale_team_sync.team_id) + + deleted = model.team.delete_members_not_present(stale_team_sync.team, group_membership) + + # Done! + logger.info('Finishing sync for team `%s` under organization %s via %s (#%s): %s deleted', + stale_team_sync.team.name, stale_team_sync.team.organization.username, + sync_config, stale_team_sync.team_id, deleted) + +def main(): + logging.config.fileConfig('conf/logging_debug.conf', disable_existing_loggers=False) + + if not authentication.federated_service: + logger.debug('No federated auth is used; sleeping') + while True: + time.sleep(100000) + + worker = TeamSynchronizationWorker() + worker.start() + + +if __name__ == "__main__": + main()