This repository has been archived on 2020-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
quay/util/config/superusermanager.py

39 lines
1.4 KiB
Python
Raw Normal View History

from multiprocessing.sharedctypes import Value, Array
from util.validation import MAX_LENGTH
class SuperUserManager(object):
""" In-memory helper class for quickly accessing (and updating) the valid
set of super users. This class communicates across processes to ensure
that the shared set is always the same.
"""
def __init__(self, app):
usernames = app.config.get('SUPER_USERS', [])
usernames_str = ','.join(usernames)
self._max_length = len(usernames_str) + MAX_LENGTH + 1
self._array = Array('c', self._max_length, lock=True)
self._array.value = usernames_str
def is_superuser(self, username):
""" Returns if the given username represents a super user. """
usernames = self._array.value.split(',')
return username in usernames
def register_superuser(self, username):
""" Registers a new username as a super user for the duration of the container.
Note that this does *not* change any underlying config files.
"""
usernames = self._array.value.split(',')
usernames.append(username)
new_string = ','.join(usernames)
if len(new_string) <= self._max_length:
self._array.value = new_string
else:
raise Exception('Maximum superuser count reached. Please report this to support.')
def has_superusers(self):
""" Returns whether there are any superusers defined. """
return bool(self._array.value)