Cleanup utils

Thomas Sileo 2022-06-22 21:15:07 +02:00
parent b7b04cad03
commit f9f9e62e13
13 changed files with 21 additions and 13 deletions

0  app/utils/__init__.py  Normal file

27  app/utils/highlight.py  Normal file

@@ -0,0 +1,27 @@
from functools import lru_cache

from bs4 import BeautifulSoup  # type: ignore
from pygments import highlight as phighlight  # type: ignore
from pygments.formatters import HtmlFormatter  # type: ignore
from pygments.lexers import guess_lexer  # type: ignore

_FORMATTER = HtmlFormatter(style="vim")

HIGHLIGHT_CSS = _FORMATTER.get_style_defs()


@lru_cache(256)
def highlight(html: str) -> str:
    soup = BeautifulSoup(html, "html5lib")
    for code in soup.find_all("code"):
        # Only highlight code blocks wrapped in a <pre> tag
        if code.parent.name != "pre":
            continue
        lexer = guess_lexer(code.text)
        tag = BeautifulSoup(
            phighlight(code.text, lexer, _FORMATTER), "html5lib"
        ).body.next
        pre = code.parent
        pre.replace_with(tag)
    # html5lib wraps the fragment in <html><body>; keep the body but
    # rename it to a <div> so the result can be embedded as-is
    out = soup.body
    out.name = "div"
    return str(out)
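
A quick usage sketch of the new helper (not part of the commit; the sample HTML is made up):

from app.utils.highlight import HIGHLIGHT_CSS, highlight

# A note body containing a code block
html = "<pre><code>def add(a, b):\n    return a + b</code></pre>"

# Returns the fragment as a <div>, with the <pre> replaced by
# Pygments-highlighted markup; results are memoized via lru_cache
print(highlight(html))

# Stylesheet for the "vim" Pygments style, meant to be served
# alongside the rendered HTML
print(HIGHLIGHT_CSS)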

90  app/utils/opengraph.py  Normal file

@@ -0,0 +1,90 @@
import mimetypes
import re
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup  # type: ignore
from pydantic import BaseModel

from app import activitypub as ap
from app import config
from app.utils.url import is_url_valid


class OpenGraphMeta(BaseModel):
    url: str
    title: str
    image: str
    description: str
    site_name: str


def _scrap_og_meta(html: str) -> OpenGraphMeta | None:
    soup = BeautifulSoup(html, "html5lib")
    # Index the <meta property="og:*"> tags found in the page head
    ogs = {
        og.attrs["property"]: og.attrs.get("content")
        for og in soup.html.head.find_all(property=re.compile(r"^og"))
    }
    raw = {}
    for field in OpenGraphMeta.__fields__.keys():
        og_field = f"og:{field}"
        if not ogs.get(og_field):
            # Only return metadata when every field is present
            return None
        raw[field] = ogs[og_field]
    return OpenGraphMeta.parse_obj(raw)


def _urls_from_note(note: ap.RawObject) -> set[str]:
    note_host = urlparse(ap.get_id(note["id"]) or "").netloc
    urls = set()
    if "content" in note:
        soup = BeautifulSoup(note["content"], "html5lib")
        for link in soup.find_all("a"):
            h = link.get("href")
            if not h:
                # Skip anchors without a href attribute
                continue
            ph = urlparse(h)
            mimetype, _ = mimetypes.guess_type(h)
            if (
                ph.scheme in {"http", "https"}
                and ph.netloc != note_host
                and is_url_valid(h)
                and (
                    # Skip media links, they cannot carry OG meta tags
                    not mimetype
                    or mimetype.split("/")[0] not in ["image", "video", "audio"]
                )
            ):
                urls.add(h)
    return urls


def _og_meta_from_url(url: str) -> OpenGraphMeta | None:
    resp = httpx.get(
        url,
        headers={
            "User-Agent": config.USER_AGENT,
        },
        follow_redirects=True,
    )
    resp.raise_for_status()

    if not (ct := resp.headers.get("content-type")) or not ct.startswith("text/html"):
        return None

    return _scrap_og_meta(resp.text)


def og_meta_from_note(note: ap.RawObject) -> list[OpenGraphMeta]:
    og_meta = []
    urls = _urls_from_note(note)
    for url in urls:
        try:
            maybe_og_meta = _og_meta_from_url(url)
            if maybe_og_meta:
                og_meta.append(maybe_og_meta)
        except httpx.HTTPError:
            pass
    return og_meta
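
A minimal sketch of the intended flow (not part of the commit; the note payload is invented and the call performs real HTTP requests):

from app.utils.opengraph import og_meta_from_note

# ap.RawObject is a raw ActivityPub dict; this one is a made-up example
note = {
    "id": "https://example.com/notes/1",
    "content": (
        '<p>Interesting read: '
        '<a href="https://example.org/article">an article</a></p>'
    ),
}

# Each external, non-media link is fetched; only pages exposing a
# complete set of og:* tags yield an OpenGraphMeta, HTTP errors are skipped
for meta in og_meta_from_note(note):
    print(meta.site_name, meta.title, meta.url)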

61  app/utils/url.py  Normal file

@@ -0,0 +1,61 @@
import functools
import ipaddress
import socket
from urllib.parse import urlparse

from loguru import logger

from app.config import DEBUG


class InvalidURLError(Exception):
    pass


@functools.lru_cache
def _getaddrinfo(hostname: str, port: int) -> str:
    try:
        ip_address = str(ipaddress.ip_address(hostname))
    except ValueError:
        try:
            ip_address = socket.getaddrinfo(hostname, port)[0][4][0]
            logger.debug(f"DNS lookup: {hostname} -> {ip_address}")
        except socket.gaierror:
            logger.exception(f"failed to lookup addr info for {hostname}")
            raise

    return ip_address


def is_url_valid(url: str) -> bool:
    """Implements basic SSRF protection."""
    parsed = urlparse(url)
    if parsed.scheme not in ["http", "https"]:
        return False

    # XXX in debug mode, we want to allow requests to localhost to test the
    # federation with local instances
    if DEBUG:  # pragma: no cover
        return True

    if not parsed.hostname or parsed.hostname.lower() in ["localhost"]:
        return False

    ip_address = _getaddrinfo(
        parsed.hostname, parsed.port or (80 if parsed.scheme == "http" else 443)
    )
    logger.debug(f"{ip_address=}")

    if ipaddress.ip_address(ip_address).is_private:
        logger.info(f"rejecting private URL {url} -> {ip_address}")
        return False

    return True


def check_url(url: str, debug: bool = False) -> None:
    logger.debug(f"check_url {url=}")
    if not is_url_valid(url):
        raise InvalidURLError(f'"{url}" is invalid')

    return None
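
A short sketch of the expected behavior (not part of the commit; assumes DEBUG is off and DNS/network access is available):

from app.utils.url import InvalidURLError, check_url, is_url_valid

# Public http(s) URLs are accepted
assert is_url_valid("https://example.com/page")

# Other schemes, localhost and private/link-local addresses are rejected
assert not is_url_valid("ftp://example.com/file")
assert not is_url_valid("http://localhost/admin")
assert not is_url_valid("http://127.0.0.1:8000/")
assert not is_url_valid("http://169.254.169.254/latest/meta-data/")

# check_url() raises instead of returning a bool
try:
    check_url("http://10.0.0.1/internal")
except InvalidURLError as e:
    print(e)  # "http://10.0.0.1/internal" is invalid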