Remove dead code and re-organize

This commit is contained in:
Thomas Sileo 2019-08-04 17:18:52 +02:00
parent a21121308f
commit 02d1640e08
7 changed files with 114 additions and 401 deletions

28
app.py
View file

@ -43,7 +43,10 @@ from config import ME
from config import MEDIA_CACHE
from config import VERSION
from core import activitypub
from core.activitypub import activity_url
from core.activitypub import embed_collection
from core.activitypub import post_to_inbox
from core.activitypub import post_to_outbox
from core.db import find_one_activity
from core.meta import Box
from core.meta import MetaKey
@ -55,14 +58,10 @@ from core.shared import MY_PERSON
from core.shared import _add_answers_to_question
from core.shared import _build_thread
from core.shared import _get_ip
from core.shared import activity_url
from core.shared import back
from core.shared import csrf
from core.shared import login_required
from core.shared import noindex
from core.shared import paginated_query
from core.shared import post_to_outbox
from core.tasks import Tasks
from utils import now
from utils.key import get_secret_key
from utils.template_filters import filters
@ -1006,24 +1005,3 @@ def rss_feed():
response=activitypub.gen_feed().rss_str(),
headers={"Content-Type": "application/rss+xml"},
)
def post_to_inbox(activity: ap.BaseActivity) -> None:
# Check for Block activity
actor = activity.get_actor()
if back.outbox_is_blocked(MY_PERSON, actor.id):
app.logger.info(
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
)
return
if back.inbox_check_duplicate(MY_PERSON, activity.id):
# The activity is already in the inbox
app.logger.info(f"received duplicate activity {activity!r}, dropping it")
return
back.save(Box.INBOX, activity)
Tasks.process_new_activity(activity.id)
app.logger.info(f"spawning task for {activity!r}")
Tasks.finish_post_to_inbox(activity.id)

View file

@ -23,6 +23,7 @@ from config import DB
from config import ID
from config import PASS
from core.activitypub import Box
from core.activitypub import post_to_outbox
from core.shared import MY_PERSON
from core.shared import _build_thread
from core.shared import _Response
@ -31,7 +32,6 @@ from core.shared import login_required
from core.shared import noindex
from core.shared import p
from core.shared import paginated_query
from core.shared import post_to_outbox
from utils import now
from utils.lookup import lookup

View file

@ -33,15 +33,15 @@ from config import JWT
from config import MEDIA_CACHE
from config import _drop_db
from core import activitypub
from core.activitypub import activity_url
from core.activitypub import post_to_outbox
from core.meta import Box
from core.meta import MetaKey
from core.meta import _meta
from core.shared import MY_PERSON
from core.shared import _Response
from core.shared import activity_url
from core.shared import csrf
from core.shared import login_required
from core.shared import post_to_outbox
from core.tasks import Tasks
from utils import now

View file

@ -17,6 +17,7 @@ import config
from config import DB
from core import gc
from core.activitypub import Box
from core.activitypub import post_to_outbox
from core.inbox import process_inbox
from core.meta import MetaKey
from core.meta import _meta
@ -27,7 +28,6 @@ from core.shared import _add_answers_to_question
from core.shared import _Response
from core.shared import back
from core.shared import p
from core.shared import post_to_outbox
from core.tasks import Tasks
from utils import now
from utils import opengraph

View file

@ -1,24 +1,24 @@
import binascii
import hashlib
import logging
import os
from datetime import datetime
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from urllib.parse import urljoin
from urllib.parse import urlparse
from bson.objectid import ObjectId
from cachetools import LRUCache
from feedgen.feed import FeedGenerator
from flask import url_for
from html2text import html2text
from little_boxes import activitypub as ap
from little_boxes import strtobool
from little_boxes.activitypub import _to_list
from little_boxes.backend import Backend
from little_boxes.errors import ActivityGoneError
from little_boxes.errors import Error
from little_boxes.errors import NotAnActivityError
from config import BASE_URL
from config import DB
@ -39,26 +39,6 @@ ACTORS_CACHE = LRUCache(maxsize=256)
MY_PERSON = ap.Person(**ME)
def _actor_to_meta(actor: ap.BaseActivity, with_inbox=False) -> Dict[str, Any]:
meta = {
"id": actor.id,
"url": actor.url,
"icon": actor.icon,
"name": actor.name,
"preferredUsername": actor.preferredUsername,
}
if with_inbox:
meta.update(
{
"inbox": actor.inbox,
"sharedInbox": actor._data.get("endpoints", {}).get("sharedInbox"),
}
)
logger.debug(f"meta={meta}")
return meta
def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
"""Helper for removing MongoDB's `_id` field."""
doc = doc.copy()
@ -67,17 +47,6 @@ def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
return doc
def ensure_it_is_me(f):
"""Method decorator used to track the events fired during tests."""
def wrapper(*args, **kwargs):
if args[1].id != ME["id"]:
raise Error("unexpected actor")
return f(*args, **kwargs)
return wrapper
def _answer_key(choice: str) -> str:
h = hashlib.new("sha1")
h.update(choice.encode())
@ -96,6 +65,109 @@ def _is_local_reply(create: ap.Create) -> bool:
return False
def save(box: Box, activity: ap.BaseActivity) -> None:
"""Custom helper for saving an activity to the DB."""
visibility = ap.get_visibility(activity)
is_public = False
if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
is_public = True
object_id = None
try:
object_id = activity.get_object_id()
except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object
pass
object_visibility = None
if activity.has_type(
[ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE]
):
object_visibility = ap.get_visibility(activity.get_object()).name
actor_id = activity.get_actor().id
DB.activities.insert_one(
{
"box": box.value,
"activity": activity.to_dict(),
"type": _to_list(activity.type),
"remote_id": activity.id,
"meta": {
"undo": False,
"deleted": False,
"public": is_public,
"server": urlparse(activity.id).netloc,
"visibility": visibility.name,
"actor_id": actor_id,
"object_id": object_id,
"object_visibility": object_visibility,
"poll_answer": False,
},
}
)
def outbox_is_blocked(actor_id: str) -> bool:
return bool(
DB.activities.find_one(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.BLOCK.value,
"activity.object": actor_id,
"meta.undo": False,
}
)
)
def activity_url(item_id: str) -> str:
return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id))
def post_to_inbox(activity: ap.BaseActivity) -> None:
# Check for Block activity
actor = activity.get_actor()
if outbox_is_blocked(actor.id):
logger.info(
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
)
return
if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": activity.id}):
# The activity is already in the inbox
logger.info(f"received duplicate activity {activity!r}, dropping it")
return
save(Box.INBOX, activity)
Tasks.process_new_activity(activity.id)
logger.info(f"spawning task for {activity!r}")
Tasks.finish_post_to_inbox(activity.id)
def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
# Assign create a random ID
obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8")
uri = activity_url(obj_id)
activity._data["id"] = uri
if activity.has_type(ap.ActivityType.CREATE):
activity._data["object"]["id"] = urljoin(
BASE_URL, url_for("outbox_activity", item_id=obj_id)
)
activity._data["object"]["url"] = urljoin(
BASE_URL, url_for("note_by_id", note_id=obj_id)
)
activity.reset_object_cache()
save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id)
Tasks.finish_post_to_outbox(activity.id)
return activity.id
class MicroblogPubBackend(Backend):
"""Implements a Little Boxes backend, backed by MongoDB."""
@ -112,47 +184,6 @@ class MicroblogPubBackend(Backend):
def extra_inboxes(self) -> List[str]:
return EXTRA_INBOXES
def save(self, box: Box, activity: ap.BaseActivity) -> None:
"""Custom helper for saving an activity to the DB."""
visibility = ap.get_visibility(activity)
is_public = False
if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
is_public = True
object_id = None
try:
object_id = activity.get_object_id()
except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object
pass
object_visibility = None
if activity.has_type(
[ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE]
):
object_visibility = ap.get_visibility(activity.get_object()).name
actor_id = activity.get_actor().id
DB.activities.insert_one(
{
"box": box.value,
"activity": activity.to_dict(),
"type": _to_list(activity.type),
"remote_id": activity.id,
"meta": {
"undo": False,
"deleted": False,
"public": is_public,
"server": urlparse(activity.id).netloc,
"visibility": visibility.name,
"actor_id": actor_id,
"object_id": object_id,
"object_visibility": object_visibility,
"poll_answer": False,
},
}
)
def followers(self) -> List[str]:
q = {
"box": Box.INBOX.value,
@ -195,19 +226,6 @@ class MicroblogPubBackend(Backend):
return super().parse_collection(payload, url)
@ensure_it_is_me
def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool:
return bool(
DB.activities.find_one(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.BLOCK.value,
"activity.object": actor_id,
"meta.undo": False,
}
)
)
def _fetch_iri(self, iri: str) -> ap.ObjectType: # noqa: C901
# Shortcut if the instance actor is fetched
if iri == ME["id"]:
@ -317,259 +335,9 @@ class MicroblogPubBackend(Backend):
return data
@ensure_it_is_me
def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool:
return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri}))
def set_post_to_remote_inbox(self, cb):
self.post_to_remote_inbox_cb = cb
@ensure_it_is_me
def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
DB.activities.update_one(
{"remote_id": follow.id}, {"$set": {"meta.undo": True}}
)
@ensure_it_is_me
def undo_new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None:
DB.activities.update_one(
{"remote_id": follow.id}, {"$set": {"meta.undo": True}}
)
@ensure_it_is_me
def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
obj = like.get_object()
# Update the meta counter if the object is published by the server
DB.activities.update_one(
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
{"$inc": {"meta.count_like": 1}},
)
@ensure_it_is_me
def inbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None:
obj = like.get_object()
# Update the meta counter if the object is published by the server
DB.activities.update_one(
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
{"$inc": {"meta.count_like": -1}},
)
DB.activities.update_one({"remote_id": like.id}, {"$set": {"meta.undo": True}})
@ensure_it_is_me
def outbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
obj = like.get_object()
if obj.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(obj)
DB.activities.update_one(
{"activity.object.id": obj.id},
{"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}},
)
@ensure_it_is_me
def outbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None:
obj = like.get_object()
DB.activities.update_one(
{"activity.object.id": obj.id},
{"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}},
)
DB.activities.update_one({"remote_id": like.id}, {"$set": {"meta.undo": True}})
@ensure_it_is_me
def inbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
# TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else
# or remove it?
try:
obj = announce.get_object()
except NotAnActivityError:
logger.exception(
f'received an Annouce referencing an OStatus notice ({announce._data["object"]}), dropping the message'
)
return
if obj.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(obj)
DB.activities.update_one(
{"remote_id": announce.id},
{
"$set": {
"meta.object": obj.to_dict(embed=True),
"meta.object_actor": _actor_to_meta(obj.get_actor()),
}
},
)
DB.activities.update_one(
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}}
)
@ensure_it_is_me
def inbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
obj = announce.get_object()
# Update the meta counter if the object is published by the server
DB.activities.update_one(
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": -1}}
)
DB.activities.update_one(
{"remote_id": announce.id}, {"$set": {"meta.undo": True}}
)
@ensure_it_is_me
def outbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
obj = announce.get_object()
if obj.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(obj)
DB.activities.update_one(
{"remote_id": announce.id},
{
"$set": {
"meta.object": obj.to_dict(embed=True),
"meta.object_actor": _actor_to_meta(obj.get_actor()),
}
},
)
DB.activities.update_one(
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}}
)
@ensure_it_is_me
def outbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
obj = announce.get_object()
DB.activities.update_one(
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": False}}
)
DB.activities.update_one(
{"remote_id": announce.id}, {"$set": {"meta.undo": True}}
)
@ensure_it_is_me
def inbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
obj_id = delete.get_object_id()
logger.debug("delete object={obj_id}")
try:
obj = ap.fetch_remote_activity(obj_id)
logger.info(f"inbox_delete handle_replies obj={obj!r}")
in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None
if obj.has_type(ap.CREATE_TYPES):
in_reply_to = ap._get_id(
DB.activities.find_one(
{"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value}
)["activity"]["object"].get("inReplyTo")
)
if in_reply_to:
self._handle_replies_delete(as_actor, in_reply_to)
except Exception:
logger.exception(f"failed to handle delete replies for {obj_id}")
DB.activities.update_one(
{"meta.object_id": obj_id, "type": "Create"},
{"$set": {"meta.deleted": True}},
)
# Foce undo other related activities
DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}})
@ensure_it_is_me
def outbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
DB.activities.update(
{"meta.object_id": delete.get_object_id()},
{"$set": {"meta.deleted": True, "meta.undo": True}},
)
obj = delete.get_object()
if delete.get_object().ACTIVITY_TYPE != ap.ActivityType.NOTE:
obj = ap.parse_activity(
DB.activities.find_one(
{
"activity.object.id": delete.get_object().id,
"type": ap.ActivityType.CREATE.value,
}
)["activity"]
).get_object()
self._handle_replies_delete(as_actor, obj.get_in_reply_to())
@ensure_it_is_me
def inbox_update(self, as_actor: ap.Person, update: ap.Update) -> None:
obj = update.get_object()
if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
DB.activities.update_one(
{"activity.object.id": obj.id},
{"$set": {"activity.object": obj.to_dict()}},
)
elif obj.has_type(ap.ActivityType.QUESTION):
choices = obj._data.get("oneOf", obj.anyOf)
total_replies = 0
_set = {}
for choice in choices:
answer_key = _answer_key(choice["name"])
cnt = choice["replies"]["totalItems"]
total_replies += cnt
_set[f"meta.question_answers.{answer_key}"] = cnt
_set["meta.question_replies"] = total_replies
DB.activities.update_one(
{"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set}
)
# Also update the cached copies of the question (like Announce and Like)
DB.activities.update_many(
{"meta.object.id": obj.id}, {"$set": {"meta.object": obj.to_dict()}}
)
# FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor
@ensure_it_is_me
def outbox_update(self, as_actor: ap.Person, _update: ap.Update) -> None:
obj = _update._data["object"]
update_prefix = "activity.object."
update: Dict[str, Any] = {"$set": dict(), "$unset": dict()}
update["$set"][f"{update_prefix}updated"] = (
datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
)
for k, v in obj.items():
if k in ["id", "type"]:
continue
if v is None:
update["$unset"][f"{update_prefix}{k}"] = ""
else:
update["$set"][f"{update_prefix}{k}"] = v
if len(update["$unset"]) == 0:
del update["$unset"]
print(f"updating note from outbox {obj!r} {update}")
logger.info(f"updating note from outbox {obj!r} {update}")
DB.activities.update_one({"activity.object.id": obj["id"]}, update)
# FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
# (create a new Update with the result of the update, and send it without saving it?)
@ensure_it_is_me
def outbox_create(self, as_actor: ap.Person, create: ap.Create) -> None:
obj = create.get_object()
# Flag the activity as a poll answer if needed
print(f"POLL ANSWER ChECK {obj.get_in_reply_to()} {obj.name} {obj.content}")
if obj.get_in_reply_to() and obj.name and not obj.content:
DB.activities.update_one(
{"remote_id": create.id}, {"$set": {"meta.poll_answer": True}}
)
self._handle_replies(as_actor, create)
@ensure_it_is_me
def inbox_create(self, as_actor: ap.Person, create: ap.Create) -> None:
# If it's a `Quesiion`, trigger an async task for updating it later (by fetching the remote and updating the
# local copy)
question = create.get_object()
if question.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(question)
self._handle_replies(as_actor, create)
@ensure_it_is_me
def _handle_replies_delete(
self, as_actor: ap.Person, in_reply_to: Optional[str]
) -> None:
@ -627,7 +395,6 @@ class MicroblogPubBackend(Backend):
return None
@ensure_it_is_me
def _handle_replies(self, as_actor: ap.Person, create: ap.Create) -> None:
"""Go up to the root reply, store unknown replies in the `threads` DB and set the "meta.thread_root_parent"
key to make it easy to query a whole thread."""

View file

@ -8,11 +8,11 @@ from little_boxes.errors import NotAnActivityError
import config
from core.activitypub import _answer_key
from core.activitypub import post_to_outbox
from core.db import DB
from core.meta import Box
from core.shared import MY_PERSON
from core.shared import back
from core.shared import post_to_outbox
from core.tasks import Tasks
from utils import now

View file

@ -1,11 +1,9 @@
import binascii
import os
from datetime import datetime
from datetime import timezone
from functools import wraps
from typing import Any
from typing import Dict
from urllib.parse import urljoin
import flask
from bson.objectid import ObjectId
@ -19,13 +17,10 @@ from little_boxes import activitypub as ap
from little_boxes.activitypub import format_datetime
from poussetaches import PousseTaches
from config import BASE_URL
from config import DB
from config import ME
from core import activitypub
from core.activitypub import _answer_key
from core.meta import Box
from core.tasks import Tasks
# _Response = Union[flask.Response, werkzeug.wrappers.Response, str, Any]
_Response = Any
@ -94,33 +89,6 @@ def _get_ip():
return ip, geoip
def activity_url(item_id: str) -> str:
return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id))
def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
# Assign create a random ID
obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8")
uri = activity_url(obj_id)
activity._data["id"] = uri
if activity.has_type(ap.ActivityType.CREATE):
activity._data["object"]["id"] = urljoin(
BASE_URL, url_for("outbox_activity", item_id=obj_id)
)
activity._data["object"]["url"] = urljoin(
BASE_URL, url_for("note_by_id", note_id=obj_id)
)
activity.reset_object_cache()
back.save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id)
Tasks.finish_post_to_outbox(activity.id)
return activity.id
def _build_thread(data, include_children=True): # noqa: C901
data["_requested"] = True
app.logger.info(f"_build_thread({data!r})")