Fix some bugs with the permissions API. Prevent the user from removing themelves as admin.
This commit is contained in:
parent
479e23b09d
commit
53f2a31547
2 changed files with 54 additions and 22 deletions
|
@ -8,6 +8,10 @@ from database import *
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class DataModelException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def create_user(username, password, email):
|
def create_user(username, password, email):
|
||||||
pw_hash = bcrypt.hashpw(password, bcrypt.gensalt())
|
pw_hash = bcrypt.hashpw(password, bcrypt.gensalt())
|
||||||
new_user = User.create(username=username, password_hash=pw_hash,
|
new_user = User.create(username=username, password_hash=pw_hash,
|
||||||
|
@ -130,7 +134,7 @@ def set_image_metadata(image_id, namespace_name, repository_name,
|
||||||
Image.image_id == image_id))
|
Image.image_id == image_id))
|
||||||
|
|
||||||
if not image_list:
|
if not image_list:
|
||||||
raise RuntimeError('No image with specified id and repository')
|
raise DataModelException('No image with specified id and repository')
|
||||||
|
|
||||||
fetched = image_list[0]
|
fetched = image_list[0]
|
||||||
fetched.created = dateutil.parser.parse(created_date_str)
|
fetched.created = dateutil.parser.parse(created_date_str)
|
||||||
|
@ -160,7 +164,7 @@ def get_tag_image(namespace_name, repository_name, tag_name):
|
||||||
RepositoryTag.name == tag_name))
|
RepositoryTag.name == tag_name))
|
||||||
|
|
||||||
if not fetched:
|
if not fetched:
|
||||||
raise Image.DoesNotExist('Unable to find image for tag.')
|
raise DataModelException('Unable to find image for tag.')
|
||||||
|
|
||||||
return fetched[0]
|
return fetched[0]
|
||||||
|
|
||||||
|
@ -202,23 +206,39 @@ def get_user_repo_permissions(user, repository):
|
||||||
RepositoryPermission.repository == repository)
|
RepositoryPermission.repository == repository)
|
||||||
|
|
||||||
|
|
||||||
def get_user_reponame_permission(user_obj, namespace_name, repository_name):
|
def user_permission_repo_query(username, namespace_name, repository_name):
|
||||||
repo = Repository.get(Repository.name == repository_name,
|
selected = RepositoryPermission.select(User, Repository, Role,
|
||||||
Repository.namespace == namespace_name)
|
RepositoryPermission)
|
||||||
perm = RepositoryPermission.get(RepositoryPermission.user == user_obj,
|
with_user = selected.join(User)
|
||||||
RepositoryPermission.repository == repo)
|
with_role = with_user.switch(RepositoryPermission).join(Role)
|
||||||
return perm
|
with_repo = with_role.switch(RepositoryPermission).join(Repository)
|
||||||
|
return with_repo.where(Repository.name == repository_name,
|
||||||
|
Repository.namespace == namespace_name,
|
||||||
|
User.username == username)
|
||||||
|
|
||||||
|
|
||||||
def set_user_repo_permission(user_obj, namespace_name, repository_name,
|
def get_user_reponame_permission(username, namespace_name, repository_name):
|
||||||
|
fetched = list(user_permission_repo_query(username, namespace_name,
|
||||||
|
repository_name))
|
||||||
|
if not fetched:
|
||||||
|
raise DataModelException('User does not have permission for repo.')
|
||||||
|
|
||||||
|
return fetched[0]
|
||||||
|
|
||||||
|
|
||||||
|
def set_user_repo_permission(username, namespace_name, repository_name,
|
||||||
role_name):
|
role_name):
|
||||||
|
if username == namespace_name:
|
||||||
|
raise DataModelException('Namespace owner must always be admin.')
|
||||||
|
|
||||||
|
user = User.get(User.username == username)
|
||||||
repo = Repository.get(Repository.name == repository_name,
|
repo = Repository.get(Repository.name == repository_name,
|
||||||
Repository.namespace == namespace_name)
|
Repository.namespace == namespace_name)
|
||||||
new_role = Role.get(Role.name == role_name)
|
new_role = Role.get(Role.name == role_name)
|
||||||
|
|
||||||
# Fetch any existing permission for this user on the repo
|
# Fetch any existing permission for this user on the repo
|
||||||
try:
|
try:
|
||||||
perm = RepositoryPermission.get(RepositoryPermission.user == user_obj,
|
perm = RepositoryPermission.get(RepositoryPermission.user == user,
|
||||||
RepositoryPermission.repository == repo)
|
RepositoryPermission.repository == repo)
|
||||||
perm.role = new_role
|
perm.role = new_role
|
||||||
perm.save()
|
perm.save()
|
||||||
|
@ -228,9 +248,13 @@ def set_user_repo_permission(user_obj, namespace_name, repository_name,
|
||||||
role=new_role)
|
role=new_role)
|
||||||
return new_perm
|
return new_perm
|
||||||
|
|
||||||
def delete_user_permission(user_obj, namespace_name, repository_name):
|
def delete_user_permission(username, namespace_name, repository_name):
|
||||||
repo = Repository.get(Repository.name == repository_name,
|
if username == namespace_name:
|
||||||
Repository.namespace == namespace_name)
|
raise DataModelException('Namespace owner must always be admin.')
|
||||||
perm = RepositoryPermission.get(RepositoryPermission.user == user_obj,
|
|
||||||
RepositoryPermission.repository == repo)
|
fetched = list(user_permission_repo_query(username, namespace_name,
|
||||||
perm.delete_instance()
|
repository_name))
|
||||||
|
if not fetched:
|
||||||
|
raise DataModelException('User does not have permission for repo.')
|
||||||
|
|
||||||
|
fetched[0].delete_instance()
|
||||||
|
|
|
@ -153,8 +153,7 @@ def get_permissions(namespace, repository, username):
|
||||||
(namespace, repository, username))
|
(namespace, repository, username))
|
||||||
permission = AdministerRepositoryPermission(namespace, repository)
|
permission = AdministerRepositoryPermission(namespace, repository)
|
||||||
if permission.can():
|
if permission.can():
|
||||||
user = current_user.db_user
|
perm = model.get_user_reponame_permission(username, namespace, repository)
|
||||||
perm = model.get_user_reponame_permission(user, namespace, repository)
|
|
||||||
return jsonify(role_view(perm))
|
return jsonify(role_view(perm))
|
||||||
|
|
||||||
abort(403) # Permission denied
|
abort(403) # Permission denied
|
||||||
|
@ -169,11 +168,15 @@ def change_permissions(namespace, repository, username):
|
||||||
if permission.can():
|
if permission.can():
|
||||||
new_permission = request.get_json()
|
new_permission = request.get_json()
|
||||||
|
|
||||||
user = current_user.db_user
|
|
||||||
logger.debug('Setting permission to: %s for user %s' %
|
logger.debug('Setting permission to: %s for user %s' %
|
||||||
(new_permission['role'], username))
|
(new_permission['role'], username))
|
||||||
perm = model.set_user_repo_permission(user, namespace, repository,
|
|
||||||
new_permission['role'])
|
try:
|
||||||
|
perm = model.set_user_repo_permission(username, namespace, repository,
|
||||||
|
new_permission['role'])
|
||||||
|
except model.DataModelException:
|
||||||
|
logger.warning('User tried to remove themselves as admin.')
|
||||||
|
abort(409)
|
||||||
|
|
||||||
resp = jsonify(role_view(perm))
|
resp = jsonify(role_view(perm))
|
||||||
if request.method == 'POST':
|
if request.method == 'POST':
|
||||||
|
@ -189,7 +192,12 @@ def change_permissions(namespace, repository, username):
|
||||||
def delete_permissions(namespace, repository, username):
|
def delete_permissions(namespace, repository, username):
|
||||||
permission = AdministerRepositoryPermission(namespace, repository)
|
permission = AdministerRepositoryPermission(namespace, repository)
|
||||||
if permission.can():
|
if permission.can():
|
||||||
model.delete_user_permission(current_user.db_user, namespace, repository)
|
try:
|
||||||
|
model.delete_user_permission(username, namespace, repository)
|
||||||
|
except model.DataModelException:
|
||||||
|
logger.warning('User tried to remove themselves as admin.')
|
||||||
|
abort(409)
|
||||||
|
|
||||||
return make_response('Deleted', 204)
|
return make_response('Deleted', 204)
|
||||||
|
|
||||||
abort(403) # Permission denied
|
abort(403) # Permission denied
|
||||||
|
|
Reference in a new issue