841 lines
		
	
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			841 lines
		
	
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """ Manage the current user. """
 | |
| 
 | |
| import logging
 | |
| import json
 | |
| 
 | |
| from random import SystemRandom
 | |
| from flask import request, abort
 | |
| from flask.ext.login import logout_user
 | |
| from flask.ext.principal import identity_changed, AnonymousIdentity
 | |
| from peewee import IntegrityError
 | |
| 
 | |
| from app import app, billing as stripe, authentication, avatar
 | |
| from endpoints.api import (ApiResource, nickname, resource, validate_json_request, request_error,
 | |
|                            log_action, internal_only, NotFound, require_user_admin, parse_args,
 | |
|                            query_param, InvalidToken, require_scope, format_date, hide_if, show_if,
 | |
|                            license_error, require_fresh_login, path_param, define_json_response,
 | |
|                            RepositoryParamResource)
 | |
| from endpoints.api.subscribe import subscribe
 | |
| from endpoints.common import common_login
 | |
| from endpoints.decorators import anon_allowed
 | |
| from endpoints.api.team import try_accept_invite
 | |
| 
 | |
| from data import model
 | |
| from data.billing import get_plan
 | |
| from auth.permissions import (AdministerOrganizationPermission, CreateRepositoryPermission,
 | |
|                               UserAdminPermission, UserReadPermission, SuperUserPermission)
 | |
| from auth.auth_context import get_authenticated_user
 | |
| from auth import scopes
 | |
| from util.useremails import (send_confirmation_email, send_recovery_email, send_change_email, send_password_changed)
 | |
| from util.names import parse_single_urn
 | |
| 
 | |
| import features
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
def user_view(user):
  """ Build the JSON-serializable view of `user`.

  The basic fields (anonymous flag, username and avatar) are always present. The private
  details (verification state, email, organizations, federated logins, ...) are added only
  when the caller holds UserAdminPermission for the user. The `super_user` flag is added
  only when the SUPER_USERS feature is enabled and the caller holds SuperUserPermission.
  """
  def org_view(o):
    """ Describe a single organization of which the user is a member. """
    is_admin = AdministerOrganizationPermission(o.username).can()
    return {
      'name': o.username,
      'avatar': avatar.get_data_for_org(o),
      'is_org_admin': is_admin,
      'can_create_repo': is_admin or CreateRepositoryPermission(o.username).can(),
      'preferred_namespace': o.stripe_id is not None
    }

  def login_view(login):
    """ Describe a single federated (external) login attached to the user. """
    try:
      metadata = json.loads(login.metadata_json)
    except (TypeError, ValueError):
      # Missing or malformed metadata is reported as empty instead of failing the view.
      metadata = {}

    return {
      'service': login.service.name,
      'service_identifier': login.service_ident,
      'metadata': metadata
    }

  user_response = {
    'anonymous': False,
    'username': user.username,
    'avatar': avatar.get_data_for_user(user)
  }

  user_admin = UserAdminPermission(user.username)
  if user_admin.can():
    # Only look up organizations and logins when they are actually going to be returned.
    organizations = model.get_user_organizations(user.username)
    logins = model.list_federated_logins(user)

    user_response.update({
      'is_me': True,
      'verified': user.verified,
      'email': user.email,
      'organizations': [org_view(o) for o in organizations],
      'logins': [login_view(login) for login in logins],
      'can_create_repo': True,
      'invoice_email': user.invoice_email,
      'preferred_namespace': user.stripe_id is not None,
      'tag_expiration': user.removed_tag_expiration_s,
    })

  if features.SUPER_USERS and SuperUserPermission().can():
    # The permission was verified on the line above, so only the identity check remains:
    # the flag is true solely when the viewed user is the authenticated user.
    user_response['super_user'] = user == get_authenticated_user()

  return user_response
 | |
| 
 | |
| 
 | |
def notification_view(notification):
  """ Build the JSON-serializable view of a user notification.

  The `organization` field carries the target's username only when the target is an
  organization; otherwise it is None.
  """
  target = notification.target
  organization_name = target.username if target.organization else None
  decoded_metadata = json.loads(notification.metadata_json)

  view = {}
  view['id'] = notification.uuid
  view['organization'] = organization_name
  view['kind'] = notification.kind.name
  view['created'] = format_date(notification.created)
  view['metadata'] = decoded_metadata
  view['dismissed'] = notification.dismissed
  return view
 | |
| 
 | |
| 
 | |
@resource('/v1/user/')
class User(ApiResource):
  """ Operations related to users. """
  schemas = {
    'NewUser': {
      'id': 'NewUser',
      'type': 'object',
      'description': 'Fields which must be specified for a new user.',
      'required': [
        'username',
        'password',
        'email',
      ],
      'properties': {
        'username': {
          'type': 'string',
          'description': 'The user\'s username',
        },
        'password': {
          'type': 'string',
          'description': 'The user\'s password',
        },
        'email': {
          'type': 'string',
          'description': 'The user\'s email address',
        },
        'invite_code': {
          'type': 'string',
          'description': 'The optional invite code'
        }
      }
    },
    'UpdateUser': {
      'id': 'UpdateUser',
      'type': 'object',
      'description': 'Fields which can be updated in a user.',
      'properties': {
        'password': {
          'type': 'string',
          'description': 'The user\'s password',
        },
        'invoice_email': {
          'type': 'boolean',
          'description': 'Whether the user desires to receive an invoice email.',
        },
        'email': {
          'type': 'string',
          'description': 'The user\'s email address',
        },
        'tag_expiration': {
          'type': 'integer',
          'maximum': 2592000,
          'minimum': 0,
        },
        'username': {
          'type': 'string',
          'description': 'The user\'s username',
        },
      },
    },
    'UserView': {
      'id': 'UserView',
      'type': 'object',
      'description': 'Describes a user',
      'required': ['verified', 'anonymous', 'avatar'],
      'properties': {
        'verified': {
          'type': 'boolean',
          'description': 'Whether the user\'s email address has been verified'
        },
        'anonymous': {
          'type': 'boolean',
          'description': 'true if this user data represents a guest user'
        },
        'email': {
          'type': 'string',
          'description': 'The user\'s email address',
        },
        'avatar': {
          'type': 'object',
          'description': 'Avatar data representing the user\'s icon'
        },
        'organizations': {
          'type': 'array',
          'description': 'Information about the organizations in which the user is a member',
          'items': {
            'type': 'object'
          }
        },
        'logins': {
          'type': 'array',
          'description': 'The list of external login providers against which the user has authenticated',
          'items': {
            'type': 'object'
          }
        },
        'can_create_repo': {
          'type': 'boolean',
          'description': 'Whether the user has permission to create repositories'
        },
        'preferred_namespace': {
          'type': 'boolean',
          'description': 'If true, the user\'s namespace is the preferred namespace to display'
        }
      }
    },
  }

  @require_scope(scopes.READ_USER)
  @nickname('getLoggedInUser')
  @define_json_response('UserView')
  @anon_allowed
  def get(self):
    """ Get user information for the authenticated user. """
    user = get_authenticated_user()
    # Organizations and callers without read permission are treated like unauthenticated ones.
    if user is None or user.organization or not UserReadPermission(user.username).can():
      raise InvalidToken("Requires authentication", payload={'session_required': False})

    return user_view(user)

  @require_user_admin
  @require_fresh_login
  @nickname('changeUserDetails')
  @internal_only
  @validate_json_request('UpdateUser')
  @define_json_response('UserView')
  def put(self):
    """ Update a user's details such as password or email. """
    user = get_authenticated_user()
    user_data = request.get_json()

    try:
      if 'password' in user_data:
        logger.debug('Changing password for user: %s', user.username)

        # Change the user's password.
        model.change_password(user, user_data['password'])

        # Record the audit entry only once the change has actually succeeded, so that a
        # rejected password does not show up as a password change in the logs.
        log_action('account_change_password', user.username)

        # Login again to reset their session cookie.
        common_login(user)

        if features.MAILING:
          send_password_changed(user.username, user.email)

      if 'invoice_email' in user_data:
        logger.debug('Changing invoice_email for user: %s', user.username)
        model.change_invoice_email(user, user_data['invoice_email'])

      if 'tag_expiration' in user_data:
        logger.debug('Changing user tag expiration to: %ss', user_data['tag_expiration'])
        model.change_user_tag_expiration(user, user_data['tag_expiration'])

      if 'email' in user_data and user_data['email'] != user.email:
        new_email = user_data['email']
        if model.find_user_by_email(new_email):
          # Email already used.
          raise request_error(message='E-mail address already used')

        if features.MAILING:
          # With mailing enabled the address is only changed after it has been confirmed.
          logger.debug('Sending email to change email address for user: %s',
                       user.username)
          code = model.create_confirm_email_code(user, new_email=new_email)
          send_change_email(user.username, user_data['email'], code.code)
        else:
          model.update_email(user, new_email, auto_verify=not features.MAILING)

      if ('username' in user_data and user_data['username'] != user.username and
          features.USER_RENAME):
        new_username = user_data['username']
        if model.get_user_or_org(new_username) is not None:
          # Username already used
          raise request_error(message='Username is already in use')

        # NOTE(review): `user` is not reloaded after the rename, so the view returned below
        # may still carry the old username -- confirm whether change_username updates it.
        model.change_username(user.id, new_username)

    except model.InvalidPasswordException as ex:
      raise request_error(exception=ex)

    return user_view(user)

  @show_if(features.USER_CREATION)
  @nickname('createNewUser')
  @internal_only
  @validate_json_request('NewUser')
  def post(self):
    """ Create a new user. """
    user_data = request.get_json()
    invite_code = user_data.get('invite_code', '')

    existing_user = model.get_nonrobot_user(user_data['username'])
    if existing_user:
      raise request_error(message='The username already exists')

    try:
      new_user = model.create_user(user_data['username'], user_data['password'],
                                   user_data['email'], auto_verify=not features.MAILING)

      # Handle any invite codes.
      parsed_invite = parse_single_urn(invite_code)
      if parsed_invite is not None and parsed_invite[0] == 'teaminvite':
        # Add the user to the team. This is best-effort: a bad invite must not fail signup.
        try:
          try_accept_invite(invite_code, new_user)
        except model.DataModelException:
          logger.debug('Could not accept team invite for new user: %s', new_user.username)

      if features.MAILING:
        # The account stays unverified until the emailed confirmation code is used.
        code = model.create_confirm_email_code(new_user)
        send_confirmation_email(new_user.username, new_user.email, code.code)
        return {
          'awaiting_verification': True
        }
      else:
        common_login(new_user)
        return user_view(new_user)

    except model.TooManyUsersException as ex:
      raise license_error(exception=ex)
    except model.DataModelException as ex:
      raise request_error(exception=ex)
 | |
| 
 | |
@resource('/v1/user/private')
@internal_only
@show_if(features.BILLING)
class PrivateRepositories(ApiResource):
  """ Operations dealing with the available count of private repositories. """
  @require_user_admin
  @nickname('getUserPrivateAllowed')
  def get(self):
    """ Get the number of private repos this user has, and whether they are allowed to create more.
    """
    current_user = get_authenticated_user()
    used = model.get_private_repo_count(current_user.username)

    # Without a billing customer, an active subscription and a known plan the quota is zero.
    quota = 0
    if current_user.stripe_id:
      customer = stripe.Customer.retrieve(current_user.stripe_id)
      subscribed_plan = get_plan(customer.subscription.plan.id) if customer.subscription else None
      if subscribed_plan:
        quota = subscribed_plan['privateRepos']

    return {
      'privateCount': used,
      'privateAllowed': (used < quota)
    }
 | |
| 
 | |
| 
 | |
@resource('/v1/user/clientkey')
@internal_only
class ClientKey(ApiResource):
  """ Operations for returning an encrypted key which can be used in place of a password
      for the Docker client. """
  schemas = {
    'GenerateClientKey': {
      'id': 'GenerateClientKey',
      'type': 'object',
      'required': [
        'password',
      ],
      'properties': {
        'password': {
          'type': 'string',
          'description': 'The user\'s password',
        },
      }
    }
  }

  @require_user_admin
  @nickname('generateUserClientKey')
  @validate_json_request('GenerateClientKey')
  def post(self):
    """ Return the user's private client key. """
    current_username = get_authenticated_user().username
    supplied_password = request.get_json()['password']

    # The key is only handed out after the supplied password has been confirmed.
    confirmation = authentication.confirm_existing_user(current_username, supplied_password)
    (confirmed, failure_message) = confirmation
    if not confirmed:
      raise request_error(message=failure_message)

    encrypted = authentication.encrypt_user_password(supplied_password)
    return {'key': encrypted}
 | |
| 
 | |
| 
 | |
def conduct_signin(username_or_email, password):
  """ Verify the given credentials and, when they are valid, log the user in.

  Returns `{'success': True}` when the user was logged in. Otherwise returns a
  `(body, 403)` pair whose body states whether the credentials were invalid or the
  login itself was refused (reported as needing email verification), plus the message
  produced by the credential check.
  """
  try:
    (found_user, failure_message) = authentication.verify_user(username_or_email, password)
  except model.TooManyUsersException as ex:
    raise license_error(exception=ex)

  if found_user and common_login(found_user):
    return {'success': True}

  # Either the credentials did not match, or they matched but the login was refused.
  credentials_matched = bool(found_user)
  failure = {
    'needsEmailVerification': credentials_matched,
    'invalidCredentials': not credentials_matched,
    'message': failure_message
  }
  return failure, 403
 | |
| 
 | |
| 
 | |
@resource('/v1/user/convert')
@internal_only
@show_if(app.config['AUTHENTICATION_TYPE'] == 'Database')
class ConvertToOrganization(ApiResource):
  """ Operations for converting a user to an organization. """
  schemas = {
    'ConvertUser': {
      'id': 'ConvertUser',
      'type': 'object',
      'description': 'Information required to convert a user to an organization.',
      'required': [
        'adminUser',
        'adminPassword'
      ],
      'properties': {
        'adminUser': {
          'type': 'string',
          'description': 'The user who will become an org admin\'s username',
        },
        'adminPassword': {
          'type': 'string',
          'description': 'The user who will become an org admin\'s password',
        },
        'plan': {
          'type': 'string',
          'description': 'The plan to which the organization should be subscribed',
        },
      },
    },
  }

  @require_user_admin
  @nickname('convertUserToOrganization')
  @validate_json_request('ConvertUser')
  def post(self):
    """ Convert the user to an organization. """
    converting_user = get_authenticated_user()
    payload = request.get_json()

    # The credentials of the future admin must be valid.
    admin_username = payload['adminUser']
    admin_password = payload['adminPassword']
    (admin_user, _error) = authentication.verify_user(admin_username, admin_password)
    if not admin_user:
      raise request_error(reason='invaliduser',
                          message='The admin user credentials are not valid')

    # The future admin must be a different account than the one being converted.
    if admin_user.id == converting_user.id:
      raise request_error(reason='invaliduser',
                          message='The admin user is not valid')

    # Subscribe the organization to the requested plan (business plans are required).
    if features.BILLING:
      requested_plan = payload.get('plan', 'free')
      subscribe(converting_user, requested_plan, None, True)

    # Perform the conversion and record it.
    model.convert_user_to_organization(converting_user, admin_user)
    log_action('account_convert', converting_user.username)

    # Finish by signing in as the admin user.
    return conduct_signin(admin_username, admin_password)
 | |
| 
 | |
| 
 | |
@resource('/v1/signin')
@internal_only
class Signin(ApiResource):
  """ Operations for signing in the user. """
  schemas = {
    'SigninUser': {
      'id': 'SigninUser',
      'type': 'object',
      'description': 'Information required to sign in a user.',
      'required': [
        'username',
        'password',
      ],
      'properties': {
        'username': {
          'type': 'string',
          'description': 'The user\'s username',
        },
        'password': {
          'type': 'string',
          'description': 'The user\'s password',
        },
      },
    },
  }

  @nickname('signinUser')
  @validate_json_request('SigninUser')
  @anon_allowed
  def post(self):
    """ Sign in the user with the specified credentials. """
    credentials = request.get_json()
    if not credentials:
      raise NotFound()

    return conduct_signin(credentials['username'], credentials['password'])
 | |
| 
 | |
| 
 | |
@resource('/v1/signin/verify')
@internal_only
class VerifyUser(ApiResource):
  """ Operations for verifying the existing user. """
  schemas = {
    'VerifyUser': {
      'id': 'VerifyUser',
      'type': 'object',
      'description': 'Information required to verify the signed in user.',
      'required': [
        'password',
      ],
      'properties': {
        'password': {
          'type': 'string',
          'description': 'The user\'s password',
        },
      },
    },
  }

  @require_user_admin
  @nickname('verifyUser')
  @validate_json_request('VerifyUser')
  def post(self):
    """ Verify the signed in user with the specified password. """
    supplied_password = request.get_json()['password']
    current_username = get_authenticated_user().username

    (confirmed_user, failure_message) = authentication.confirm_existing_user(current_username,
                                                                             supplied_password)
    if confirmed_user:
      # Log in again with the confirmed user.
      common_login(confirmed_user)
      return {'success': True}

    return {
      'message': failure_message,
      'invalidCredentials': True,
    }, 403
 | |
| 
 | |
| 
 | |
@resource('/v1/signout')
@internal_only
class Signout(ApiResource):
  """ Resource for signing out users. """
  @nickname('logout')
  def post(self):
    """ Request that the current user be signed out. """
    logout_user()

    # Announce that the identity for this app is now the anonymous one.
    anonymous = AnonymousIdentity()
    identity_changed.send(app, identity=anonymous)

    return {'success': True}
 | |
| 
 | |
| 
 | |
| 
 | |
@resource('/v1/detachexternal/<servicename>')
@internal_only
class DetachExternal(ApiResource):
  """ Resource for detaching an external login. """
  @require_user_admin
  @nickname('detachExternalLogin')
  def post(self, servicename):
    """ Request that the current user be detached from the external login service. """
    current_user = get_authenticated_user()
    model.detach_external_login(current_user, servicename)

    response = {'success': True}
    return response
 | |
| 
 | |
| 
 | |
@resource("/v1/recovery")
@show_if(features.MAILING)
@internal_only
class Recovery(ApiResource):
  """ Resource for requesting a password recovery email. """
  schemas = {
    'RequestRecovery': {
      'id': 'RequestRecovery',
      'type': 'object',
      # The description previously duplicated the sign-in schema's text.
      'description': 'Information required to request a password recovery email.',
      'required': [
        'email',
      ],
      'properties': {
        'email': {
          'type': 'string',
          'description': 'The user\'s email address',
        },
      },
    },
  }

  @nickname('requestRecoveryEmail')
  @validate_json_request('RequestRecovery')
  def post(self):
    """ Request a password recovery email. """
    email = request.get_json()['email']

    # NOTE(review): what happens for an address with no matching account depends on
    # create_reset_password_email_code -- confirm that case is handled gracefully.
    code = model.create_reset_password_email_code(email)
    send_recovery_email(email, code.code)
    return 'Created', 201
 | |
| 
 | |
| 
 | |
@resource('/v1/user/notifications')
@internal_only
class UserNotificationList(ApiResource):
  """ Resource for listing the notifications of the current user. """
  @require_user_admin
  @parse_args
  @query_param('page', 'Offset page number. (int)', type=int, default=0)
  @query_param('limit', 'Limit on the number of results (int)', type=int, default=5)
  @nickname('listUserNotifications')
  def get(self, args):
    """ List one page of the user's notifications and whether more are available. """
    page = args['page']
    limit = args['limit']

    # One extra row is requested so that the presence of a further page can be detected.
    fetched = list(model.list_notifications(get_authenticated_user(), page=page, limit=limit + 1))
    has_more = len(fetched) > limit
    visible = fetched[0:limit] if has_more else fetched

    return {
      'notifications': [notification_view(entry) for entry in visible],
      'additional': has_more
    }
 | |
| 
 | |
| 
 | |
@resource('/v1/user/notifications/<uuid>')
@path_param('uuid', 'The uuid of the user notification')
@internal_only
class UserNotification(ApiResource):
  """ Resource for reading and updating a single notification of the current user. """
  schemas = {
    'UpdateNotification': {
      'id': 'UpdateNotification',
      'type': 'object',
      'description': 'Information for updating a notification',
      'properties': {
        'dismissed': {
          'type': 'boolean',
          'description': 'Whether the notification is dismissed by the user',
        },
      },
    },
  }

  @require_user_admin
  @nickname('getUserNotification')
  def get(self, uuid):
    """ Return the notification with the given uuid, or raise NotFound. """
    found = model.lookup_notification(get_authenticated_user(), uuid)
    if found:
      return notification_view(found)

    raise NotFound()

  @require_user_admin
  @nickname('updateUserNotification')
  @validate_json_request('UpdateNotification')
  def put(self, uuid):
    """ Update the dismissed state of the notification with the given uuid. """
    found = model.lookup_notification(get_authenticated_user(), uuid)
    if not found:
      raise NotFound()

    # A request without the field resets the notification to not dismissed.
    changes = request.get_json()
    found.dismissed = changes.get('dismissed', False)
    found.save()

    return notification_view(found)
 | |
| 
 | |
| 
 | |
def authorization_view(access_token):
  """ Build the JSON-serializable view of an OAuth authorization.

  The view describes the application the access token belongs to (including its owning
  organization), the scopes granted to the token and the token's uuid.
  """
  oauth_app = access_token.application
  owner = oauth_app.organization

  # The application's own avatar email is preferred; the organization's email is the fallback.
  app_email = oauth_app.avatar_email or owner.email

  organization_info = {
    'name': owner.username,
    'avatar': avatar.get_data_for_org(owner)
  }
  application_info = {
    'name': oauth_app.name,
    'description': oauth_app.description,
    'url': oauth_app.application_uri,
    'avatar': avatar.get_data(oauth_app.name, app_email, 'app'),
    'organization': organization_info
  }

  return {
    'application': application_info,
    'scopes': scopes.get_scope_information(access_token.scope),
    'uuid': access_token.uuid
  }
 | |
| 
 | |
@resource('/v1/user/authorizations')
@internal_only
class UserAuthorizationList(ApiResource):
  """ Resource for listing the OAuth authorizations of the current user. """
  @require_user_admin
  @nickname('listUserAuthorizations')
  def get(self):
    """ List a view of every access token belonging to the current user. """
    current_user = get_authenticated_user()
    tokens = model.oauth.list_access_tokens_for_user(current_user)
    views = [authorization_view(entry) for entry in tokens]
    return {'authorizations': views}
 | |
| 
 | |
| 
 | |
@resource('/v1/user/authorizations/<access_token_uuid>')
@path_param('access_token_uuid', 'The uuid of the access token')
@internal_only
class UserAuthorization(ApiResource):
  """ Resource for reading and revoking a single OAuth authorization of the current user. """
  @require_user_admin
  @nickname('getUserAuthorization')
  def get(self, access_token_uuid):
    """ Return the authorization with the given uuid, or raise NotFound. """
    current_user = get_authenticated_user()
    token = model.oauth.lookup_access_token_for_user(current_user, access_token_uuid)
    if token:
      return authorization_view(token)

    raise NotFound()

  @require_user_admin
  @nickname('deleteUserAuthorization')
  def delete(self, access_token_uuid):
    """ Delete the authorization with the given uuid, or raise NotFound. """
    current_user = get_authenticated_user()
    token = model.oauth.lookup_access_token_for_user(current_user, access_token_uuid)
    if not token:
      raise NotFound()

    token.delete_instance(recursive=True, delete_nullable=True)
    return 'Deleted', 204
 | |
| 
 | |
@resource('/v1/user/starred')
class StarredRepositoryList(ApiResource):
  """ Operations for creating and listing starred repositories. """
  schemas = {
    'NewStarredRepository': {
      'id': 'NewStarredRepository',
      'type': 'object',
      'required': [
        'namespace',
        'repository',
      ],
      'properties': {
        'namespace': {
          'type': 'string',
          'description': 'Namespace in which the repository belongs',
        },
        'repository': {
          'type': 'string',
          'description': 'Repository name'
        }
      }
    }
  }

  @nickname('listStarredRepos')
  @parse_args
  @query_param('page', 'Offset page number. (int)', type=int)
  @query_param('limit', 'Limit on the number of results (int)', type=int)
  @require_user_admin
  def get(self, args):
    """ List all starred repositories. """
    page = args['page']
    limit = args['limit']
    starred_repos = model.get_user_starred_repositories(get_authenticated_user(),
                                                        page=page,
                                                        limit=limit)
    def repo_view(repo_obj):
      """ Describe a single starred repository. """
      return {
        'namespace': repo_obj.namespace_user.username,
        'name': repo_obj.name,
        'description': repo_obj.description,
        'is_public': repo_obj.visibility.name == 'public',
      }

    return {'repositories': [repo_view(repo) for repo in starred_repos]}

  @require_scope(scopes.READ_REPO)
  @nickname('createStar')
  @validate_json_request('NewStarredRepository')
  @require_user_admin
  def post(self):
    """ Star a repository.

    Raises NotFound when the named repository does not exist.
    """
    user = get_authenticated_user()
    req = request.get_json()
    namespace = req['namespace']
    repository = req['repository']

    # NOTE(review): no read-permission check on the repository is visible here -- confirm
    # that starring a repository the user cannot read is prevented elsewhere.
    repo = model.get_repository(namespace, repository)
    if not repo:
      # Previously the handler fell through and implicitly returned None in this case.
      raise NotFound()

    try:
      model.star_repository(user, repo)
    except IntegrityError:
      # The repository is already starred by this user; treat the request as idempotent.
      pass

    return {
      'namespace': namespace,
      'repository': repository,
    }, 201
 | |
| 
 | |
| 
 | |
@resource('/v1/user/starred/<repopath:repository>')
@path_param('repository', 'The full path of the repository. e.g. namespace/name')
class StarredRepository(RepositoryParamResource):
  """ Operations for managing a specific starred repository. """
  @nickname('deleteStar')
  @require_user_admin
  def delete(self, namespace, repository):
    """ Removes a star from a repository.

    Raises NotFound when the named repository does not exist.
    """
    user = get_authenticated_user()
    repo = model.get_repository(namespace, repository)
    if not repo:
      # Previously the handler fell through and implicitly returned None in this case.
      raise NotFound()

    model.unstar_repository(user, repo)
    return 'Deleted', 204
 | |
| 
 | |
| 
 | |
@resource('/v1/users/<username>')
class Users(ApiResource):
  """ Operations related to retrieving information about other users. """
  @nickname('getUserInformation')
  def get(self, username):
    """ Get user information for the specified user. """
    found = model.get_nonrobot_user(username)
    if found is not None:
      return user_view(found)

    # No non-robot user with that name exists.
    abort(404)
 | |
| 
 |