135 lines
5.5 KiB
Python
135 lines
5.5 KiB
Python
|
import logging
|
||
|
import time
|
||
|
import json
|
||
|
|
||
|
from app import app, authentication
|
||
|
from data import model
|
||
|
from workers.worker import Worker
|
||
|
from util.timedeltastring import convert_to_timedelta
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
WORKER_FREQUENCY = app.config.get('TEAM_SYNC_WORKER_FREQUENCY', 10)
|
||
|
STALE_CUTOFF = convert_to_timedelta(app.config.get('TEAM_RESYNC_STALE_TIME', '30s'))
|
||
|
|
||
|
class TeamSynchronizationWorker(Worker):
|
||
|
""" Worker which synchronizes teams with their backing groups in LDAP/Keystone/etc.
|
||
|
"""
|
||
|
def __init__(self):
|
||
|
super(TeamSynchronizationWorker, self).__init__()
|
||
|
self.add_operation(self._sync_teams_to_groups, WORKER_FREQUENCY)
|
||
|
|
||
|
def _sync_teams_to_groups(self):
|
||
|
logger.debug('Looking up teams to sync to groups')
|
||
|
|
||
|
sync_team_tried = set()
|
||
|
while True:
|
||
|
# Find a stale team.
|
||
|
stale_team_sync = model.team.get_stale_team(STALE_CUTOFF)
|
||
|
if not stale_team_sync:
|
||
|
logger.debug('No additional stale team found; sleeping')
|
||
|
return
|
||
|
|
||
|
# Make sure we don't try to reprocess a team on this iteration.
|
||
|
if stale_team_sync.id in sync_team_tried:
|
||
|
continue
|
||
|
|
||
|
sync_team_tried.add(stale_team_sync.id)
|
||
|
|
||
|
sync_config = json.loads(stale_team_sync.config)
|
||
|
logger.info('Syncing team `%s` under organization %s via %s (#%s)', stale_team_sync.team.name,
|
||
|
stale_team_sync.team.organization.username, sync_config, stale_team_sync.team_id)
|
||
|
|
||
|
# Load all the existing members of the team in Quay that are bound to the auth service.
|
||
|
existing_users = model.team.list_federated_team_members(stale_team_sync.team,
|
||
|
authentication.federated_service)
|
||
|
|
||
|
logger.debug('Existing membership of %s for team `%s` under organization %s via %s (#%s)',
|
||
|
len(existing_users), stale_team_sync.team.name,
|
||
|
stale_team_sync.team.organization.username, sync_config, stale_team_sync.team_id)
|
||
|
|
||
|
# Load all the members of the team from the authenication system.
|
||
|
(member_iterator, err) = authentication.iterate_group_members(sync_config)
|
||
|
if err is not None:
|
||
|
logger.error('Got error when trying to iterate group members with config %s: %s',
|
||
|
sync_config, err)
|
||
|
continue
|
||
|
|
||
|
# Collect all the members currently found in the group, adding them to the team as we go
|
||
|
# along.
|
||
|
group_membership = set()
|
||
|
for (member_info, err) in member_iterator:
|
||
|
if err is not None:
|
||
|
logger.error('Got error when trying to construct a member: %s', err)
|
||
|
continue
|
||
|
|
||
|
# If the member is already in the team, nothing more to do.
|
||
|
if member_info.username in existing_users:
|
||
|
logger.debug('Member %s already in team `%s` under organization %s via %s (#%s)',
|
||
|
member_info.username, stale_team_sync.team.name,
|
||
|
stale_team_sync.team.organization.username, sync_config,
|
||
|
stale_team_sync.team_id)
|
||
|
|
||
|
group_membership.add(existing_users[member_info.username])
|
||
|
continue
|
||
|
|
||
|
# Retrieve the Quay user associated with the member info.
|
||
|
(quay_user, err) = authentication.get_federated_user(member_info)
|
||
|
if err is not None:
|
||
|
logger.error('Could not link external user %s to an internal user: %s',
|
||
|
member_info.username, err)
|
||
|
continue
|
||
|
|
||
|
# Add the user to the membership set.
|
||
|
group_membership.add(quay_user.id)
|
||
|
|
||
|
# Add the user to the team.
|
||
|
try:
|
||
|
logger.info('Adding member %s to team `%s` under organization %s via %s (#%s)',
|
||
|
quay_user.username, stale_team_sync.team.name,
|
||
|
stale_team_sync.team.organization.username, sync_config,
|
||
|
stale_team_sync.team_id)
|
||
|
|
||
|
model.team.add_user_to_team(quay_user, stale_team_sync.team)
|
||
|
except model.UserAlreadyInTeam:
|
||
|
# If the user is already present, nothing more to do for them.
|
||
|
pass
|
||
|
|
||
|
# Update the transaction and last_updated time of the team sync. Only if it matches
|
||
|
# the current value will we then perform the deletion step.
|
||
|
result = model.team.update_sync_status(stale_team_sync)
|
||
|
if not result:
|
||
|
# Another worker updated this team. Nothing more to do.
|
||
|
logger.debug('Another worker synced team `%s` under organization %s via %s (#%s)',
|
||
|
stale_team_sync.team.name,
|
||
|
stale_team_sync.team.organization.username, sync_config,
|
||
|
stale_team_sync.team_id)
|
||
|
continue
|
||
|
|
||
|
# Delete any team members not found in the backing auth system.
|
||
|
logger.debug('Deleting stale members for team `%s` under organization %s via %s (#%s)',
|
||
|
stale_team_sync.team.name, stale_team_sync.team.organization.username,
|
||
|
sync_config, stale_team_sync.team_id)
|
||
|
|
||
|
deleted = model.team.delete_members_not_present(stale_team_sync.team, group_membership)
|
||
|
|
||
|
# Done!
|
||
|
logger.info('Finishing sync for team `%s` under organization %s via %s (#%s): %s deleted',
|
||
|
stale_team_sync.team.name, stale_team_sync.team.organization.username,
|
||
|
sync_config, stale_team_sync.team_id, deleted)
|
||
|
|
||
|
def main():
|
||
|
logging.config.fileConfig('conf/logging_debug.conf', disable_existing_loggers=False)
|
||
|
|
||
|
if not authentication.federated_service:
|
||
|
logger.debug('No federated auth is used; sleeping')
|
||
|
while True:
|
||
|
time.sleep(100000)
|
||
|
|
||
|
worker = TeamSynchronizationWorker()
|
||
|
worker.start()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|