Initial interfaces and support for team syncing worker

This commit is contained in:
Joseph Schorr 2017-02-21 21:07:48 -05:00
parent 94b07e6de9
commit eeadeb9383
12 changed files with 282 additions and 15 deletions

View file

@ -1,9 +1,11 @@
import json
import uuid
from datetime import datetime
from peewee import fn
from data.database import (Team, TeamMember, TeamRole, User, TeamMemberInvite, RepositoryPermission,
TeamSync, LoginService)
TeamSync, LoginService, FederatedLogin, db_random_func, db_transaction)
from data.model import (DataModelException, InvalidTeamException, UserAlreadyInTeam,
InvalidTeamMemberException, _basequery)
from data.text import prefix_search
@ -192,7 +194,7 @@ def get_matching_teams(team_prefix, organization):
return query.limit(10)
def get_teams_within_org(organization):
def get_teams_within_org(organization, has_external_auth=False):
""" Returns a AttrDict of team info (id, name, description), its role under the org,
the number of repositories on which it has permission, and the number of members.
"""
@ -209,6 +211,8 @@ def get_teams_within_org(organization):
'repo_count': 0,
'member_count': 0,
'is_synced': False,
}
teams = {team.id: _team_view(team) for team in query}
@ -236,6 +240,12 @@ def get_teams_within_org(organization):
for member_tuple in members_tuples:
teams[member_tuple[0]]['member_count'] = member_tuple[1]
# Add syncing information.
if has_external_auth:
sync_query = TeamSync.select(TeamSync.team).where(TeamSync.team << teams.keys())
for team_sync in sync_query:
teams[team_sync.team_id]['is_synced'] = True
return [AttrDict(team_info) for team_info in teams.values()]
@ -374,16 +384,72 @@ def confirm_team_invite(code, user_obj):
inviter = found.inviter
return (team, inviter)
def list_federated_team_members(team, login_service_name):
""" Returns a dict of all federated IDs for all team members in the team whose users are
bound to the login service withn the given name. The dictionary is from federated service
identifier (username) to their Quay User table ID.
"""
login_service = LoginService.get(name=login_service_name)
query = (FederatedLogin
.select(FederatedLogin.service_ident, User.id)
.join(User)
.join(TeamMember)
.join(Team)
.where(Team.id == team, User.robot == False, FederatedLogin.service == login_service))
return dict(query.tuples())
def list_team_users(team):
""" Returns an iterator of all the *users* found in a team. Does not include robots. """
return (User
.select()
.join(TeamMember)
.join(Team)
.where(Team.id == team, User.robot == False))
def set_team_syncing(team, login_service_name, config):
""" Sets the given team to sync to the given service using the given config. """
login_service = LoginService.get(name=login_service_name)
TeamSync.create(team=team, transaction_id='', service=login_service, config=json.dumps(config))
def remove_team_syncing(orgname, teamname):
""" Removes syncing on the team matching the given organization name and team name. """
existing = get_team_sync_information(orgname, teamname)
if existing:
existing.delete_instance()
def get_stale_team(stale_timespan):
""" Returns a team that is setup to sync to an external group, and who has not been synced in
now - stale_timespan. Returns None if none found.
"""
stale_at = datetime.now() - stale_timespan
try:
candidates = (TeamSync
.select(TeamSync.id)
.where((TeamSync.last_updated <= stale_at) | (TeamSync.last_updated >> None))
.limit(500)
.alias('candidates'))
found = (TeamSync
.select(candidates.c.id)
.from_(candidates)
.order_by(db_random_func())
.get())
if found is None:
return
return TeamSync.select(TeamSync, Team).join(Team).where(TeamSync.id == found.id).get()
except TeamSync.DoesNotExist:
return None
def get_team_sync_information(orgname, teamname):
""" Returns the team syncing information for the team with the given name under the organization
with the given name or None if none.
@ -400,3 +466,28 @@ def get_team_sync_information(orgname, teamname):
return query.get()
except TeamSync.DoesNotExist:
return None
def update_sync_status(team_sync_info):
""" Attempts to update the transaction ID and last updated time on a TeamSync object. If the
transaction ID on the entry in the DB does not match that found on the object, this method
returns False, which indicates another caller updated it first.
"""
new_transaction_id = str(uuid.uuid4())
query = (TeamSync
.update(transaction_id=new_transaction_id, last_updated=datetime.now())
.where(TeamSync.id == team_sync_info.id,
TeamSync.transaction_id == team_sync_info.transaction_id))
return query.execute() == 1
def delete_members_not_present(team, member_id_set):
""" Deletes all members of the given team that are not found in the member ID set. """
with db_transaction():
user_ids = set([u.id for u in list_team_users(team)])
to_delete = list(user_ids - member_id_set)
if to_delete:
query = TeamMember.delete().where(TeamMember.team == team, TeamMember.user << to_delete)
return query.execute()
return 0

View file

@ -175,6 +175,12 @@ class UserAuthentication(object):
"""
return self.state.link_user(username_or_email)
def get_federated_user(self, user_info):
""" Returns a tuple containing the database user record linked to the given UserInformation
pair and any error that occurred when trying to link the user.
"""
return self.state.get_federated_user(user_info)
def confirm_existing_user(self, username, password):
""" Verifies that the given password matches to the given DB username. Unlike
verify_credentials, this call first translates the DB user via the FederatedLogin table

View file

@ -24,6 +24,10 @@ class DatabaseUsers(object):
""" Never used since all users being added are already, by definition, in the database. """
return (None, 'Unsupported for this authentication system')
def get_federated_user(self, user_info):
""" Never used since all users being added are already, by definition, in the database. """
return (None, 'Unsupported for this authentication system')
def query_users(self, query, limit):
""" No need to implement, as we already query for users directly in the database. """
return (None, '', '')

View file

@ -38,11 +38,14 @@ class FederatedUsers(object):
return (None, 'Not supported')
def link_user(self, username_or_email):
(credentials, err_msg) = self.get_user(username_or_email)
if credentials is None:
(user_info, err_msg) = self.get_user(username_or_email)
if user_info is None:
return (None, err_msg)
return self._get_federated_user(credentials.username, credentials.email)
return self.get_federated_user(user_info)
def get_federated_user(self, user_info):
return self._get_federated_user(user_info.username, user_info.email)
def verify_and_link_user(self, username_or_email, password):
""" Verifies the given credentials and, if valid, creates/links a database user to the
@ -111,7 +114,7 @@ class FederatedUsers(object):
prompts=prompts)
else:
# Update the db attributes from the federated service.
if email:
if email and db_user.email != email:
db_user.email = email
db_user.save()

View file

@ -6,7 +6,7 @@ from flask import request
import features
from app import billing as stripe, avatar, all_queues
from app import billing as stripe, avatar, all_queues, authentication
from endpoints.api import (resource, nickname, ApiResource, validate_json_request, request_error,
related_user_resource, internal_only, require_user_admin, log_action,
show_if, path_param, require_scope, require_fresh_login)
@ -33,6 +33,8 @@ def team_view(orgname, team):
'repo_count': team.repo_count,
'member_count': team.member_count,
'is_synced': team.is_synced,
}
@ -157,7 +159,7 @@ class Organization(ApiResource):
teams = None
if OrganizationMemberPermission(orgname).can():
teams = model.team.get_teams_within_org(org)
teams = model.team.get_teams_within_org(org, bool(authentication.federated_service))
return org_view(org, teams)

View file

@ -291,7 +291,7 @@ class TeamMemberList(ApiResource):
}
if authentication.federated_service:
if SuperUserPermission().can():
if SuperUserPermission().can() and AdministerOrganizationPermission(orgname).can():
data['can_sync'] = {
'service': authentication.federated_service,
}

View file

@ -1521,7 +1521,7 @@ a:focus {
top: 11px;
left: 12px;
font-size: 22px;
color: #E4C212;
color: #FCA657;
}
.co-alert.co-alert-danger {
@ -1566,6 +1566,14 @@ a:focus {
left: 19px;
}
.co-alert-inline:before {
position: relative !important;
top: auto !important;
left: auto !important;
vertical-align: middle;
margin-right: 10px;
}
.co-alert-popin-warning {
margin-left: 10px;
}
@ -1579,6 +1587,14 @@ a:focus {
}
}
.co-alert-inline {
border: 0px;
display: inline-block;
background-color: transparent !important;
margin: 0px;
padding: 4px;
}
.co-list-table tr td:first-child {
font-weight: bold;
padding-right: 10px;

View file

@ -111,3 +111,7 @@
color: #aaa;
font-size: 14px;
}
.teams-manager .fa-refresh {
color: #aaa;
}

View file

@ -41,6 +41,7 @@
<table class="co-table" style="margin-top: 10px;">
<thead>
<td class="options-col" ng-if="::Config.AUTHENTICATION_TYPE != 'Database'"></td>
<td ng-class="TableService.tablePredicateClass('name', options.predicate, options.reverse)">
<a ng-click="TableService.orderBy('name', options)">Team Name</a>
</td>
@ -65,6 +66,9 @@
<tr class="co-checkable-row"
ng-repeat="team in orderedTeams.visibleEntries"
bindonce>
<td class="options-col" ng-if="::Config.AUTHENTICATION_TYPE != 'Database'">
<i class="fa fa-refresh" ng-if="team.is_synced" data-title="Team is synchronized with a backing group" bs-tooltip></i>
</td>
<td style="white-space: nowrap;">
<span class="avatar" data="team.avatar" size="24"></span>
<span bo-show="team.can_view">
@ -97,7 +101,8 @@
</td>
<td>
<span class="role-group" current-role="team.role" pull-left="true"
role-changed="setRole(role, team.name)" roles="teamRoles"></span>
role-changed="setRole(role, team.name)" roles="teamRoles"
read-only="!organization.is_admin"></span>
</td>
<td>
<span class="cor-options-menu" ng-show="organization.is_admin">

View file

@ -12,8 +12,9 @@ angular.module('quay').directive('teamsManager', function () {
'organization': '=organization',
'isEnabled': '=isEnabled'
},
controller: function($scope, $element, ApiService, $timeout, UserService, TableService, UIService) {
controller: function($scope, $element, ApiService, $timeout, UserService, TableService, UIService, Config) {
$scope.TableService = TableService;
$scope.Config = Config;
$scope.options = {
'predicate': 'ordered_team_index',

View file

@ -30,7 +30,7 @@
This team is synchronized with a group in <strong>{{ getServiceName(syncInfo.service) }}</strong> and its user membership is therefore <strong>read-only</strong>.
</div>
<div class="team-sync-header" ng-if="syncInfo.last_updated">
<div class="team-sync-header" ng-if="syncInfo.config">
<div class="section-header">Directory Synchronization</div>
<table class="team-sync-table">
<tr>
@ -44,11 +44,12 @@
<tr>
<td>Last Updated:</td>
<td ng-if="syncInfo.last_updated"><span am-time-ago="syncInfo.last_updated"></span> at {{ syncInfo.last_updated | amDateFormat:'dddd, MMMM Do YYYY, h:mm:ss a' }}</td>
<td ng-if="!syncInfo.last_updated">Never</td>
<td ng-if="!syncInfo.last_updated" style="color: #aaa;">Never</td>
</tr>
</table>
<button class="btn btn-default" ng-click="showDisableSyncing()">Remove Synchronization</button>
<button class="btn btn-default" ng-click="showDisableSyncing()" ng-if="canSync">Remove Synchronization</button>
<div ng-if="!canSync" class="co-alert co-alert-warning co-alert-inline">You must be an admin of this organization to disable team synchronization</div>
</div>
</div>

134
workers/teamsyncworker.py Normal file
View file

@ -0,0 +1,134 @@
import logging
import time
import json
from app import app, authentication
from data import model
from workers.worker import Worker
from util.timedeltastring import convert_to_timedelta
logger = logging.getLogger(__name__)
WORKER_FREQUENCY = app.config.get('TEAM_SYNC_WORKER_FREQUENCY', 10)
STALE_CUTOFF = convert_to_timedelta(app.config.get('TEAM_RESYNC_STALE_TIME', '30s'))
class TeamSynchronizationWorker(Worker):
""" Worker which synchronizes teams with their backing groups in LDAP/Keystone/etc.
"""
def __init__(self):
super(TeamSynchronizationWorker, self).__init__()
self.add_operation(self._sync_teams_to_groups, WORKER_FREQUENCY)
def _sync_teams_to_groups(self):
logger.debug('Looking up teams to sync to groups')
sync_team_tried = set()
while True:
# Find a stale team.
stale_team_sync = model.team.get_stale_team(STALE_CUTOFF)
if not stale_team_sync:
logger.debug('No additional stale team found; sleeping')
return
# Make sure we don't try to reprocess a team on this iteration.
if stale_team_sync.id in sync_team_tried:
continue
sync_team_tried.add(stale_team_sync.id)
sync_config = json.loads(stale_team_sync.config)
logger.info('Syncing team `%s` under organization %s via %s (#%s)', stale_team_sync.team.name,
stale_team_sync.team.organization.username, sync_config, stale_team_sync.team_id)
# Load all the existing members of the team in Quay that are bound to the auth service.
existing_users = model.team.list_federated_team_members(stale_team_sync.team,
authentication.federated_service)
logger.debug('Existing membership of %s for team `%s` under organization %s via %s (#%s)',
len(existing_users), stale_team_sync.team.name,
stale_team_sync.team.organization.username, sync_config, stale_team_sync.team_id)
# Load all the members of the team from the authenication system.
(member_iterator, err) = authentication.iterate_group_members(sync_config)
if err is not None:
logger.error('Got error when trying to iterate group members with config %s: %s',
sync_config, err)
continue
# Collect all the members currently found in the group, adding them to the team as we go
# along.
group_membership = set()
for (member_info, err) in member_iterator:
if err is not None:
logger.error('Got error when trying to construct a member: %s', err)
continue
# If the member is already in the team, nothing more to do.
if member_info.username in existing_users:
logger.debug('Member %s already in team `%s` under organization %s via %s (#%s)',
member_info.username, stale_team_sync.team.name,
stale_team_sync.team.organization.username, sync_config,
stale_team_sync.team_id)
group_membership.add(existing_users[member_info.username])
continue
# Retrieve the Quay user associated with the member info.
(quay_user, err) = authentication.get_federated_user(member_info)
if err is not None:
logger.error('Could not link external user %s to an internal user: %s',
member_info.username, err)
continue
# Add the user to the membership set.
group_membership.add(quay_user.id)
# Add the user to the team.
try:
logger.info('Adding member %s to team `%s` under organization %s via %s (#%s)',
quay_user.username, stale_team_sync.team.name,
stale_team_sync.team.organization.username, sync_config,
stale_team_sync.team_id)
model.team.add_user_to_team(quay_user, stale_team_sync.team)
except model.UserAlreadyInTeam:
# If the user is already present, nothing more to do for them.
pass
# Update the transaction and last_updated time of the team sync. Only if it matches
# the current value will we then perform the deletion step.
result = model.team.update_sync_status(stale_team_sync)
if not result:
# Another worker updated this team. Nothing more to do.
logger.debug('Another worker synced team `%s` under organization %s via %s (#%s)',
stale_team_sync.team.name,
stale_team_sync.team.organization.username, sync_config,
stale_team_sync.team_id)
continue
# Delete any team members not found in the backing auth system.
logger.debug('Deleting stale members for team `%s` under organization %s via %s (#%s)',
stale_team_sync.team.name, stale_team_sync.team.organization.username,
sync_config, stale_team_sync.team_id)
deleted = model.team.delete_members_not_present(stale_team_sync.team, group_membership)
# Done!
logger.info('Finishing sync for team `%s` under organization %s via %s (#%s): %s deleted',
stale_team_sync.team.name, stale_team_sync.team.organization.username,
sync_config, stale_team_sync.team_id, deleted)
def main():
logging.config.fileConfig('conf/logging_debug.conf', disable_existing_loggers=False)
if not authentication.federated_service:
logger.debug('No federated auth is used; sleeping')
while True:
time.sleep(100000)
worker = TeamSynchronizationWorker()
worker.start()
if __name__ == "__main__":
main()