Support for processing Questions answers/votes

This commit is contained in:
Thomas Sileo 2022-07-24 10:50:58 +02:00
parent f834596197
commit 3d5a86d51e
5 changed files with 184 additions and 9 deletions

View file

@ -475,7 +475,7 @@ async def send_question(
"tag": tags,
"votersCount": 0,
"endTime": (now() + timedelta(minutes=5)).isoformat().replace("+00:00", "Z"),
"anyOf": [
"oneOf": [
{
"type": "Note",
"name": "A",
@ -1027,13 +1027,22 @@ async def _process_note_object(
)
if replied_object:
if replied_object.is_from_outbox:
await db_session.execute(
update(models.OutboxObject)
.where(
models.OutboxObject.id == replied_object.id,
if replied_object.ap_type == "Question" and inbox_object.ap_object.get(
"name"
):
await _handle_vote_answer(
db_session,
inbox_object,
replied_object, # type: ignore # outbox check below
)
else:
await db_session.execute(
update(models.OutboxObject)
.where(
models.OutboxObject.id == replied_object.id,
)
.values(replies_count=models.OutboxObject.replies_count + 1)
)
.values(replies_count=models.OutboxObject.replies_count + 1)
)
else:
await db_session.execute(
update(models.InboxObject)
@ -1049,6 +1058,7 @@ async def _process_note_object(
parent_activity.ap_type == "Create"
and replied_object
and replied_object.is_from_outbox
and replied_object.ap_type != "Question"
and parent_activity.has_ld_signature
):
logger.info("Forwarding Create activity as it's a local reply")
@ -1070,6 +1080,82 @@ async def _process_note_object(
db_session.add(notif)
async def _handle_vote_answer(
db_session: AsyncSession,
answer: models.InboxObject,
question: models.OutboxObject,
) -> None:
logger.info(f"Processing poll answer for {question.ap_id}: {answer.ap_id}")
if question.is_poll_ended:
logger.warning("Poll is ended, discarding answer")
return
if not question.poll_items:
raise ValueError("Should never happen")
answer_name = answer.ap_object["name"]
if answer_name not in {pi["name"] for pi in question.poll_items}:
logger.warning(f"Invalid answer {answer_name=}")
return
poll_answer = models.PollAnswer(
outbox_object_id=question.id,
poll_type="oneOf" if question.is_one_of_poll else "anyOf",
inbox_object_id=answer.id,
actor_id=answer.actor.id,
name=answer_name,
)
db_session.add(poll_answer)
await db_session.flush()
voters_count = await db_session.scalar(
select(func.count(func.distinct(models.PollAnswer.actor_id))).where(
models.PollAnswer.outbox_object_id == question.id
)
)
all_answers = await db_session.execute(
select(
func.count(models.PollAnswer.name).label("answer_count"),
models.PollAnswer.name,
)
.where(models.PollAnswer.outbox_object_id == question.id)
.group_by(models.PollAnswer.name)
)
all_answers_count = {a["name"]: a["answer_count"] for a in all_answers}
logger.info(f"{voters_count=}")
logger.info(f"{all_answers_count=}")
question_ap_object = dict(question.ap_object)
question_ap_object["votersCount"] = voters_count
items_key = "oneOf" if question.is_one_of_poll else "anyOf"
question_ap_object[items_key] = [
{
"type": "Note",
"name": item["name"],
"replies": {
"type": "Collection",
"totalItems": all_answers_count.get(item["name"], 0),
},
}
for item in question.poll_items
]
updated = now().replace(microsecond=0).isoformat().replace("+00:00", "Z")
question_ap_object["updated"] = updated
question.ap_object = question_ap_object
logger.info(f"Updated question: {question.ap_object}")
await db_session.flush()
# Finally send an update
recipients = await _compute_recipients(db_session, question.ap_object)
for rcp in recipients:
await new_outgoing_activity(db_session, rcp, question.id)
async def _process_transient_object(
db_session: AsyncSession,
raw_object: ap.RawObject,