Switch to new queries helper

This commit is contained in:
Thomas Sileo 2019-08-04 18:05:49 +02:00
parent 77e4f00171
commit 09f8de9d0d
2 changed files with 41 additions and 29 deletions

View file

@ -10,13 +10,14 @@ import config
from core.activitypub import _answer_key from core.activitypub import _answer_key
from core.activitypub import post_to_outbox from core.activitypub import post_to_outbox
from core.db import DB from core.db import DB
from core.meta import by_remote_id from core.db import update_one_activity
from core.meta import MetaKey from core.meta import MetaKey
from core.meta import inc
from core.meta import Box
from core.meta import in_outbox
from core.meta import by_object_id from core.meta import by_object_id
from core.meta import by_remote_id
from core.meta import by_type from core.meta import by_type
from core.meta import in_inbox
from core.meta import inc
from core.meta import upsert
from core.shared import MY_PERSON from core.shared import MY_PERSON
from core.shared import back from core.shared import back
from core.tasks import Tasks from core.tasks import Tasks
@ -53,13 +54,13 @@ def _delete_process_inbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
except Exception: except Exception:
_logger.exception(f"failed to handle delete replies for {obj_id}") _logger.exception(f"failed to handle delete replies for {obj_id}")
DB.activities.update_one( update_one_activity(
{**by_object_id(obj_id), **by_type(ap.ActivityType.CREATE)}, {**by_object_id(obj_id), **by_type(ap.ActivityType.CREATE)},
{"$set": {"meta.deleted": True}}, upsert({MetaKey.DELETED: True}),
) )
# Foce undo other related activities # Foce undo other related activities
DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}}) DB.activities.update(by_object_id(obj_id), upsert({MetaKey.UNDO: True}))
@process_inbox.register @process_inbox.register
@ -67,7 +68,7 @@ def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={update!r}") _logger.info(f"process_inbox activity={update!r}")
obj = update.get_object() obj = update.get_object()
if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE: if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
DB.activities.update_one( update_one_activity(
{"activity.object.id": obj.id}, {"$set": {"activity.object": obj.to_dict()}} {"activity.object.id": obj.id}, {"$set": {"activity.object": obj.to_dict()}}
) )
elif obj.has_type(ap.ActivityType.QUESTION): elif obj.has_type(ap.ActivityType.QUESTION):
@ -82,12 +83,10 @@ def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None:
_set["meta.question_replies"] = total_replies _set["meta.question_replies"] = total_replies
DB.activities.update_one( update_one_activity({**in_inbox(), **by_object_id(obj.id)}, {"$set": _set})
{"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set}
)
# Also update the cached copies of the question (like Announce and Like) # Also update the cached copies of the question (like Announce and Like)
DB.activities.update_many( DB.activities.update_many(
by_object_id(obj.id), {"$set": {"meta.object": obj.to_dict()}} by_object_id(obj.id), upsert({MetaKey.OBJECT: obj.to_dict()})
) )
# FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor # FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor
@ -121,16 +120,16 @@ def _announce_process_inbox(announce: ap.Announce, new_meta: _NewMeta) -> None:
if obj.has_type(ap.ActivityType.QUESTION): if obj.has_type(ap.ActivityType.QUESTION):
Tasks.fetch_remote_question(obj) Tasks.fetch_remote_question(obj)
DB.activities.update_one( update_one_activity(
by_remote_id(announce.id), by_remote_id(announce.id),
{ upsert(
"$set": { {
"meta.object": obj.to_dict(embed=True), MetaKey.OBJECT: obj.to_dict(embed=True),
"meta.object_actor": obj.get_actor().to_dict(embed=True), MetaKey.OBJECT_ACTOR: obj.get_actor().to_dict(embed=True),
} }
}, ),
) )
DB.activities.update_one( update_one_activity(
{**by_type(ap.ActivityType.CREATE), **by_object_id(obj.id)}, {**by_type(ap.ActivityType.CREATE), **by_object_id(obj.id)},
inc(MetaKey.COUNT_BOOST, 1), inc(MetaKey.COUNT_BOOST, 1),
) )
@ -141,8 +140,9 @@ def _like_process_inbox(like: ap.Like, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={like!r}") _logger.info(f"process_inbox activity={like!r}")
obj = like.get_object() obj = like.get_object()
# Update the meta counter if the object is published by the server # Update the meta counter if the object is published by the server
DB.activities.update_one( update_one_activity(
{**in_outbox(), **by_object_id(obj.id)}, inc(MetaKey.COUNT_LIKE, 1) {**by_type(ap.ActivityType.CREATE), **by_object_id(obj.id)},
inc(MetaKey.COUNT_LIKE, 1),
) )
@ -168,22 +168,23 @@ def _follow_process_inbox(activity: ap.Follow, new_meta: _NewMeta) -> None:
@process_inbox.register @process_inbox.register
def _undo_process_inbox(activity: ap.Undo, new_meta: _NewMeta) -> None: def _undo_process_inbox(activity: ap.Undo, new_meta: _NewMeta) -> None:
_logger.info(f"process_inbox activity={activity!r}") _logger.info(f"process_inbox activity={activity!r}")
# Fetch the object that's been undo'ed
obj = activity.get_object() obj = activity.get_object()
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
# Set the undo flag on the mentionned activity
update_one_activity(by_remote_id(obj.id), upsert({MetaKey.UNDO: True}))
# Handle cached counters
if obj.has_type(ap.ActivityType.LIKE): if obj.has_type(ap.ActivityType.LIKE):
# Update the meta counter if the object is published by the server # Update the meta counter if the object is published by the server
DB.activities.update_one( update_one_activity(
{ {**by_object_id(obj.get_object_id()), **by_type(ap.ActivityType.CREATE)},
**in_outbox(),
**by_object_id(obj.get_object_id()),
**by_type(ap.ActivityType.CREATE),
},
inc(MetaKey.COUNT_LIKE, -1), inc(MetaKey.COUNT_LIKE, -1),
) )
elif obj.has_type(ap.ActivityType.ANNOUNCE): elif obj.has_type(ap.ActivityType.ANNOUNCE):
announced = obj.get_object() announced = obj.get_object()
# Update the meta counter if the object is published by the server # Update the meta counter if the object is published by the server
DB.activities.update_one( update_one_activity(
{**by_type(ap.ActivityType.CREATE), **by_object_id(announced.id)}, {**by_type(ap.ActivityType.CREATE), **by_object_id(announced.id)},
inc(MetaKey.COUNT_BOOST, -1), inc(MetaKey.COUNT_BOOST, -1),
) )

View file

@ -31,6 +31,8 @@ class MetaKey(Enum):
OBJECT_ACTOR = "object_actor" OBJECT_ACTOR = "object_actor"
PUBLIC = "public" PUBLIC = "public"
DELETED = "deleted"
COUNT_LIKE = "count_like" COUNT_LIKE = "count_like"
COUNT_BOOST = "count_boost" COUNT_BOOST = "count_boost"
@ -73,3 +75,12 @@ def is_public() -> _SubQuery:
def inc(mk: MetaKey, val: int) -> _SubQuery: def inc(mk: MetaKey, val: int) -> _SubQuery:
return {"$inc": {_meta(mk): val}} return {"$inc": {_meta(mk): val}}
def upsert(data: Dict[MetaKey, Any]) -> _SubQuery:
sq: Dict[str, Any] = {}
for mk, val in data.items():
sq[_meta(mk)] = val
return {"$set": sq}