2014-03-25 18:32:26 +00:00
|
|
|
import logging
|
2014-03-25 21:51:22 +00:00
|
|
|
import os
|
|
|
|
import base64
|
2014-03-25 18:32:26 +00:00
|
|
|
|
|
|
|
from flask import session, request
|
|
|
|
from functools import wraps
|
|
|
|
|
|
|
|
from app import app
|
|
|
|
from auth.auth_context import get_validated_oauth_token
|
|
|
|
from util.http import abort
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level logger; verify_csrf() uses it to record CSRF verification failures.
logger = logging.getLogger(__name__)
|
|
|
|
|
2016-12-08 21:11:57 +00:00
|
|
|
# Public token name for OAuth-related CSRF protection. Not referenced in this
# module; presumably passed by callers as the session/request token name to
# generate_csrf_token / verify_csrf / csrf_protect -- TODO confirm against callers.
OAUTH_CSRF_TOKEN_NAME = '_oauth_csrf_token'
|
|
|
|
# Default name used both as the session key under which the CSRF token is stored
# and as the request parameter from which the submitted token is read.
_QUAY_CSRF_TOKEN_NAME = '_csrf_token'
|
|
|
|
|
|
|
|
def generate_csrf_token(session_token_name=_QUAY_CSRF_TOKEN_NAME):
    """ If not present in the session, generates a new CSRF token with the given name
        and places it into the session. Returns the token stored in the session.

        Args:
          session_token_name: Session key under which the token is stored.

        Returns:
          The token found in the session under `session_token_name`. Tokens created
          by this function are text strings of 64 URL-safe base64 characters
          (48 random bytes); a token already present in the session is returned
          unchanged.
    """
    if session_token_name not in session:
        # Use the URL-safe alphabet so the token survives being sent un-escaped in a
        # query string or form body ('+' would otherwise be decoded as a space), and
        # store text rather than bytes so that it compares equal to the (text) value
        # later read back from the request.
        random_bytes = os.urandom(48)
        session[session_token_name] = base64.urlsafe_b64encode(random_bytes).decode('ascii')

    return session[session_token_name]
|
|
|
|
|
|
|
|
|
|
|
|
def verify_csrf(session_token_name=_QUAY_CSRF_TOKEN_NAME,
                request_token_name=_QUAY_CSRF_TOKEN_NAME):
    """ Verifies that the CSRF token with the given name is found in the session and
        that the matching token is found in the request args or values.

        Args:
          session_token_name: Session key under which the expected token is stored.
          request_token_name: Name of the request value carrying the submitted token.

        Returns:
          None when the tokens match. On a missing or mismatched token the failure
          is logged and `abort(403, ...)` is called (abort is expected to terminate
          the request -- its implementation lives in util.http).
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    def _to_bytes(value):
        # Normalize text/bytes to bytes so the comparison works regardless of which
        # form the session or request produced; anything else is treated as invalid.
        if isinstance(value, bytes):
            return value
        try:
            return value.encode('utf-8')
        except AttributeError:
            return None

    token = session.get(session_token_name, None)
    found_token = request.values.get(request_token_name, None)

    expected = _to_bytes(token) if token else None
    provided = _to_bytes(found_token) if found_token else None

    # Constant-time comparison: avoids leaking how many leading characters of the
    # secret matched through response timing.
    is_valid = (expected is not None and provided is not None and
                hmac.compare_digest(expected, provided))

    if not is_valid:
        # NOTE(review): this writes the session token value to the error log;
        # consider redacting it if logs are not treated as sensitive.
        msg = 'CSRF Failure. Session token (%s) was %s and request token (%s) was %s'
        logger.error(msg, session_token_name, token, request_token_name, found_token)
        abort(403, message='CSRF token was invalid or missing.')
|
2014-03-25 18:32:26 +00:00
|
|
|
|
|
|
|
|
2016-12-08 21:11:57 +00:00
|
|
|
def csrf_protect(session_token_name=_QUAY_CSRF_TOKEN_NAME,
                 request_token_name=_QUAY_CSRF_TOKEN_NAME,
                 all_methods=False):
    """ Decorator factory which runs CSRF verification before the wrapped function.

        Verification is skipped entirely when get_validated_oauth_token() returns a
        non-None value. Otherwise verify_csrf() is invoked for every request whose
        method is not GET or HEAD, or for every request when `all_methods` is True.

        Args:
          session_token_name: Session key forwarded to verify_csrf().
          request_token_name: Request value name forwarded to verify_csrf().
          all_methods: When True, GET and HEAD requests are verified as well.

        Returns:
          A decorator that wraps a function with the check described above.
    """
    unchecked_methods = ("GET", "HEAD")

    def decorate(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if get_validated_oauth_token() is None:
                must_verify = all_methods or request.method not in unchecked_methods
                if must_verify:
                    verify_csrf(session_token_name, request_token_name)

            return func(*args, **kwargs)

        return wrapper

    return decorate
|
2014-03-25 18:32:26 +00:00
|
|
|
|
|
|
|
|
2014-11-24 21:07:38 +00:00
|
|
|
# Expose generate_csrf_token to Jinja templates as the global `csrf_token`.
app.jinja_env.globals['csrf_token'] = generate_csrf_token
|