Add admin object page (expanded note)

This commit is contained in:
Thomas Sileo 2022-06-25 12:29:35 +02:00
parent ba1d3657b9
commit 1f67d4e71c
4 changed files with 97 additions and 80 deletions

View file

@ -1,5 +1,7 @@
"""Actions related to the AP inbox/outbox."""
import uuid
from collections import defaultdict
from dataclasses import dataclass
from urllib.parse import urlparse
import httpx
@ -732,3 +734,78 @@ def fetch_collection(db: Session, url: str) -> list[ap.RawObject]:
raise ValueError(f"internal collection for {url}) not supported")
return ap.parse_collection(url)
@dataclass
class ReplyTreeNode:
ap_object: AnyboxObject
children: list["ReplyTreeNode"]
is_requested: bool = False
is_root: bool = False
def get_replies_tree(
db: Session,
requested_object: AnyboxObject,
) -> ReplyTreeNode:
# TODO: handle visibility
tree_nodes: list[AnyboxObject] = []
tree_nodes.extend(
db.query(models.InboxObject)
.filter(
models.InboxObject.ap_context == requested_object.ap_context,
)
.all()
)
tree_nodes.extend(
db.query(models.OutboxObject)
.filter(
models.OutboxObject.ap_context == requested_object.ap_context,
models.OutboxObject.is_deleted.is_(False),
)
.all()
)
nodes_by_in_reply_to = defaultdict(list)
for node in tree_nodes:
nodes_by_in_reply_to[node.in_reply_to].append(node)
logger.info(nodes_by_in_reply_to)
# TODO: get oldest if we cannot get to root?
if len(nodes_by_in_reply_to.get(None, [])) != 1:
raise ValueError("Failed to compute replies tree")
def _get_reply_node_children(
node: ReplyTreeNode,
index: defaultdict[str | None, list[AnyboxObject]],
) -> list[ReplyTreeNode]:
children = []
for child in index.get(node.ap_object.ap_id, []): # type: ignore
child_node = ReplyTreeNode(
ap_object=child,
is_requested=child.ap_id == requested_object.ap_id, # type: ignore
children=[],
)
child_node.children = _get_reply_node_children(child_node, index)
children.append(child_node)
return sorted(
children,
key=lambda node: node.ap_object.ap_published_at, # type: ignore
)
if None in nodes_by_in_reply_to:
root_ap_object = nodes_by_in_reply_to[None][0]
else:
root_ap_object = sorted(
tree_nodes,
lambda ap_obj: ap_obj.ap_published_at, # type: ignore
)[0]
root_node = ReplyTreeNode(
ap_object=root_ap_object,
is_root=True,
is_requested=root_ap_object.ap_id == requested_object.ap_id,
children=[],
)
root_node.children = _get_reply_node_children(root_node, nodes_by_in_reply_to)
return root_node