2017-12-08 22:05:59 +00:00
|
|
|
import logging
|
|
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
from cachetools import lru_cache
|
|
|
|
|
|
|
|
from data.database import AppSpecificAuthToken, User, db_transaction
|
|
|
|
from data.model import config
|
2018-03-13 00:30:19 +00:00
|
|
|
from data.model._basequery import update_last_accessed
|
2017-12-08 22:05:59 +00:00
|
|
|
from util.timedeltastring import convert_to_timedelta
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
2017-12-12 22:51:54 +00:00
|
|
|
|
2018-02-02 20:14:43 +00:00
|
|
|
def _default_expiration_duration():
|
2017-12-08 22:05:59 +00:00
|
|
|
expiration_str = config.app_config.get('APP_SPECIFIC_TOKEN_EXPIRATION')
|
2018-02-02 20:14:43 +00:00
|
|
|
return convert_to_timedelta(expiration_str) if expiration_str else None
|
2017-12-08 22:05:59 +00:00
|
|
|
|
|
|
|
|
2017-12-12 22:51:54 +00:00
|
|
|
# Define a "unique" value so that callers can specifiy an expiration of None and *not* have it
|
|
|
|
# use the default.
|
2018-02-02 20:14:43 +00:00
|
|
|
_default_expiration_duration_opt = '__deo'
|
2017-12-12 22:51:54 +00:00
|
|
|
|
2018-02-02 20:14:43 +00:00
|
|
|
def create_token(user, title, expiration=_default_expiration_duration_opt):
|
2017-12-08 22:05:59 +00:00
|
|
|
""" Creates and returns an app specific token for the given user. If no expiration is specified
|
|
|
|
(including `None`), then the default from config is used. """
|
2018-02-02 20:14:43 +00:00
|
|
|
if expiration == _default_expiration_duration_opt:
|
|
|
|
duration = _default_expiration_duration()
|
|
|
|
expiration = duration + datetime.now() if duration else None
|
|
|
|
|
2017-12-08 22:05:59 +00:00
|
|
|
return AppSpecificAuthToken.create(user=user, title=title, expiration=expiration)
|
|
|
|
|
|
|
|
|
|
|
|
def list_tokens(user):
|
|
|
|
""" Lists all tokens for the given user. """
|
|
|
|
return AppSpecificAuthToken.select().where(AppSpecificAuthToken.user == user)
|
|
|
|
|
|
|
|
|
|
|
|
def revoke_token(token):
|
|
|
|
""" Revokes an app specific token by deleting it. """
|
|
|
|
token.delete_instance()
|
|
|
|
|
|
|
|
|
2018-01-23 16:40:51 +00:00
|
|
|
def revoke_token_by_uuid(uuid, owner):
|
|
|
|
""" Revokes an app specific token by deleting it. """
|
|
|
|
try:
|
|
|
|
token = AppSpecificAuthToken.get(uuid=uuid, user=owner)
|
|
|
|
except AppSpecificAuthToken.DoesNotExist:
|
|
|
|
return None
|
|
|
|
|
|
|
|
revoke_token(token)
|
|
|
|
return token
|
|
|
|
|
|
|
|
|
2017-12-08 22:05:59 +00:00
|
|
|
def get_expiring_tokens(user, soon):
|
|
|
|
""" Returns all tokens owned by the given user that will be expiring "soon", where soon is defined
|
|
|
|
by the soon parameter (a timedelta from now).
|
|
|
|
"""
|
|
|
|
soon_datetime = datetime.now() + soon
|
|
|
|
return (AppSpecificAuthToken
|
|
|
|
.select()
|
|
|
|
.where(AppSpecificAuthToken.user == user,
|
2018-02-12 22:07:50 +00:00
|
|
|
AppSpecificAuthToken.expiration <= soon_datetime,
|
|
|
|
AppSpecificAuthToken.expiration > datetime.now()))
|
2017-12-08 22:05:59 +00:00
|
|
|
|
|
|
|
|
2018-02-12 19:56:01 +00:00
|
|
|
def gc_expired_tokens(expiration_window):
|
|
|
|
""" Deletes all expired tokens outside of the expiration window. """
|
2017-12-08 22:05:59 +00:00
|
|
|
(AppSpecificAuthToken
|
|
|
|
.delete()
|
2018-02-12 19:56:01 +00:00
|
|
|
.where(AppSpecificAuthToken.expiration < (datetime.now() - expiration_window))
|
2017-12-08 22:05:59 +00:00
|
|
|
.execute())
|
|
|
|
|
|
|
|
|
|
|
|
def get_token_by_uuid(uuid, owner=None):
|
|
|
|
""" Looks up an unexpired app specific token with the given uuid. Returns it if found or
|
|
|
|
None if none. If owner is specified, only tokens owned by the owner user will be
|
|
|
|
returned.
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
query = (AppSpecificAuthToken
|
|
|
|
.select()
|
|
|
|
.where(AppSpecificAuthToken.uuid == uuid,
|
|
|
|
((AppSpecificAuthToken.expiration > datetime.now()) |
|
|
|
|
(AppSpecificAuthToken.expiration >> None))))
|
|
|
|
if owner is not None:
|
|
|
|
query = query.where(AppSpecificAuthToken.user == owner)
|
|
|
|
|
|
|
|
return query.get()
|
|
|
|
except AppSpecificAuthToken.DoesNotExist:
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def access_valid_token(token_code):
|
|
|
|
""" Looks up an unexpired app specific token with the given token code. If found, the token's
|
|
|
|
last_accessed field is set to now and the token is returned. If not found, returns None.
|
|
|
|
"""
|
|
|
|
with db_transaction():
|
|
|
|
try:
|
|
|
|
token = (AppSpecificAuthToken
|
|
|
|
.select(AppSpecificAuthToken, User)
|
|
|
|
.join(User)
|
|
|
|
.where(AppSpecificAuthToken.token_code == token_code,
|
|
|
|
((AppSpecificAuthToken.expiration > datetime.now()) |
|
|
|
|
(AppSpecificAuthToken.expiration >> None)))
|
|
|
|
.get())
|
2018-03-13 00:30:19 +00:00
|
|
|
update_last_accessed(token)
|
|
|
|
return token
|
2017-12-08 22:05:59 +00:00
|
|
|
except AppSpecificAuthToken.DoesNotExist:
|
|
|
|
return None
|