112 lines
3.7 KiB
Python
112 lines
3.7 KiB
Python
import logging
|
|
|
|
from datetime import datetime
|
|
|
|
from cachetools import lru_cache
|
|
from peewee import PeeweeException
|
|
|
|
from data.database import AppSpecificAuthToken, User, db_transaction
|
|
from data.model import config
|
|
from util.timedeltastring import convert_to_timedelta
|
|
|
|
# Module-level logger; access_valid_token uses it to report failed last_accessed updates.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@lru_cache(maxsize=1)
def _default_expiration_duration():
    """ Returns the default token lifetime parsed from the `APP_SPECIFIC_TOKEN_EXPIRATION` config
        value, or None if that value is unset or empty.

        Only the parsed duration is cached. It must never be combined with the current time
        inside this cached function, or every caller would receive the same stale timestamp.
    """
    expiration_str = config.app_config.get('APP_SPECIFIC_TOKEN_EXPIRATION')
    return convert_to_timedelta(expiration_str) if expiration_str else None


def _default_expiration():
    """ Returns the default expiration datetime for a token created right now, or None if no
        default lifetime is configured.

        The result is computed relative to the current time on every call; it is deliberately
        not cached, since a cached absolute datetime would drift into the past as the process
        keeps running.
    """
    duration = _default_expiration_duration()
    if duration is None:
        return None

    return datetime.now() + duration
|
|
|
|
|
|
# Define a "unique" value so that callers can specify an expiration of None and *not* have it
# use the default.
|
|
_default_expiration_opt = '__deo'  # sentinel default for create_token's `expiration` parameter
|
|
|
|
def create_token(user, title, expiration=_default_expiration_opt):
    """ Creates and returns an app specific token with the given title for the given user.

        If `expiration` is left at its sentinel default, the value returned by
        `_default_expiration()` is used. Any explicitly passed value, including `None`, is
        handed to the new token unchanged.
    """
    if expiration == _default_expiration_opt:
        # Caller did not choose an expiration; fall back to the configured default.
        expiration = _default_expiration()

    return AppSpecificAuthToken.create(user=user, title=title, expiration=expiration)
|
|
|
|
|
|
def list_tokens(user):
    """ Returns a query selecting every app specific token whose user is the given user. """
    owned_by_user = AppSpecificAuthToken.user == user
    return AppSpecificAuthToken.select().where(owned_by_user)
|
|
|
|
|
|
def revoke_token(token):
    """ Revokes the given app specific token. Revocation is implemented as deletion of the
        token instance; nothing is returned.
    """
    token.delete_instance()
|
|
|
|
|
|
def get_expiring_tokens(user, soon):
    """ Returns all tokens owned by the given user that will be expiring "soon", where soon is
        defined by the soon parameter (a timedelta from now).

        Only tokens that have not yet expired are returned: the expiration must fall after the
        current time and no later than now + soon. Tokens without an expiration never match,
        because a NULL expiration fails both comparisons.
    """
    # Take a single timestamp so both bounds of the window are measured from the same instant.
    now = datetime.now()
    soon_datetime = now + soon
    return (AppSpecificAuthToken
            .select()
            .where(AppSpecificAuthToken.user == user,
                   # Lower bound: already-expired tokens are not "expiring soon".
                   AppSpecificAuthToken.expiration > now,
                   AppSpecificAuthToken.expiration <= soon_datetime))
|
|
|
|
|
|
def gc_expired_tokens(user):
    """ Deletes every token owned by the given user whose expiration is earlier than now. """
    is_owned = AppSpecificAuthToken.user == user
    is_expired = AppSpecificAuthToken.expiration < datetime.now()
    AppSpecificAuthToken.delete().where(is_owned, is_expired).execute()
|
|
|
|
|
|
def get_token_by_uuid(uuid, owner=None):
    """ Finds the unexpired app specific token with the given uuid and returns it, or returns
        None when no such token exists. A token counts as unexpired if its expiration is later
        than now or is not set at all. When owner is given, the token must also belong to that
        user.
    """
    not_expired = ((AppSpecificAuthToken.expiration > datetime.now()) |
                   (AppSpecificAuthToken.expiration >> None))

    conditions = [AppSpecificAuthToken.uuid == uuid, not_expired]
    if owner is not None:
        conditions.append(AppSpecificAuthToken.user == owner)

    try:
        return AppSpecificAuthToken.select().where(*conditions).get()
    except AppSpecificAuthToken.DoesNotExist:
        return None
|
|
|
|
|
|
def access_valid_token(token_code):
    """ Finds the unexpired app specific token with the given token code, stamps its
        last_accessed field with the current time and returns it. Returns None when no
        matching token exists.

        If saving the last_accessed update fails with a PeeweeException, the error is logged
        and the token is still returned when the `ALLOW_PULLS_WITHOUT_STRICT_LOGGING` config
        value is truthy; otherwise the exception is re-raised.
    """
    with db_transaction():
        # A token is usable if it expires in the future or has no expiration at all.
        not_expired = ((AppSpecificAuthToken.expiration > datetime.now()) |
                       (AppSpecificAuthToken.expiration >> None))
        lookup = (AppSpecificAuthToken
                  .select(AppSpecificAuthToken, User)
                  .join(User)
                  .where(AppSpecificAuthToken.token_code == token_code, not_expired))

        try:
            token = lookup.get()
        except AppSpecificAuthToken.DoesNotExist:
            return None

        token.last_accessed = datetime.now()

        try:
            token.save()
        except PeeweeException as ex:
            if not config.app_config.get('ALLOW_PULLS_WITHOUT_STRICT_LOGGING'):
                raise

            # Best-effort mode: record the failure but still hand back the token.
            logger.exception('update last_accessed for token failed',
                             extra={'exception': ex, 'token': token.id})

    return token
|