More AP C2S support

This commit is contained in:
Thomas Sileo 2022-12-16 20:20:40 +01:00
parent 5cf54c2782
commit 7b506f2519
3 changed files with 93 additions and 7 deletions

View file

@ -13,6 +13,7 @@ from fastapi.responses import JSONResponse
from loguru import logger
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from app import config
from app import models
@ -115,7 +116,7 @@ async def indieauth_authorization_endpoint(
"url": registered_client.client_uri,
}
else:
client = await indieauth.get_client_id_data(client_id)
client = await indieauth.get_client_id_data(client_id) # type: ignore
return await templates.render_template(
db_session,
@ -321,8 +322,10 @@ async def _check_access_token(
) -> tuple[bool, models.IndieAuthAccessToken | None]:
access_token_info = (
await db_session.scalars(
select(models.IndieAuthAccessToken).where(
models.IndieAuthAccessToken.access_token == token
select(models.IndieAuthAccessToken)
.where(models.IndieAuthAccessToken.access_token == token)
.options(
joinedload(models.IndieAuthAccessToken.indieauth_authorization_request)
)
)
).one_or_none()
@ -345,6 +348,7 @@ async def _check_access_token(
@dataclass(frozen=True)
class AccessTokenInfo:
scopes: list[str]
client_id: str | None
async def verify_access_token(
@ -371,9 +375,57 @@ async def verify_access_token(
return AccessTokenInfo(
scopes=access_token.scope.split(),
client_id=(
access_token.indieauth_authorization_request.client_id
if access_token.indieauth_authorization_request
else None
),
)
async def check_access_token(
request: Request,
db_session: AsyncSession = Depends(get_db_session),
) -> AccessTokenInfo | None:
token = request.headers.get("Authorization", "").removeprefix("Bearer ")
if not token:
return None
is_token_valid, access_token = await _check_access_token(db_session, token)
if not is_token_valid:
return None
if not access_token or not access_token.scope:
raise ValueError("Should never happen")
access_token_info = AccessTokenInfo(
scopes=access_token.scope.split(),
client_id=(
access_token.indieauth_authorization_request.client_id
if access_token.indieauth_authorization_request
else None
),
)
logger.info(
"Authenticated with access token from client_id="
f"{access_token_info.client_id} scopes={access_token.scope}"
)
return access_token_info
async def enforce_access_token(
request: Request,
db_session: AsyncSession = Depends(get_db_session),
) -> AccessTokenInfo:
maybe_access_token_info = await check_access_token(request, db_session)
if not maybe_access_token_info:
raise HTTPException(status_code=401, detail="access token required")
return maybe_access_token_info
@router.post("/revoke_token")
async def indieauth_revocation_endpoint(
request: Request,