From 02d1640e08f715a36b73710217c0c19cc129146c Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Sun, 4 Aug 2019 17:18:52 +0200 Subject: [PATCH] Remove dead code and re-organize --- app.py | 28 +-- blueprints/admin.py | 2 +- blueprints/api.py | 4 +- blueprints/tasks.py | 2 +- core/activitypub.py | 445 +++++++++++--------------------------------- core/inbox.py | 2 +- core/shared.py | 32 ---- 7 files changed, 114 insertions(+), 401 deletions(-) diff --git a/app.py b/app.py index 9981ed3..5a1934c 100644 --- a/app.py +++ b/app.py @@ -43,7 +43,10 @@ from config import ME from config import MEDIA_CACHE from config import VERSION from core import activitypub +from core.activitypub import activity_url from core.activitypub import embed_collection +from core.activitypub import post_to_inbox +from core.activitypub import post_to_outbox from core.db import find_one_activity from core.meta import Box from core.meta import MetaKey @@ -55,14 +58,10 @@ from core.shared import MY_PERSON from core.shared import _add_answers_to_question from core.shared import _build_thread from core.shared import _get_ip -from core.shared import activity_url -from core.shared import back from core.shared import csrf from core.shared import login_required from core.shared import noindex from core.shared import paginated_query -from core.shared import post_to_outbox -from core.tasks import Tasks from utils import now from utils.key import get_secret_key from utils.template_filters import filters @@ -1006,24 +1005,3 @@ def rss_feed(): response=activitypub.gen_feed().rss_str(), headers={"Content-Type": "application/rss+xml"}, ) - - -def post_to_inbox(activity: ap.BaseActivity) -> None: - # Check for Block activity - actor = activity.get_actor() - if back.outbox_is_blocked(MY_PERSON, actor.id): - app.logger.info( - f"actor {actor!r} is blocked, dropping the received activity {activity!r}" - ) - return - - if back.inbox_check_duplicate(MY_PERSON, activity.id): - # The activity is already in the inbox - app.logger.info(f"received duplicate activity {activity!r}, dropping it") - return - - back.save(Box.INBOX, activity) - Tasks.process_new_activity(activity.id) - - app.logger.info(f"spawning task for {activity!r}") - Tasks.finish_post_to_inbox(activity.id) diff --git a/blueprints/admin.py b/blueprints/admin.py index 94204bf..876167e 100644 --- a/blueprints/admin.py +++ b/blueprints/admin.py @@ -23,6 +23,7 @@ from config import DB from config import ID from config import PASS from core.activitypub import Box +from core.activitypub import post_to_outbox from core.shared import MY_PERSON from core.shared import _build_thread from core.shared import _Response @@ -31,7 +32,6 @@ from core.shared import login_required from core.shared import noindex from core.shared import p from core.shared import paginated_query -from core.shared import post_to_outbox from utils import now from utils.lookup import lookup diff --git a/blueprints/api.py b/blueprints/api.py index 4a29aa9..c8bd3db 100644 --- a/blueprints/api.py +++ b/blueprints/api.py @@ -33,15 +33,15 @@ from config import JWT from config import MEDIA_CACHE from config import _drop_db from core import activitypub +from core.activitypub import activity_url +from core.activitypub import post_to_outbox from core.meta import Box from core.meta import MetaKey from core.meta import _meta from core.shared import MY_PERSON from core.shared import _Response -from core.shared import activity_url from core.shared import csrf from core.shared import login_required -from core.shared import post_to_outbox from core.tasks import Tasks from utils import now diff --git a/blueprints/tasks.py b/blueprints/tasks.py index 69e1a01..4d970c5 100644 --- a/blueprints/tasks.py +++ b/blueprints/tasks.py @@ -17,6 +17,7 @@ import config from config import DB from core import gc from core.activitypub import Box +from core.activitypub import post_to_outbox from core.inbox import process_inbox from core.meta import MetaKey from core.meta import _meta @@ -27,7 +28,6 @@ from core.shared import _add_answers_to_question from core.shared import _Response from core.shared import back from core.shared import p -from core.shared import post_to_outbox from core.tasks import Tasks from utils import now from utils import opengraph diff --git a/core/activitypub.py b/core/activitypub.py index 442d461..37e1f11 100644 --- a/core/activitypub.py +++ b/core/activitypub.py @@ -1,24 +1,24 @@ +import binascii import hashlib import logging import os -from datetime import datetime from typing import Any from typing import Dict from typing import List from typing import Optional +from urllib.parse import urljoin from urllib.parse import urlparse from bson.objectid import ObjectId from cachetools import LRUCache from feedgen.feed import FeedGenerator +from flask import url_for from html2text import html2text from little_boxes import activitypub as ap from little_boxes import strtobool from little_boxes.activitypub import _to_list from little_boxes.backend import Backend from little_boxes.errors import ActivityGoneError -from little_boxes.errors import Error -from little_boxes.errors import NotAnActivityError from config import BASE_URL from config import DB @@ -39,26 +39,6 @@ ACTORS_CACHE = LRUCache(maxsize=256) MY_PERSON = ap.Person(**ME) -def _actor_to_meta(actor: ap.BaseActivity, with_inbox=False) -> Dict[str, Any]: - meta = { - "id": actor.id, - "url": actor.url, - "icon": actor.icon, - "name": actor.name, - "preferredUsername": actor.preferredUsername, - } - if with_inbox: - meta.update( - { - "inbox": actor.inbox, - "sharedInbox": actor._data.get("endpoints", {}).get("sharedInbox"), - } - ) - logger.debug(f"meta={meta}") - - return meta - - def _remove_id(doc: ap.ObjectType) -> ap.ObjectType: """Helper for removing MongoDB's `_id` field.""" doc = doc.copy() @@ -67,17 +47,6 @@ def _remove_id(doc: ap.ObjectType) -> ap.ObjectType: return doc -def ensure_it_is_me(f): - """Method decorator used to track the events fired during tests.""" - - def wrapper(*args, **kwargs): - if args[1].id != ME["id"]: - raise Error("unexpected actor") - return f(*args, **kwargs) - - return wrapper - - def _answer_key(choice: str) -> str: h = hashlib.new("sha1") h.update(choice.encode()) @@ -96,6 +65,109 @@ def _is_local_reply(create: ap.Create) -> bool: return False +def save(box: Box, activity: ap.BaseActivity) -> None: + """Custom helper for saving an activity to the DB.""" + visibility = ap.get_visibility(activity) + is_public = False + if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: + is_public = True + + object_id = None + try: + object_id = activity.get_object_id() + except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object + pass + + object_visibility = None + if activity.has_type( + [ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE] + ): + object_visibility = ap.get_visibility(activity.get_object()).name + + actor_id = activity.get_actor().id + + DB.activities.insert_one( + { + "box": box.value, + "activity": activity.to_dict(), + "type": _to_list(activity.type), + "remote_id": activity.id, + "meta": { + "undo": False, + "deleted": False, + "public": is_public, + "server": urlparse(activity.id).netloc, + "visibility": visibility.name, + "actor_id": actor_id, + "object_id": object_id, + "object_visibility": object_visibility, + "poll_answer": False, + }, + } + ) + + +def outbox_is_blocked(actor_id: str) -> bool: + return bool( + DB.activities.find_one( + { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.BLOCK.value, + "activity.object": actor_id, + "meta.undo": False, + } + ) + ) + + +def activity_url(item_id: str) -> str: + return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id)) + + +def post_to_inbox(activity: ap.BaseActivity) -> None: + # Check for Block activity + actor = activity.get_actor() + if outbox_is_blocked(actor.id): + logger.info( + f"actor {actor!r} is blocked, dropping the received activity {activity!r}" + ) + return + + if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": activity.id}): + # The activity is already in the inbox + logger.info(f"received duplicate activity {activity!r}, dropping it") + return + + save(Box.INBOX, activity) + Tasks.process_new_activity(activity.id) + + logger.info(f"spawning task for {activity!r}") + Tasks.finish_post_to_inbox(activity.id) + + +def post_to_outbox(activity: ap.BaseActivity) -> str: + if activity.has_type(ap.CREATE_TYPES): + activity = activity.build_create() + + # Assign create a random ID + obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8") + uri = activity_url(obj_id) + activity._data["id"] = uri + if activity.has_type(ap.ActivityType.CREATE): + activity._data["object"]["id"] = urljoin( + BASE_URL, url_for("outbox_activity", item_id=obj_id) + ) + activity._data["object"]["url"] = urljoin( + BASE_URL, url_for("note_by_id", note_id=obj_id) + ) + activity.reset_object_cache() + + save(Box.OUTBOX, activity) + Tasks.cache_actor(activity.id) + Tasks.finish_post_to_outbox(activity.id) + return activity.id + + class MicroblogPubBackend(Backend): """Implements a Little Boxes backend, backed by MongoDB.""" @@ -112,47 +184,6 @@ class MicroblogPubBackend(Backend): def extra_inboxes(self) -> List[str]: return EXTRA_INBOXES - def save(self, box: Box, activity: ap.BaseActivity) -> None: - """Custom helper for saving an activity to the DB.""" - visibility = ap.get_visibility(activity) - is_public = False - if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: - is_public = True - - object_id = None - try: - object_id = activity.get_object_id() - except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object - pass - - object_visibility = None - if activity.has_type( - [ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE] - ): - object_visibility = ap.get_visibility(activity.get_object()).name - - actor_id = activity.get_actor().id - - DB.activities.insert_one( - { - "box": box.value, - "activity": activity.to_dict(), - "type": _to_list(activity.type), - "remote_id": activity.id, - "meta": { - "undo": False, - "deleted": False, - "public": is_public, - "server": urlparse(activity.id).netloc, - "visibility": visibility.name, - "actor_id": actor_id, - "object_id": object_id, - "object_visibility": object_visibility, - "poll_answer": False, - }, - } - ) - def followers(self) -> List[str]: q = { "box": Box.INBOX.value, @@ -195,19 +226,6 @@ class MicroblogPubBackend(Backend): return super().parse_collection(payload, url) - @ensure_it_is_me - def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool: - return bool( - DB.activities.find_one( - { - "box": Box.OUTBOX.value, - "type": ap.ActivityType.BLOCK.value, - "activity.object": actor_id, - "meta.undo": False, - } - ) - ) - def _fetch_iri(self, iri: str) -> ap.ObjectType: # noqa: C901 # Shortcut if the instance actor is fetched if iri == ME["id"]: @@ -317,259 +335,9 @@ class MicroblogPubBackend(Backend): return data - @ensure_it_is_me - def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool: - return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri})) - def set_post_to_remote_inbox(self, cb): self.post_to_remote_inbox_cb = cb - @ensure_it_is_me - def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: - DB.activities.update_one( - {"remote_id": follow.id}, {"$set": {"meta.undo": True}} - ) - - @ensure_it_is_me - def undo_new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None: - DB.activities.update_one( - {"remote_id": follow.id}, {"$set": {"meta.undo": True}} - ) - - @ensure_it_is_me - def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None: - obj = like.get_object() - # Update the meta counter if the object is published by the server - DB.activities.update_one( - {"box": Box.OUTBOX.value, "activity.object.id": obj.id}, - {"$inc": {"meta.count_like": 1}}, - ) - - @ensure_it_is_me - def inbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None: - obj = like.get_object() - # Update the meta counter if the object is published by the server - DB.activities.update_one( - {"box": Box.OUTBOX.value, "activity.object.id": obj.id}, - {"$inc": {"meta.count_like": -1}}, - ) - DB.activities.update_one({"remote_id": like.id}, {"$set": {"meta.undo": True}}) - - @ensure_it_is_me - def outbox_like(self, as_actor: ap.Person, like: ap.Like) -> None: - obj = like.get_object() - if obj.has_type(ap.ActivityType.QUESTION): - Tasks.fetch_remote_question(obj) - - DB.activities.update_one( - {"activity.object.id": obj.id}, - {"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}}, - ) - - @ensure_it_is_me - def outbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None: - obj = like.get_object() - DB.activities.update_one( - {"activity.object.id": obj.id}, - {"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}}, - ) - DB.activities.update_one({"remote_id": like.id}, {"$set": {"meta.undo": True}}) - - @ensure_it_is_me - def inbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: - # TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else - # or remove it? - try: - obj = announce.get_object() - except NotAnActivityError: - logger.exception( - f'received an Annouce referencing an OStatus notice ({announce._data["object"]}), dropping the message' - ) - return - - if obj.has_type(ap.ActivityType.QUESTION): - Tasks.fetch_remote_question(obj) - - DB.activities.update_one( - {"remote_id": announce.id}, - { - "$set": { - "meta.object": obj.to_dict(embed=True), - "meta.object_actor": _actor_to_meta(obj.get_actor()), - } - }, - ) - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}} - ) - - @ensure_it_is_me - def inbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: - obj = announce.get_object() - # Update the meta counter if the object is published by the server - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": -1}} - ) - DB.activities.update_one( - {"remote_id": announce.id}, {"$set": {"meta.undo": True}} - ) - - @ensure_it_is_me - def outbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: - obj = announce.get_object() - if obj.has_type(ap.ActivityType.QUESTION): - Tasks.fetch_remote_question(obj) - - DB.activities.update_one( - {"remote_id": announce.id}, - { - "$set": { - "meta.object": obj.to_dict(embed=True), - "meta.object_actor": _actor_to_meta(obj.get_actor()), - } - }, - ) - - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}} - ) - - @ensure_it_is_me - def outbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: - obj = announce.get_object() - DB.activities.update_one( - {"activity.object.id": obj.id}, {"$set": {"meta.boosted": False}} - ) - DB.activities.update_one( - {"remote_id": announce.id}, {"$set": {"meta.undo": True}} - ) - - @ensure_it_is_me - def inbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None: - obj_id = delete.get_object_id() - logger.debug("delete object={obj_id}") - try: - obj = ap.fetch_remote_activity(obj_id) - logger.info(f"inbox_delete handle_replies obj={obj!r}") - in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None - if obj.has_type(ap.CREATE_TYPES): - in_reply_to = ap._get_id( - DB.activities.find_one( - {"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value} - )["activity"]["object"].get("inReplyTo") - ) - if in_reply_to: - self._handle_replies_delete(as_actor, in_reply_to) - except Exception: - logger.exception(f"failed to handle delete replies for {obj_id}") - - DB.activities.update_one( - {"meta.object_id": obj_id, "type": "Create"}, - {"$set": {"meta.deleted": True}}, - ) - - # Foce undo other related activities - DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}}) - - @ensure_it_is_me - def outbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None: - DB.activities.update( - {"meta.object_id": delete.get_object_id()}, - {"$set": {"meta.deleted": True, "meta.undo": True}}, - ) - obj = delete.get_object() - if delete.get_object().ACTIVITY_TYPE != ap.ActivityType.NOTE: - obj = ap.parse_activity( - DB.activities.find_one( - { - "activity.object.id": delete.get_object().id, - "type": ap.ActivityType.CREATE.value, - } - )["activity"] - ).get_object() - - self._handle_replies_delete(as_actor, obj.get_in_reply_to()) - - @ensure_it_is_me - def inbox_update(self, as_actor: ap.Person, update: ap.Update) -> None: - obj = update.get_object() - if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE: - DB.activities.update_one( - {"activity.object.id": obj.id}, - {"$set": {"activity.object": obj.to_dict()}}, - ) - elif obj.has_type(ap.ActivityType.QUESTION): - choices = obj._data.get("oneOf", obj.anyOf) - total_replies = 0 - _set = {} - for choice in choices: - answer_key = _answer_key(choice["name"]) - cnt = choice["replies"]["totalItems"] - total_replies += cnt - _set[f"meta.question_answers.{answer_key}"] = cnt - - _set["meta.question_replies"] = total_replies - - DB.activities.update_one( - {"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set} - ) - # Also update the cached copies of the question (like Announce and Like) - DB.activities.update_many( - {"meta.object.id": obj.id}, {"$set": {"meta.object": obj.to_dict()}} - ) - - # FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor - - @ensure_it_is_me - def outbox_update(self, as_actor: ap.Person, _update: ap.Update) -> None: - obj = _update._data["object"] - - update_prefix = "activity.object." - update: Dict[str, Any] = {"$set": dict(), "$unset": dict()} - update["$set"][f"{update_prefix}updated"] = ( - datetime.utcnow().replace(microsecond=0).isoformat() + "Z" - ) - for k, v in obj.items(): - if k in ["id", "type"]: - continue - if v is None: - update["$unset"][f"{update_prefix}{k}"] = "" - else: - update["$set"][f"{update_prefix}{k}"] = v - - if len(update["$unset"]) == 0: - del update["$unset"] - - print(f"updating note from outbox {obj!r} {update}") - logger.info(f"updating note from outbox {obj!r} {update}") - DB.activities.update_one({"activity.object.id": obj["id"]}, update) - # FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients - # (create a new Update with the result of the update, and send it without saving it?) - - @ensure_it_is_me - def outbox_create(self, as_actor: ap.Person, create: ap.Create) -> None: - obj = create.get_object() - - # Flag the activity as a poll answer if needed - print(f"POLL ANSWER ChECK {obj.get_in_reply_to()} {obj.name} {obj.content}") - if obj.get_in_reply_to() and obj.name and not obj.content: - DB.activities.update_one( - {"remote_id": create.id}, {"$set": {"meta.poll_answer": True}} - ) - - self._handle_replies(as_actor, create) - - @ensure_it_is_me - def inbox_create(self, as_actor: ap.Person, create: ap.Create) -> None: - # If it's a `Quesiion`, trigger an async task for updating it later (by fetching the remote and updating the - # local copy) - question = create.get_object() - if question.has_type(ap.ActivityType.QUESTION): - Tasks.fetch_remote_question(question) - - self._handle_replies(as_actor, create) - - @ensure_it_is_me def _handle_replies_delete( self, as_actor: ap.Person, in_reply_to: Optional[str] ) -> None: @@ -627,7 +395,6 @@ class MicroblogPubBackend(Backend): return None - @ensure_it_is_me def _handle_replies(self, as_actor: ap.Person, create: ap.Create) -> None: """Go up to the root reply, store unknown replies in the `threads` DB and set the "meta.thread_root_parent" key to make it easy to query a whole thread.""" diff --git a/core/inbox.py b/core/inbox.py index 376f694..1fc6dd9 100644 --- a/core/inbox.py +++ b/core/inbox.py @@ -8,11 +8,11 @@ from little_boxes.errors import NotAnActivityError import config from core.activitypub import _answer_key +from core.activitypub import post_to_outbox from core.db import DB from core.meta import Box from core.shared import MY_PERSON from core.shared import back -from core.shared import post_to_outbox from core.tasks import Tasks from utils import now diff --git a/core/shared.py b/core/shared.py index 98e2629..8432bc7 100644 --- a/core/shared.py +++ b/core/shared.py @@ -1,11 +1,9 @@ -import binascii import os from datetime import datetime from datetime import timezone from functools import wraps from typing import Any from typing import Dict -from urllib.parse import urljoin import flask from bson.objectid import ObjectId @@ -19,13 +17,10 @@ from little_boxes import activitypub as ap from little_boxes.activitypub import format_datetime from poussetaches import PousseTaches -from config import BASE_URL from config import DB from config import ME from core import activitypub from core.activitypub import _answer_key -from core.meta import Box -from core.tasks import Tasks # _Response = Union[flask.Response, werkzeug.wrappers.Response, str, Any] _Response = Any @@ -94,33 +89,6 @@ def _get_ip(): return ip, geoip -def activity_url(item_id: str) -> str: - return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id)) - - -def post_to_outbox(activity: ap.BaseActivity) -> str: - if activity.has_type(ap.CREATE_TYPES): - activity = activity.build_create() - - # Assign create a random ID - obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8") - uri = activity_url(obj_id) - activity._data["id"] = uri - if activity.has_type(ap.ActivityType.CREATE): - activity._data["object"]["id"] = urljoin( - BASE_URL, url_for("outbox_activity", item_id=obj_id) - ) - activity._data["object"]["url"] = urljoin( - BASE_URL, url_for("note_by_id", note_id=obj_id) - ) - activity.reset_object_cache() - - back.save(Box.OUTBOX, activity) - Tasks.cache_actor(activity.id) - Tasks.finish_post_to_outbox(activity.id) - return activity.id - - def _build_thread(data, include_children=True): # noqa: C901 data["_requested"] = True app.logger.info(f"_build_thread({data!r})")