Formatting
This commit is contained in:
parent
7570f3c03d
commit
e2e29709ce
3 changed files with 319 additions and 1 deletions
|
@ -14,7 +14,6 @@ from little_boxes.httpsig import HTTPSigAuth
|
||||||
from requests.exceptions import HTTPError
|
from requests.exceptions import HTTPError
|
||||||
|
|
||||||
import config
|
import config
|
||||||
from core.shared import _Response
|
|
||||||
from config import DB
|
from config import DB
|
||||||
from core import gc
|
from core import gc
|
||||||
from core.activitypub import Box
|
from core.activitypub import Box
|
||||||
|
@ -25,6 +24,7 @@ from core.notifications import set_inbox_flags
|
||||||
from core.outbox import process_outbox
|
from core.outbox import process_outbox
|
||||||
from core.shared import MY_PERSON
|
from core.shared import MY_PERSON
|
||||||
from core.shared import _add_answers_to_question
|
from core.shared import _add_answers_to_question
|
||||||
|
from core.shared import _Response
|
||||||
from core.shared import back
|
from core.shared import back
|
||||||
from core.shared import p
|
from core.shared import p
|
||||||
from core.shared import post_to_outbox
|
from core.shared import post_to_outbox
|
||||||
|
|
181
core/inbox.py
Normal file
181
core/inbox.py
Normal file
|
@ -0,0 +1,181 @@
|
||||||
|
import logging
|
||||||
|
from functools import singledispatch
|
||||||
|
from typing import Any
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from little_boxes import activitypub as ap
|
||||||
|
from little_boxes.errors import NotAnActivityError
|
||||||
|
|
||||||
|
import config
|
||||||
|
from core.activitypub import _answer_key
|
||||||
|
from core.db import DB
|
||||||
|
from core.meta import Box
|
||||||
|
from core.shared import MY_PERSON
|
||||||
|
from core.shared import back
|
||||||
|
from core.shared import post_to_outbox
|
||||||
|
from core.tasks import Tasks
|
||||||
|
from utils import now
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_NewMeta = Dict[str, Any]
|
||||||
|
|
||||||
|
|
||||||
|
@singledispatch
|
||||||
|
def process_inbox(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.warning(f"skipping {activity!r}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@process_inbox.register
|
||||||
|
def _delete_process_inbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_inbox activity={delete!r}")
|
||||||
|
obj_id = delete.get_object_id()
|
||||||
|
_logger.debug("delete object={obj_id}")
|
||||||
|
try:
|
||||||
|
obj = ap.fetch_remote_activity(obj_id)
|
||||||
|
_logger.info(f"inbox_delete handle_replies obj={obj!r}")
|
||||||
|
in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None
|
||||||
|
if obj.has_type(ap.CREATE_TYPES):
|
||||||
|
in_reply_to = ap._get_id(
|
||||||
|
DB.activities.find_one(
|
||||||
|
{"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value}
|
||||||
|
)["activity"]["object"].get("inReplyTo")
|
||||||
|
)
|
||||||
|
if in_reply_to:
|
||||||
|
back._handle_replies_delete(MY_PERSON, in_reply_to)
|
||||||
|
except Exception:
|
||||||
|
_logger.exception(f"failed to handle delete replies for {obj_id}")
|
||||||
|
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"meta.object_id": obj_id, "type": "Create"}, {"$set": {"meta.deleted": True}}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Foce undo other related activities
|
||||||
|
DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}})
|
||||||
|
|
||||||
|
|
||||||
|
@process_inbox.register
|
||||||
|
def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_inbox activity={update!r}")
|
||||||
|
obj = update.get_object()
|
||||||
|
if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"activity.object.id": obj.id}, {"$set": {"activity.object": obj.to_dict()}}
|
||||||
|
)
|
||||||
|
elif obj.has_type(ap.ActivityType.QUESTION):
|
||||||
|
choices = obj._data.get("oneOf", obj.anyOf)
|
||||||
|
total_replies = 0
|
||||||
|
_set = {}
|
||||||
|
for choice in choices:
|
||||||
|
answer_key = _answer_key(choice["name"])
|
||||||
|
cnt = choice["replies"]["totalItems"]
|
||||||
|
total_replies += cnt
|
||||||
|
_set[f"meta.question_answers.{answer_key}"] = cnt
|
||||||
|
|
||||||
|
_set["meta.question_replies"] = total_replies
|
||||||
|
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set}
|
||||||
|
)
|
||||||
|
# Also update the cached copies of the question (like Announce and Like)
|
||||||
|
DB.activities.update_many(
|
||||||
|
{"meta.object.id": obj.id}, {"$set": {"meta.object": obj.to_dict()}}
|
||||||
|
)
|
||||||
|
|
||||||
|
# FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor
|
||||||
|
|
||||||
|
|
||||||
|
@process_inbox.register
|
||||||
|
def _create_process_inbox(create: ap.Create, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_inbox activity={create!r}")
|
||||||
|
# If it's a `Quesiion`, trigger an async task for updating it later (by fetching the remote and updating the
|
||||||
|
# local copy)
|
||||||
|
question = create.get_object()
|
||||||
|
if question.has_type(ap.ActivityType.QUESTION):
|
||||||
|
Tasks.fetch_remote_question(question)
|
||||||
|
|
||||||
|
back._handle_replies(MY_PERSON, create)
|
||||||
|
|
||||||
|
|
||||||
|
@process_inbox.register
|
||||||
|
def _announce_process_inbox(announce: ap.Announce, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_inbox activity={announce!r}")
|
||||||
|
# TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else
|
||||||
|
# or remove it?
|
||||||
|
try:
|
||||||
|
obj = announce.get_object()
|
||||||
|
except NotAnActivityError:
|
||||||
|
_logger.exception(
|
||||||
|
f'received an Annouce referencing an OStatus notice ({announce._data["object"]}), dropping the message'
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
if obj.has_type(ap.ActivityType.QUESTION):
|
||||||
|
Tasks.fetch_remote_question(obj)
|
||||||
|
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"remote_id": announce.id},
|
||||||
|
{
|
||||||
|
"$set": {
|
||||||
|
"meta.object": obj.to_dict(embed=True),
|
||||||
|
"meta.object_actor": obj.get_actor(embed=True),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@process_inbox.register
|
||||||
|
def _like_process_inbox(like: ap.Like, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_inbox activity={like!r}")
|
||||||
|
obj = like.get_object()
|
||||||
|
# Update the meta counter if the object is published by the server
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
|
||||||
|
{"$inc": {"meta.count_like": 1}},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@process_inbox.register
|
||||||
|
def _follow_process_inbox(activity: ap.Follow, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_inbox activity={activity!r}")
|
||||||
|
# Reply to a Follow with an Accept
|
||||||
|
actor_id = activity.get_actor().id
|
||||||
|
accept = ap.Accept(
|
||||||
|
actor=config.ID,
|
||||||
|
object={
|
||||||
|
"type": "Follow",
|
||||||
|
"id": activity.id,
|
||||||
|
"object": activity.get_object_id(),
|
||||||
|
"actor": actor_id,
|
||||||
|
},
|
||||||
|
to=[actor_id],
|
||||||
|
published=now(),
|
||||||
|
)
|
||||||
|
post_to_outbox(accept)
|
||||||
|
|
||||||
|
|
||||||
|
@process_inbox.register
|
||||||
|
def _undo_process_inbox(activity: ap.Undo, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_inbox activity={activity!r}")
|
||||||
|
obj = activity.get_object()
|
||||||
|
if obj.has_type(ap.ActivityType.LIKE):
|
||||||
|
liked = activity.get_object()
|
||||||
|
# Update the meta counter if the object is published by the server
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"box": Box.OUTBOX.value, "activity.object.id": liked.id},
|
||||||
|
{"$inc": {"meta.count_like": -1}},
|
||||||
|
)
|
||||||
|
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
|
||||||
|
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||||
|
announced = obj.get_object()
|
||||||
|
# Update the meta counter if the object is published by the server
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"activity.object.id": announced.id}, {"$inc": {"meta.count_boost": -1}}
|
||||||
|
)
|
||||||
|
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
|
||||||
|
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||||
|
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
|
137
core/outbox.py
Normal file
137
core/outbox.py
Normal file
|
@ -0,0 +1,137 @@
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
from functools import singledispatch
|
||||||
|
from typing import Any
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from little_boxes import activitypub as ap
|
||||||
|
|
||||||
|
from core.db import DB
|
||||||
|
from core.db import update_many_activities
|
||||||
|
from core.shared import MY_PERSON
|
||||||
|
from core.shared import back
|
||||||
|
from core.tasks import Tasks
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_NewMeta = Dict[str, Any]
|
||||||
|
|
||||||
|
|
||||||
|
@singledispatch
|
||||||
|
def process_outbox(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.warning(f"skipping {activity!r}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@process_outbox.register
|
||||||
|
def _delete_process_outbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_outbox activity={delete!r}")
|
||||||
|
obj = delete.get_object()
|
||||||
|
|
||||||
|
update_many_activities(
|
||||||
|
{"meta.object_id": obj.id}, {"$set": {"meta.deleted": True, "meta.undo": True}}
|
||||||
|
)
|
||||||
|
|
||||||
|
in_reply_to = obj.get_in_reply_to()
|
||||||
|
if in_reply_to:
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"activity.object.id": in_reply_to},
|
||||||
|
{"$inc": {"meta.count_reply": -1, "meta.count_direct_reply": -1}},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@process_outbox.register
|
||||||
|
def _update_process_outbox(update: ap.Update, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_outbox activity={update!r}")
|
||||||
|
|
||||||
|
obj = update._data["object"]
|
||||||
|
|
||||||
|
update_prefix = "activity.object."
|
||||||
|
to_update: Dict[str, Any] = {"$set": dict(), "$unset": dict()}
|
||||||
|
to_update["$set"][f"{update_prefix}updated"] = (
|
||||||
|
datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
|
||||||
|
)
|
||||||
|
for k, v in obj.items():
|
||||||
|
if k in ["id", "type"]:
|
||||||
|
continue
|
||||||
|
if v is None:
|
||||||
|
to_update["$unset"][f"{update_prefix}{k}"] = ""
|
||||||
|
else:
|
||||||
|
to_update["$set"][f"{update_prefix}{k}"] = v
|
||||||
|
|
||||||
|
if len(to_update["$unset"]) == 0:
|
||||||
|
del to_update["$unset"]
|
||||||
|
|
||||||
|
_logger.info(f"updating note from outbox {obj!r} {to_update}")
|
||||||
|
DB.activities.update_one({"activity.object.id": obj["id"]}, to_update)
|
||||||
|
# FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
|
||||||
|
# (create a new Update with the result of the update, and send it without saving it?)
|
||||||
|
|
||||||
|
|
||||||
|
@process_outbox.register
|
||||||
|
def _create_process_outbox(create: ap.Create, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_outbox activity={create!r}")
|
||||||
|
back._handle_replies(MY_PERSON, create)
|
||||||
|
|
||||||
|
|
||||||
|
@process_outbox.register
|
||||||
|
def _announce_process_outbox(announce: ap.Announce, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_outbox activity={announce!r}")
|
||||||
|
|
||||||
|
obj = announce.get_object()
|
||||||
|
if obj.has_type(ap.ActivityType.QUESTION):
|
||||||
|
Tasks.fetch_remote_question(obj)
|
||||||
|
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"remote_id": announce.id},
|
||||||
|
{
|
||||||
|
"$set": {
|
||||||
|
"meta.object": obj.to_dict(embed=True),
|
||||||
|
"meta.object_actor": obj.get_actor(embed=True),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@process_outbox.register
|
||||||
|
def _like_process_outbox(like: ap.Like, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_outbox activity={like!r}")
|
||||||
|
|
||||||
|
obj = like.get_object()
|
||||||
|
if obj.has_type(ap.ActivityType.QUESTION):
|
||||||
|
Tasks.fetch_remote_question(obj)
|
||||||
|
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"activity.object.id": obj.id},
|
||||||
|
{"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@process_outbox.register
|
||||||
|
def _undo_process_outbox(undo: ap.Undo, new_meta: _NewMeta) -> None:
|
||||||
|
_logger.info(f"process_outbox activity={undo!r}")
|
||||||
|
obj = undo.get_object()
|
||||||
|
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
|
||||||
|
|
||||||
|
# Undo Like
|
||||||
|
if obj.has_type(ap.ActivityType.LIKE):
|
||||||
|
liked = obj.get_objec_id()
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"activity.object.id": liked},
|
||||||
|
{"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}},
|
||||||
|
)
|
||||||
|
|
||||||
|
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||||
|
announced = obj.get_object_id()
|
||||||
|
DB.activities.update_one(
|
||||||
|
{"activity.object.id": announced}, {"$set": {"meta.boosted": False}}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Undo Follow (undo new following)
|
||||||
|
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||||
|
pass
|
||||||
|
# do nothing
|
Loading…
Add table
Add a link
Reference in a new issue